Flink-java(api)
1. Map
package com.wt.flink.tf
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.scala._
/** Demo: applying a Java-style `MapFunction` to a text-file stream through the Flink Scala API.
  *
  * Each input line is turned into a `(field, 1)` pair, the pairs are printed, and the job is run.
  */
object Demo1Map {
  def main(args: Array[String]): Unit = {
    // Set up the Flink streaming environment.
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val lines: DataStream[String] = env.readTextFile("data/students.txt")

    // Java-API style mapper: called once per record, producing exactly one output per input.
    val toFieldCount: MapFunction[String, (String, Int)] = new MapFunction[String, (String, Int)] {
      /**
       * @param value one line of the input file
       * @return the fifth comma-separated field paired with the count 1
       */
      override def map(value: String): (String, Int) = {
        // Index 4 is read unconditionally, so a line with fewer than five fields throws.
        // NOTE(review): the original local was named `clazz`, presumably the student's class column.
        val fields = value.split(",")
        (fields(4), 1)
      }
    }

    val pairs: DataStream[(String, Int)] = lines.map(toFieldCount)

    pairs.print()
    env.execute()
  }
}
2. javaApi
package com.wt.flink.tf
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.streaming.api.datastream.DataStreamSource
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
/** Demo: the same per-line mapping as a plain Java-API Flink job, written in Scala syntax.
  *
  * Uses the Java `StreamExecutionEnvironment`, `DataStreamSource` and Flink's own `Tuple2`
  * instead of the Scala wrappers and Scala tuples.
  */
object Demo2JavaApi {
  @throws[Exception]
  def main(args: Array[String]): Unit = {
    // Set up the Flink streaming environment (Java API).
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val lines: DataStreamSource[String] = env.readTextFile("data/students.txt")

    // Java-style mapper: one Flink Tuple2 out for every line in.
    val toFieldCount: MapFunction[String, Tuple2[String, Integer]] =
      new MapFunction[String, Tuple2[String, Integer]]() {
        @throws[Exception]
        override def map(value: String): Tuple2[String, Integer] = {
          // Index 4 is read unconditionally, so a line with fewer than five fields throws.
          val fields = value.split(",")
          Tuple2.of(fields(4), Integer.valueOf(1))
        }
      }

    val pairs: SingleOutputStreamOperator[Tuple2[String, Integer]] = lines.map(toFieldCount)

    // Both calls are side-effecting, so they are written with explicit parentheses.
    pairs.print()
    env.execute()
  }
}
3. FlatMapFunction
package com.wt.flink.tf
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
/** Demo: splitting each line of a text file into words with a Java-style `FlatMapFunction`. */
object Demo3FlatMapFunction {
  def main(args: Array[String]): Unit = {
    // Set up the Flink streaming environment.
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val lines: DataStream[String] = env.readTextFile("data/words.txt")

    // Java-API style flatMap: called once per record; may emit any number of outputs.
    val splitOnComma: FlatMapFunction[String, String] = new FlatMapFunction[String, String] {
      /**
       * @param value one line of input
       * @param out   collector used to send records downstream
       */
      override def flatMap(value: String, out: Collector[String]): Unit =
        // Emit every comma-separated token as its own record.
        value.split(",").foreach(token => out.collect(token))
    }

    val words: DataStream[String] = lines.flatMap(splitOnComma)

    words.print()
    env.execute()
  }
}
4. KeyBy
package com.wt.flink.tf
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.scala._
/** Demo: keying a stream of `(word, 1)` pairs with a Java-style `KeySelector`, then summing per key.
  *
  * Input lines are read from a socket on host "master", port 8888, and split on commas.
  */
object Demo5KeyBy {
  def main(args: Array[String]): Unit = {
    // Set up the Flink streaming environment.
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // socket text -> individual words -> (word, 1) pairs
    val pairs: DataStream[(String, Int)] =
      env
        .socketTextStream("master", 8888)
        .flatMap(line => line.split(","))
        .map(word => (word, 1))

    // Java-API style key extractor: the word (first tuple element) is the key.
    val byWord: KeySelector[(String, Int), String] = new KeySelector[(String, Int), String] {
      override def getKey(value: (String, Int)): String = value._1
    }

    val keyed: KeyedStream[(String, Int), String] = pairs.keyBy(byWord)

    // Aggregate after keying: sum tuple position 1 (the count).
    val totals: DataStream[(String, Int)] = keyed.sum(1)

    totals.print()
    env.execute()
  }
}
5. Reduce
package com.wt.flink.tf
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala._
/** Demo: per-key aggregation with a Java-style `ReduceFunction` on a keyed word stream.
  *
  * Input lines are read from a socket on host "master", port 8888, and split on commas.
  */
object Demo6Reduce {
  def main(args: Array[String]): Unit = {
    // Set up the Flink streaming environment.
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // socket text -> individual words -> (word, 1) pairs, keyed by the word
    val keyed: KeyedStream[(String, Int), String] =
      env
        .socketTextStream("master", 8888)
        .flatMap(line => line.split(","))
        .map(word => (word, 1))
        .keyBy(pair => pair._1)

    // Aggregate after grouping.
    // Scala-lambda form of the same reduce, kept for comparison:
    //   keyed.reduce((kv1, kv2) => (kv1._1, kv1._2 + kv2._2))

    // Java-API style reducer: keep the first record's key and add the two counts.
    val addCounts: ReduceFunction[(String, Int)] = new ReduceFunction[(String, Int)] {
      override def reduce(kv1: (String, Int), kv2: (String, Int)): (String, Int) = {
        val (word, countSoFar) = kv1
        (word, countSoFar + kv2._2)
      }
    }

    val reduced: DataStream[(String, Int)] = keyed.reduce(addCounts)

    reduced.print()
    env.execute()
  }
}
6. Window
package com.wt.flink.tf
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
/** Demo: word counts over a sliding processing-time window.
  *
  * Counts words seen in the last 10 seconds, with a new result every 5 seconds. Input lines
  * are read from a socket on host "master", port 8888, and split on commas.
  */
object Demo7Window {
  def main(args: Array[String]): Unit = {
    // Set up the Flink streaming environment.
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // socket text -> individual words -> (word, 1) pairs, keyed by the word
    val keyed: KeyedStream[(String, Int), String] =
      env
        .socketTextStream("master", 8888)
        .flatMap(line => line.split(","))
        .map(word => (word, 1))
        .keyBy(pair => pair._1)

    // Sliding window: size 10 seconds, slide 5 seconds, based on processing time.
    val lastTenSecondsEveryFive = SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))

    val windowed: WindowedStream[(String, Int), String, TimeWindow] =
      keyed.window(lastTenSecondsEveryFive)

    // Aggregate after windowing: sum tuple position 1 (the count) within each window.
    val counts: DataStream[(String, Int)] = windowed.sum(1)

    counts.print()
    env.execute()
  }
}
7. Union
package com.wt.flink.tf
import org.apache.flink.streaming.api.scala._
/** Demo: merging two streams of the same element type with `union`. */
object Demo8Union {
  def main(args: Array[String]): Unit = {
    // Set up the Flink streaming environment.
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Both sources are built from the same five integers.
    val numbers: List[Int] = List(1, 2, 3, 4, 5)
    val left: DataStream[Int] = env.fromCollection(numbers)
    val right: DataStream[Int] = env.fromCollection(numbers)

    // union requires both streams to have the same element type.
    val merged: DataStream[Int] = left.union(right)

    merged.print()
    env.execute()
  }
}
原创文章,作者:jamestackk,如若转载,请注明出处:https://blog.ytso.com/tech/java/276518.html