Spark 是一个基于内存式的分布式计算框架。具有高性能,高效可扩展,容错等优点。
今天讲解一下spark的流计算,其实它也不完全是实时的流计算,算是一种准实时的流计算。
上图讲解
运行环境:需要linux环境下的spark环境
本例用的centOS 6.5×64 因为需要使用TCP协议传输数据,所以需要安装一个nc插件。
安装方式: yum install ncxxx 或者挂载光盘安装
安装后启动nc -lk 9999 端口可以随便指定,最好是1024以上的就可以。
下面贴出代码
java版本的
import java.util.Arrays; import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import com.google.common.base.Optional; import scala.Tuple2; public class SparkDemo { public static void main(String[] args) { SparkConf conf=new SparkConf().setAppName("sparkDemo2").setMaster("local[3]"); JavaStreamingContext jsc=new JavaStreamingContext(conf,Durations.seconds(5)); //使用带状态的算子,需要checkpoint做容错处理 jsc.checkpoint("D://chkspark"); JavaReceiverInputDStream<String> socketTextStream=jsc.socketTextStream("10.115.27.234", 1000); JavaDStream<String> wordsDstream=socketTextStream.flatMap(new FlatMapFunction<String, String>() { private static final long serialVersionUID=1L; public Iterable<String> call(String line) throws Exception { return Arrays.asList(line.split(" ")); } }); JavaPairDStream<String, Integer> wordsToPairDstream=wordsDstream.mapToPair(new PairFunction<String, String,Integer>() { private static final long SerialVersionUID=1L; public Tuple2<String, Integer> call(String word) throws Exception { return new Tuple2<String, Integer>(word, 1); } }); /** * 一个batch对应一个RDD。 * */ JavaPairDStream<String, Integer> resultDstream=wordsToPairDstream.updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() { private static final long serialVersionUID=1L; public Optional<Integer> call(List<Integer> values, Optional<Integer> state) throws Exception { Integer oldValue=0; //默认旧value是0 if (state.isPresent()) { oldValue=state.get(); } for (Integer value:values) { oldValue+=value; } return Optional.of(oldValue); } }); //打印结果 resultDstream.print(); jsc.start(); jsc.awaitTermination(); } }
程序测试: 从linux端的nc 下输入任意字符串,spark streaming会实时对输入的数据做出统计。类似于wordcount. 除非手动kill这个进程,否则会一直运行下去。因为它的原理就是和自来水的水流一样,是一连串的数据流。
运行结果展示:
也可以用scala写出同样的程序,代码量更少。
需要深入理解spark streaming的架构原理。
原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/195669.html