Flink-java(api)


Flink-java(api)

1. Map

package com.wt.flink.tf
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.scala._

object Demo1Map {
  def main(args: Array[String]): Unit = {
    // Set up the Flink streaming execution environment.
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Read the student file; each element of the stream is one line of text.
    val studentDS: DataStream[String] = env.readTextFile("data/students.txt")

    // Java-style API: implement MapFunction as an anonymous class.
    val kvDS: DataStream[(String, Int)] = studentDS.map(new MapFunction[String, (String, Int)] {
      /**
       * map is invoked once per record: one record in, one record out.
       *
       * @param value a single line of the input file
       * @return a (class name, 1) pair for downstream counting
       */
      override def map(value: String): (String, Int) = {
        // Column index 4 holds the class name (comma-separated line).
        val fields: Array[String] = value.split(",")
        (fields(4), 1)
      }
    })

    kvDS.print()

    env.execute()
  }
}

2. javaApi

package com.wt.flink.tf
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.streaming.api.datastream.DataStreamSource
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

object Demo2JavaApi {
  @throws[Exception]
  def main(args: Array[String]): Unit = {
    // Set up the Flink streaming execution environment (Java API entry point).
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val studentDS: DataStreamSource[String] = env.readTextFile("data/students.txt")
    // Java-style code: uses Flink's Tuple2 instead of a Scala tuple.
    val kvDS: SingleOutputStreamOperator[Tuple2[String, Integer]] = studentDS.map(new MapFunction[String, Tuple2[String, Integer]]() {
      /**
       * map is invoked once per input line.
       *
       * @param value a single line of students.txt
       * @return (class name, 1) as a Flink Tuple2
       */
      @throws[Exception]
      override def map(value: String): Tuple2[String, Integer] = {
        // Column index 4 holds the class name (comma-separated line).
        val clazz: String = value.split(",")(4)
        Tuple2.of(clazz, 1)
      }
    })
    // print() and execute() have side effects, so call them with
    // parentheses per Scala convention (0-arity side-effecting methods
    // keep their empty parameter list).
    kvDS.print()
    env.execute()
  }
}

3. FlatMapFunction

package com.wt.flink.tf
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object Demo3FlatMapFunction {
  def main(args: Array[String]): Unit = {
    // Set up the Flink streaming execution environment.
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Each stream element is one line from the word file.
    val linesDS: DataStream[String] = env.readTextFile("data/words.txt")

    // Java-style API: implement FlatMapFunction as an anonymous class.
    val wordsDS: DataStream[String] = linesDS.flatMap(new FlatMapFunction[String, String] {
      /**
       * flatMap is invoked once per record; one record in may emit
       * zero or more records out through the collector.
       *
       * @param value a single input line
       * @param out   collector used to emit records downstream
       */
      override def flatMap(value: String, out: Collector[String]): Unit = {
        // Split on comma and forward every word downstream.
        value.split(",").foreach(out.collect)
      }
    })
    wordsDS.print()

    env.execute()
  }
}

4. KeyBy

package com.wt.flink.tf
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.scala._

object Demo5KeyBy {
  def main(args: Array[String]): Unit = {
    // Set up the Flink streaming execution environment.
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Read a socket text stream from host "master", port 8888.
    val linesDS: DataStream[String] = env.socketTextStream("master", 8888)

    // Split each line into words, then attach a count of 1 to each word.
    val wordsDS: DataStream[String] = linesDS.flatMap(_.split(","))
    val kvDS: DataStream[(String, Int)] = wordsDS.map(word => (word, 1))

    // Java-style API: provide the key extractor via KeySelector.
    val keyByDS: KeyedStream[(String, Int), String] = kvDS.keyBy(new KeySelector[(String, Int), String] {
      /** Key each (word, count) pair by the word. */
      override def getKey(value: (String, Int)): String = value._1
    })

    // Aggregate after keying: sum the counts (tuple field index 1).
    val sumDS: DataStream[(String, Int)] = keyByDS.sum(1)

    sumDS.print()

    env.execute()
  }
}

5. Reduce

package com.wt.flink.tf
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala._

object Demo6Reduce {
  def main(args: Array[String]): Unit = {
    // Set up the Flink streaming execution environment.
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Read a socket text stream from host "master", port 8888.
    val linesDS: DataStream[String] = env.socketTextStream("master", 8888)

    // Split each line into words and pair every word with a count of 1.
    val wordsDS: DataStream[String] = linesDS.flatMap(_.split(","))
    val kvDS: DataStream[(String, Int)] = wordsDS.map(word => (word, 1))

    // Key the stream by the word itself.
    val keyByDS: KeyedStream[(String, Int), String] = kvDS.keyBy(_._1)

    // Scala-API equivalent for reference:
    // val reduceDS: DataStream[(String, Int)] = keyByDS.reduce((kv1, kv2) => (kv1._1, kv1._2 + kv2._2))

    // Java-style API: aggregate per key by adding the running counts.
    val reduceDS: DataStream[(String, Int)] = keyByDS.reduce(new ReduceFunction[(String, Int)] {
      /** Combine two records of the same key by summing their counts. */
      override def reduce(kv1: (String, Int), kv2: (String, Int)): (String, Int) =
        (kv1._1, kv1._2 + kv2._2)
    })

    reduceDS.print()
    env.execute()
  }
}

6. Window

package com.wt.flink.tf
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
object Demo7Window {
  def main(args: Array[String]): Unit = {
    // Set up the Flink streaming execution environment.
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Read a socket text stream from host "master", port 8888.
    val linesDS: DataStream[String] = env.socketTextStream("master", 8888)

    // Split each line into words and pair every word with a count of 1.
    val wordsDS: DataStream[String] = linesDS.flatMap(_.split(","))
    val kvDS: DataStream[(String, Int)] = wordsDS.map(word => (word, 1))

    /**
     * Count words over the last 10 seconds, emitting a result every
     * 5 seconds (sliding processing-time window).
     */
    val keyByDS: KeyedStream[(String, Int), String] = kvDS.keyBy(_._1)

    // Sliding window: size 10 s, slide 5 s.
    val windowDS: WindowedStream[(String, Int), String, TimeWindow] = keyByDS
      .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))

    // Aggregate within each window: sum the counts (tuple field index 1).
    val countDS: DataStream[(String, Int)] = windowDS.sum(1)

    countDS.print()

    env.execute()
  }
}

7. Union

package com.wt.flink.tf
import org.apache.flink.streaming.api.scala._
object Demo8Union {
  def main(args: Array[String]): Unit = {
    //创建flink环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val ds1: DataStream[Int] = env.fromCollection(List(1, 2, 3, 4, 5))
    val ds2: DataStream[Int] = env.fromCollection(List(1, 2, 3, 4, 5))
    
    //两个ds的类型要一致
    val unionDS: DataStream[Int] = ds1.union(ds2)

    unionDS.print()

    env.execute()
  }
}

原创文章,作者:jamestackk,如若转载,请注明出处:https://blog.ytso.com/276518.html

(0)
上一篇 2022年7月23日
下一篇 2022年7月23日

相关推荐

发表回复

登录后才能评论