五、spark–spark streaming原理和使用

一、spark-streaming概述

1.1 常用的实时计算引擎

实时计算引擎也叫流式计算引擎,常用的目前有3个:
1、Apache Storm:真正的流式计算
2、Spark Streaming:严格上来说,不是真正的流式计算(实时计算)
​ 把连续的流式数据,当成不连续的RDD来处理
​ 本质:是一个离散计算(不连续的数据)
​ 面试中问到时:先说它的本质,
​ 然后说自己的理解
​ 常用的方法
​ 和其他同类型技术的对比
3、Apache Flink:真正的流式计算。和Spark Streaming相反。
​ 本质:一个流式计算,虽然可以用于离线计算,但是本质上是将离散数据模拟成流式数据来给flink做流式计算

1.2 spark-streaming是什么

​ Spark Streaming是核心Spark API的扩展,可实现可扩展、高吞吐量、可容错的实时数据流处理。数据可以从诸如Kafka,Flume,Kinesis或TCP套接字等众多来源获取,并且可以使用由高级函数(如map,reduce,join和window)开发的复杂算法进行流数据处理。最后,处理后的数据可以被推送到文件系统,数据库和实时仪表板。而且,您还可以在数据流上应用Spark提供的机器学习和图处理算法。

特点:
1、易用:集成在Spark中
2、容错性:底层RDD,RDD本身就具备容错机制。
3、支持多种编程语言:Java Scala Python

1.3 spark-streaming架构

spark-streaming用来接收实时数据,然后处理程序通过类似于定时采样的方式分批取得数据,每一批数据就是一个RDD,最终输入给处理程序的是一个RDD队列流,这个流其实就是discretizedstream或DStream。在内部,DStream 由一个RDD序列表示。DStream对象就是可以用来调用各种算子进行处理
五、spark--spark streaming原理和使用

​ 图1.1 DStream原理

1.4 案例演示–NetworkWordCount

首先启动netcat服务器,并监听在端口1234上

nc -l 1234

没有这个命令就 yum -y install netcat 安装一下

接着启动spark-streaming样例程序,从本地的1234端口获取数据,并进行wordcount操作

到spark的安装目录下,执行bin目录下的命令:
bin/run-example streaming.NetworkWordCount localhost 1234

然后在netcat端输入各种字符串:

[root@bigdata121 hive-1.2.1-bin]# nc -l 1234
king king hello

在另外一个窗口查看统计信息:

-------------------------------------------
Time: 1567005584000 ms
-------------------------------------------
(hello,1)
(king,2)

这边就立马统计出来了

1.5 自行编写NetworkWordCount

首先maven中pom.xml记得再加上streaming的依赖(为了方便最好spark所有组件的依赖都加上)
pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>king</groupId>
    <artifactId>sparkTest</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spark.version>2.1.0</spark.version>
        <scala.version>2.11.8</scala.version>
        <hadoop.version>2.7.3</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.1.0</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.1.0</version>
            <!--<scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.11</artifactId>
            <version>2.1.0</version>
            <scope>runtime</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.11</artifactId>
            <version>1.6.3</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.12</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hive/hive-jdbc -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>1.2.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/log4j/log4j -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core -->
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.8.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-sdk -->
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-sdk</artifactId>
            <version>1.8.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-configuration -->
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-configuration</artifactId>
            <version>1.8.0</version>
        </dependency>

    </dependencies>

    <!--下面这是maven打包scala的插件,一定要,否则直接忽略scala代码-->
    <build>
        <plugins>

            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.19</version>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>

        </plugins>
    </build>

</project>

代码:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * wordcount流式计算程序
  *
  * 1、创建streamingContext对象
  *    创建DStream流(离散流)
  *    本质是离散计算
  *
  *    离散:将连续数据变成离散数据,并实时立刻处理
  *    离线:并非是实时处理的
  *
  * 2、DStream表现形式就是RDD
  *    和操作RDD一样
  *
  * 3、使用DStream将连续的数据库切割成离散的RDD
  */
object NetworkWordCount {
  def main(args: Array[String]): Unit = {
    //设置日志级别为ERROR,默认是INFO
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    //Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)

    /**
    这是 StreamingContext 对象的标准创建方式
    无法通过 sparkSession对象来创建
    */
    //创建streamingContext对象,指定master为local[2],意思是使用至少两个核心,即两个线程,一个用于发送数据,一个处理数据
    val conf = new SparkConf().setAppName("streaming wordcount").setMaster("local[2]")
    //这里指定conf对象,还有批处理的时间间隔为3秒,每3秒切一个rdd,然后处理.
    val streamingContext = new StreamingContext(conf, Seconds(3))

    //创建接收数据源,这里创建socketstream,接收数据,内部会自动切割成一个个rdd。
    //指定监听的主机端口
    val streamText = streamingContext.socketTextStream("bigdata121", 1234, StorageLevel.MEMORY_ONLY)

    //wordcount流程
    val rdd1 = streamText.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)

    //打印结构
    rdd1.print()

    //启动streamingContext,开始计算
    streamingContext.start()

    //等待任务结束
    streamingContext.awaitTermination()
  }

}

在bigdata121虚拟机上启动netcat服务:

nc -l 1234

idea中运行上面的程序,并在netcat中输入字符,结构和实例的一样

二、streaming基本原理和使用

2.1 StreamingContext对象的概念

1、StreamingContext会内在的创建一个SparkContext的实例(所有Spark功能的起始点),你可以通过ssc.sparkContext访问到这个实例。

2、一旦一个StreamingContext开始运作,就不能设置或添加新的流计算。

3、一旦一个上下文被停止,它将无法重新启动。

4、同一时刻,一个JVM中只能有一个StreamingContext处于活动状态。

5、StreamingContext上的stop()方法也会停止SparkContext。 要仅停止StreamingContext(保持SparkContext活跃),请将stop() 方法的可选参数stopSparkContext设置为false。

6、只要前一个StreamingContext在下一个StreamingContext被创建之前停止(不停止SparkContext),SparkContext就可以被重用来创建多个StreamingContext。

2.2 离散流(DStreams):Discretized Streams

​ DStream对象可以说整个spark-streaming程序的一个数据的出口,处理的数据都从这里来。前面也说了,这个对象里面其实一个个的RDD,这是DStream的本质。而且经过算子的转换之后,DStream仍旧是DStream对象,里面也还是RDD。所以算子转换的过程和普通RDD的概率类似。总的来说streaming程序中,就是DStream之间的转换,本质上就是DStream中的RDD的转换

2.3 DStream的算子

算子列表:
五、spark--spark streaming原理和使用

​ 图2.1 DStream算子

和普通rdd很类似,有两个比较特殊的算子,transform和updateStateByKey

2.3.1 transform

transform(RDD[T]=>RDD[U])
是一个用于将dstream中的rdd转换成新的rdd的算子。所以要注意,这个算子中的处理函数是接收rdd作为参数,不像其他算子是接收rdd中的数据作为参数的。

例子:
    val conf = new SparkConf().setAppName("streaming wordcount").setMaster("local[2]")
    //这里指定conf对象,还有批处理的时间间隔为4秒,每4秒切一个rdd,然后处理.
    val streamingContext = new StreamingContext(conf, Seconds(3))

    //创建socketstream,接收数据,内部会自动切割成一个个rdd
    val streamText = streamingContext.socketTextStream("bigdata121", 1234, StorageLevel.MEMORY_ONLY)

    //接收的函数参数中就是rdd,然后在里面对rdd进行处理,最后返回新的rdd
    streamText.transform(rdd=>{
      rdd.flatMap(_.split(" "))
    })

2.3.2 updateStateByKey

​ 默认情况下,Spark Streaming 不记录之前的状态,每次发一条数据,都从0开始。比如说进行单词统计时,之前统计的单词数量并不会累加到下一次的统计中,下一次是从0开始计数的。如果想进行累加操作,使用这个算子来实现这个功能

updateStateByKey((Seq[T],Option[S])=>Option[S])
这个算子接收的函数的参数要求有两个:
Seq[T]:当前对key进行分组后,同一个key的value的一个集合,比如("age",[1,2,1,1])中的[1,2,1,1]
Option[S]:同一个key,在此之前的value总和,也就是这个key之前的计数状态
返回值是之前的计数+现在的计数的一个返回值

例子:
下面将之前的wordcount改变一些,实现单词的持续计数,不会每次都重新从0开始计数

package SparkStreamExer

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * 测试updateStateByKey 进行状态的累加
  */
object TestUpdateState {
  def main(args: Array[String]): Unit = {
    //设置日志级别为ERROR,默认是INFO
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)

    //创建streamingContext对象,指定master为local[2],意思是使用至少两个核心,即两个线程,一个用于发送数据,一个处理数据
    val conf = new SparkConf().setAppName("streaming wordcount").setMaster("local[2]")
    //这里指定conf对象,还有批处理的时间间隔为4秒,每4秒切一个rdd,然后处理.
    val streamingContext = new StreamingContext(conf, Seconds(3))

    //设置检查点,保存之前状态,需要保证目录不存在
    streamingContext.checkpoint("hdfs://bigdata121:9000/sparkCheckpoint/spark-streaming")

    //创建socketstream,接收数据,内部会自动切割成一个个rdd
    val streamText = streamingContext.socketTextStream("bigdata121", 1234, StorageLevel.MEMORY_ONLY)

    //切割数据,并添加计数对
    val wordPair = streamText.flatMap(_.split(" ")).map((_,1))

    //累加处理函数
    val addFunc = (currentValues:Seq[Int], previousValue:Option[Int]) => {
      //当前值累加
      val currentSum = currentValues.sum

      //取出之前的值.如果值不存在就返回0
      val pre = previousValue.getOrElse(0)

      //之前和现在的值相加
      Option(pre + currentSum)
    }

    //更新,将旧计数更新为新计数状态
    wordPair.updateStateByKey(addFunc).print()

    //启动streamingContext,开始计算
    streamingContext.start()

    //等待任务结束
    streamingContext.awaitTermination()
  }
}

运行这个demo的过程出现的报错:

Caused by: java.lang.ClassNotFoundException: org.apache.commons.io.Charsets

说是没有org.apache.commons.io.Charsets 这个类,进去org.apache.commons.io看了下,果然没有,估计是包版本太旧了,没有这个类,百度了一下,2.5版本的有这个类,所以就在pom.xml添加上新的依赖

<dependency>
    <groupId>commons-io</groupId>
    <artifactId>commons-io</artifactId>
    <version>2.5</version>
</dependency>

接着运行,OK了

2.3.3 foreachRDD

这个算子类似forech,但是操作的对象是整个rdd,不是rdd中的某些数据。

foreachRDD(RDD[T]=>Unit)
一般用于将rdd的结果写入其他存储中,比如hdfs,mysql等

下面有一个关于 foreachRDD和sql 的例子。

2.4 窗口操作

应用场景:
一般用于统计最近N小时的数据,这样的应用的场景,这时候就需要窗口

2.4.1 原理

原理图:
五、spark--spark streaming原理和使用

​ 图2.2 spark-streaming窗口操作

​ 窗口其实就是DStream的基础上,再加上一个时间范围。如图所示,每当窗口滑过originalDStream时,落在窗口内的源RDD被组合并被执行操作以产生windowed DStream的RDD。在上面的例子中,操作应用于最近3个时间单位的数据,并以2个时间单位滑动。所以窗口操作比起普通的DStream操作,普通的DStream是一个个RDD处理,而窗口则是一个时间范围内的RDD一起处理。而且窗口是DStream再上一层的一个封装。
​ 使用窗口的时候,有两个关键参数:
窗口长度(windowlength):窗口的时间长度(上图的示例中为:3)
滑动间隔(slidinginterval): 两次相邻的窗口操作的间隔(即每次滑动的时间长度)(上图示例中为:2)
而且要注意的一点是:这两个参数必须是源DStream的采样间隔的倍数(上图示例中为:1)。因为如果不是整数倍,就会导致窗口边缘会将一个rdd分隔成两份,这样是不行的,spark没办法处理半个rdd,rdd是不可分的。

2.4.2 窗口操作的相关算子

window(windowLength, slideInterval)
->基于源DStream产生的窗口化的批数据计算一个新的DStream

countByWindow(windowLength, slideInterval)
->返回流中元素的一个滑动窗口数

reduceByWindow(func, windowLength, slideInterval)
->返回一个单元素流。利用函数func聚集滑动时间间隔的流的元素创建这个单元素流。函数必须是相关联的以使计算能够正确的并行计算。

reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])
->应用到一个(K,V)对组成的DStream上,返回一个由(K,V)对组成的新的DStream。每一个key的值均由给定的reduce函数聚集起来。注意:在默认情况下,这个算子利用了Spark默认的并发任务数去分组。你可以用numTasks参数设置不同的任务数

reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])
->上述reduceByKeyAndWindow() 的更高效的版本,其中使用前一窗口的reduce计算结果递增地计算每个窗口的reduce值。这是通过对进入滑动窗口的新数据进行reduce操作,以及“逆减(inverse reducing)”离开窗口的旧数据来完成的。一个例子是当窗口滑动时对键对应的值进行“一加一减”操作。但是,它仅适用于“可逆减函数(invertible reduce functions)”,即具有相应“反减”功能的减函数(作为参数invFunc)。 像reduceByKeyAndWindow一样,通过可选参数可以配置reduce任务的数量。 请注意,使用此操作必须启用检查点。

countByValueAndWindow(windowLength, slideInterval, [numTasks])
->应用到一个(K,V)对组成的DStream上,返回一个由(K,V)对组成的新的DStream。每个key的值都是它们在滑动窗口中出现的频率。

比较常用的是reduceByKeyAndWindow这个,常用于统计固定最近一段时间内的数据,比如统计最近1小时订单销售量。下面把这个算子应用到wordcount例子中。

2.4.3 例子

窗口大小为30s,每10s滑动一次窗口,并且对单词的计数是累加的

package SparkStreamExer

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * 测试updateStateByKey 进行状态的累加
  */
object TestUpdateState {
  def main(args: Array[String]): Unit = {
    //设置日志级别为ERROR,默认是INFO
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)

    //创建streamingContext对象,指定master为local[2],意思是使用至少两个核心,即两个线程,一个用于发送数据,一个处理数据
    val conf = new SparkConf().setAppName("streaming wordcount").setMaster("local[2]")
    //这里指定conf对象,还有批处理的时间间隔为4秒,每4秒切一个rdd,然后处理.
    val streamingContext = new StreamingContext(conf, Seconds(1))

    //设置检查点,保存之前状态,需要保证目录不存在
    streamingContext.checkpoint("hdfs://bigdata121:9000/sparkCheckpoint/spark-streaming3")

    //创建socketstream,接收数据,内部会自动切割成一个个rdd
    val streamText = streamingContext.socketTextStream("bigdata121", 1234, StorageLevel.MEMORY_ONLY)

    //切割数据,并添加计数对
    val wordPair = streamText.flatMap(_.split(" ")).map((_,1))

    //在这里添加一个窗口操作
    val windowValue = wordPair.reduceByKeyAndWindow((x:Int,y:Int)=>x+y, Seconds(30), Seconds(10))

    //累加处理函数
    val addFunc = (currentValues:Seq[Int], previousValue:Option[Int]) => {
      //当前值累加
      val currentSum = currentValues.sum

      //取出之前的值.如果值不存在就返回0
      val pre = previousValue.getOrElse(0)

      //之前和现在的值相加
      Option(pre + currentSum)
    }

    //更新,将旧计数更新为新计数状态
    //wordPair.updateStateByKey(addFunc).print()
    windowValue.updateStateByKey(addFunc).print()

    //启动streamingContext,开始计算
    streamingContext.start()

    //等待任务结束
    streamingContext.awaitTermination()
  }
}

2.5 sql操作

package SparkStreamExer

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * 将streaming的DStream转为可以使用sql操作
  */
object StreamingAndSql {
  def main(args: Array[String]): Unit = {
    //设置日志级别为ERROR,默认是INFO
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)

    val conf = new SparkConf().setAppName("streaming and sql").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(2))

    val lines = ssc.socketTextStream("bigdata121",1234, StorageLevel.MEMORY_ONLY)

    val words = lines.flatMap(_.split(" "))

    //需要将rdd转为df对象,才能用于spark sql操作
    words.foreachRDD(rdd => {
      //从rdd中获取conf配置,保证配置和rdd的配置一样
      val spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
      import spark.sqlContext.implicits._
      //rdd转为df,并指定列名
      val df = rdd.toDF("word")
      //创建视图并执行sql
      df.createOrReplaceTempView("tmp1")
      val resultDF = spark.sql("select word,count(1) from tmp1 group by word")
      resultDF.show()
    })

    ssc.start()
    ssc.awaitTermination()

  }

}

2.6 checkpoint检查点

这个和rdd中类似,只不过streaming中是通过 StreamingContext对象进行checkpoint:

//创建streamingContext对象,指定master为local[2],意思是使用至少两个核心,即两个线程,一个用于发送数据,一个处理数据
    val conf = new SparkConf().setAppName("streaming wordcount").setMaster("local[2]")
    //这里指定conf对象,还有批处理的时间间隔为4秒,每4秒切一个rdd,然后处理.
    val streamingContext = new StreamingContext(conf, Seconds(1))

    //设置检查点,保存之前状态,需要保证目录不存在
    streamingContext.checkpoint("hdfs://bigdata121:9000/sparkCheckpoint/spark-streaming3")

三、streaming的数据源

3.1 基本数据源

文件流:textFileStream
套接字流:socketTextStream/sockeStream,前面已经讲过例子,这里不重复
RDD队列流:queueStream

1、textFileStream
通过监控文件系统的变化,若有新文件添加,则将它读入并作为数据流
需要注意的是:
这些文件具有相同的格式
这些文件通过原子移动或重命名文件的方式在dataDirectory创建
如果在文件中追加内容,这些追加的新数据也不会被读取。

例子:
package SparkStreamExer

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingFromFile {
  def main(args: Array[String]): Unit = {
    //设置日志级别为ERROR,默认是INFO
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)

    val conf = new SparkConf().setAppName("spark window operation").setMaster("local[2]")

    val ssc = new StreamingContext(conf, Seconds(4))

    val fileStream = ssc.textFileStream("G://test//teststreaming")

    fileStream.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

//==========================================================
2、queueStream
RDD队列流是从一个队列中读取RDD
例子:
package SparkStreamExer

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

object StreamingFromRDDQueue {
  def main(args: Array[String]): Unit = {
    //设置日志级别为ERROR,默认是INFO
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)

    val conf = new SparkConf().setAppName("spark streaming rdd queue").setMaster("local[2]")

    val ssc = new StreamingContext(conf, Seconds(4))

    //创建队列
    val rddQueue = new mutable.Queue[RDD[Int]]()

    //队列中添加rdd
    for (x<- 1 to 3) {
      rddQueue += ssc.sparkContext.makeRDD(1 to 10)
    }

    //从队列读取rdd
    val queueRdd = ssc.queueStream(rddQueue).map(_*2)
    queueRdd.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

3.2 高级数据源

高级数据源一般在生产中比较常用,很少使用spark直接监控数据的。常用的高级数据源有Kafka,Flume,Kinesis,Twitter等等。下面主要讲解flume

3.2.1flume

1、flume推送数据到计算节点
(1)首先配置flume的agent配置文件

a1.sources=r1
a1.channels=c1
a1.sinks=k1

# 监控目录
a1.sources.r1.type=TAILDIR
a1.sources.r1.filegroups=f1
a1.sources.r1.filegroups.f1=/opt/modules/apache-flume-1.8.0-bin/logs/.*
a1.sources.r1.fileHeader=true

a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity=100

# 我这里是在ide中直接运行spark程序,所以flume数据直接推导windows主机上
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=192.168.50.1
a1.sinks.k1.port=1234

a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

(2)spark代码
pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>SparkDemo</groupId>
    <artifactId>SparkDemoTest</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spark.version>2.1.0</spark.version>
        <scala.version>2.11.8</scala.version>
        <hadoop.version>2.7.3</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.1.0</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.1.0</version>
            <!--<scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.11</artifactId>
            <version>2.1.0</version>
            <scope>runtime</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.11</artifactId>
            <version>1.6.3</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.12</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hive/hive-jdbc -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>1.2.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/log4j/log4j -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core -->
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.8.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-sdk -->
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-sdk</artifactId>
            <version>1.8.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-configuration -->
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-configuration</artifactId>
            <version>1.8.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-flume -->

        <!--这里是spark从flume读取数据的依赖,不要忘了-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-flume_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-flume-sink -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-flume-sink_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>

        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.5</version>
        </dependency>

    </dependencies>

    <!--下面这是maven打包scala的插件,一定要,否则直接忽略scala代码-->
    <build>
        <plugins>

            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.19</version>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>

        </plugins>
    </build>
</project>

依赖这里,方便起见,直接添加flume和spark的全部依赖,自己到maven的官方库上搜索,然后添加就可以。接着最重要的是 spark使用flume的依赖的spark-streaming-flume 这个包,不要漏了。如果在集群中运行,记得将这个包放到spark的jars目录下

代码:

package SparkStreamExer

import org.apache.flume.source.avro.AvroFlumeEvent
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingFromFlume {
  def main(args: Array[String]): Unit = {
    //设置日志级别为ERROR,默认是INFO
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)

    val conf = new SparkConf().setAppName("spark streaming from flume").setMaster("local[2]")
    conf.registerKryoClasses(Array())

    val ssc = new StreamingContext(conf, Seconds(4))

    //创建flumeevent,接收从flume push来的数据
    val flumeDStream = FlumeUtils.createStream(ssc, "192.168.50.1", 1234, StorageLevel.MEMORY_ONLY)

    val eventDStream = flumeDStream.map(event => {
      (event.event.getHeaders.toString,new String(event.event.getBody.array()))
    })

    eventDStream.print()

    ssc.start()
    ssc.awaitTermination()
  }

}

(3)启动:

先启动spark程序,直接在ide中运行。
接着启动flume:flume-ng agent --conf conf --name a1 --conf-file conf/flume-spark.properties  -Dflume.root.logger=INFO,console

然后自己在监控目录下修改文件,或者添加文件。
接着查看ide中输出的数据

2、spark从flume拉取数据
这种方式比起第一种方式要更加灵活,可扩展性高。
(1)flume配置文件

a1.sources=r1
a1.channels=c1
a1.sinks=k1

a1.sources.r1.type=TAILDIR
a1.sources.r1.filegroups=f1
a1.sources.r1.filegroups.f1=/opt/modules/apache-flume-1.8.0-bin/logs/.*
a1.sources.r1.fileHeader=true

a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity=100

# 这里使用spark自己实现的一个sink
a1.sinks.k1.type=org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname=192.168.50.121
a1.sinks.k1.port=1234

a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

另外,需要将spark-streaming-flume-sink_2.11-2.1.0.jar 这个jar包添加到flume的lib目录下,这是上面使用的SparkSink所在的jar包。可以自己在idea中添加这个依赖,然后下载,接着到本地仓库目录复制到flume的lib下。
(2)代码
pom.xml

和上面类似,只是多了
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume-sink_2.11</artifactId>
<version>2.1.0</version>
</dependency>
这个依赖        

代码:

package SparkStreamExer

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FromFlumePull {
  def main(args: Array[String]): Unit = {
    //设置日志级别为ERROR,默认是INFO
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)

    val conf = new SparkConf().setAppName("flume through spark sink").setMaster("local[2]")

    val ssc = new StreamingContext(conf, Seconds(4))

    //创建 poll streaming,从flume拉取数据到本地处理
    val flumePollingStream = FlumeUtils.createPollingStream(ssc, "bigdata121", 1234, StorageLevel.MEMORY_ONLY)

    /**
      * 这里要注意:
      * event.event.getBody.array() 不要直接 toString,解析处理的字符串只是[class name]@[hashCode]的形式
      * 应该用 New string(event.event.getBody.array()) 这样会根据默认编解码规则给bytes字符串解码
      * 因为传输过来的是bytes数据
      */
    flumePollingStream.map(event=>{
      (event.event.getHeaders.toString, new String(event.event.getBody.array()))
    }).print()

    ssc.start()
    ssc.awaitTermination()
  }
}

(3)启动
启动方式和上面类似,这里不重复。

(4)遇到的问题
问题1:
已经将spark-streaming-flume-sink_2.11.jar包放到flume的lib目录下,flume的agent启动时报错:

29 Aug 2019 17:59:31,838 WARN  [Spark Sink Processor Thread - 10] (org.apache.spark.streaming.flume.sink.Logging$class.logWarning:80)  - Error while processing transaction.
java.lang.IllegalStateException: begin() called when transaction is OPEN!
        at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
        at org.apache.flume.channel.BasicTransactionSemantics.begin(BasicTransactionSemantics.java:131)
        at org.apache.spark.streaming.flume.sink.TransactionProcessor$$anonfun$populateEvents$1.apply(TransactionProcessor.scala:114)
        at org.apache.spark.streaming.flume.sink.TransactionProcessor$$anonfun$populateEvents$1.apply(TransactionProcessor.scala:113)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.streaming.flume.sink.TransactionProcessor.populateEvents(TransactionProcessor.scala:113)
        at org.apache.spark.streaming.flume.sink.TransactionProcessor.call(TransactionProcessor.scala:243)
        at org.apache.spark.streaming.flume.sink.TransactionProcessor.call(TransactionProcessor.scala:43)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

 重点在:java.lang.IllegalStateException: begin() called when transaction is OPEN!
有可能是flume的一些jar包的问题,具体还不清楚。

屡次报这个错,最后看了看flume的lib下的scala包

scala-library-2.10.5.jar

是这个版本,我放进去的sparksink包是基于 scala 2.11.8的,所以我在想是不是scala library包版本不对,所以从spark的jar目录下拷贝scala-library-2.11.8.jar 这个包过去flume下,将原来的重命名,不让flume使用旧的。
接着重新启动flume agent,正常运行。

所以这个问题是因为依赖包版本不对应的问题发生的。

问题2:
读取body的时候,直接toString和new String有区别,前者乱码,后者还原原本字符串

toString()与new String ()用法区别

str.toString是调用了b这个object对象的类的toString方法。一般是返回这么一个String:[class name]@[hashCode]。
new String(str)是根据parameter是一个字节数组,使用Java虚拟机默认的编码格式,将这个字节数组decode为对应的字符。若虚拟机默认的编码格式是ISO-8859-1,按照ascii编码表即可得到字节对应的字符。

什么时候用什么方法呢?
new String()一般使用字符转码的时候,byte[]数组的时候
toString()将对象打印的时候使用 

原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/196595.html

(0)
上一篇 2021年11月16日
下一篇 2021年11月16日

相关推荐

发表回复

登录后才能评论