这篇文章主要介绍flume+kafka+storm运行的示例分析,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!
概述
在基于Hadoop平台的很多应用场景中,我们需要对数据进行离线和实时分析,离线分析可以很容易地借助于Hive或者mr来实现统计分析,但是对于实时的需求Hive和mr就不合适了。实时应用场景可以使用Storm,它是一个实时处理系统,它为实时处理类应用提供了一个计算模型,可以很容易地进行编程处理。为了统一离线和实时计算,一般情况下,我们都希望将离线和实时计算的数据源的集合统一起来作为输入,然后将数据的流向分别经由实时系统和离线分析系统,分别进行分析处理,这时我们可以考虑将数据源(如使用Flume收集日志)直接连接一个消息中间件,如Kafka,可以整合Flume+Kafka,Flume作为消息的Producer,生产的消息数据(日志数据、业务请求数据等等)发布到Kafka中,然后通过订阅的方式,使用Storm的Topology作为消息的Consumer,在Storm集群中分别进行如下两个需求场景的处理:
直接使用Storm的Topology对数据进行实时分析处理
整合Storm+HDFS,将消息处理后写入HDFS进行离线分析处理
flume+kafka+storm相结合,此时,flume作为数据来源收集数据,kafka作为消息队列,起缓冲作用,storm从kafka拉取数据分析处理。做软件开发的都知道模块化思想,这样设计的原因有两方面:
一方面是可以模块化,功能划分更加清晰,从“数据采集–数据接入–流式计算–数据输出/存储”
1).数据采集
负责从各节点上实时采集数据,选用cloudera的flume来实现
2).数据接入
由于采集数据的速度和数据处理的速度不一定同步,因此添加一个消息中间件来作为缓冲,选用apache的kafka
3).流式计算
对采集到的数据进行实时分析,选用apache的storm
4).数据输出
对分析后的结果持久化,暂定用mysql
另一方面是模块化之后,假如当Storm挂掉了之后,数据采集和数据接入还是继续在跑着,数据不会丢失,storm起来之后可以继续进行流式计算;
数据来源flume
Kafka生产的数据,是由Flume的Sink提供的,这里我们需要用到Flume集群,通过Flume集群将Agent的日志收集分发到 Kafka。我们根据情况选择合适的source,这里我用的是exec,channel是memory,sink当然就是kafkasink。详细配置如下:
flume到kafka
flume到kafka的传输过程如下图:
kafka的配置跟之前搭建的没有什么改动。
测试flume到kafka
flume和kafka配置好以后,先启动flume集群,这里是后台运行:
flume-ng agent -n agent -c /usr/local/apache-flume-1.6.0-bin/conf -f /usr/local/apache-flume-1.6.0-bin/conf/flume-test.conf -Dflume.root.logger=DEBUG,console &
然后启动zookeeper:
./zkServer.sh start
接着启动kafka集群,这里是后台运行:
./kafka-server-start.sh ../config/server.properties &
然后向监控的文件里输入数据:
echo 'hello world' >> topic-test.txt
接着在kafka集群上创建消费者,测试flume到kafka是否联通,当然也可以使用kafka监控工具查看:
我们可以事先创建好topic,当然我们也可以自动创建topic,设置kafka auto.create.topics.enable属性为true,默认就为true。
./kafka-console-consumer.sh –zookeeper master:2181 –from-beginning –topic topic1
这边输出'hello world'则表明flume到kafka连接成功。
storm读取kafka数据分析编程
首先搭建好storm集群,启动nimbus、supervisor、ui
然后topology编程,我这里是java编程的一个小例子:
主类
package com.kafka_storm; import java.util.HashMap; import java.util.Map; import storm.kafka.BrokerHosts; import storm.kafka.KafkaSpout; import storm.kafka.SpoutConfig; import storm.kafka.ZkHosts; import storm.kafka.bolt.KafkaBolt; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.spout.SchemeAsMultiScheme; import backtype.storm.topology.TopologyBuilder; import backtype.storm.utils.Utils; public class StormKafkaTopo { public static void main(String[] args) throws Exception { // 配置Zookeeper地址 BrokerHosts brokerHosts = new ZkHosts("master:2181"); // 配置Kafka订阅的Topic,以及zookeeper中数据节点目录和名字 //这里需要注意的是,spout会根据config的后面两个参数在zookeeper上为每个kafka分区创建保存读取偏移的节点,如:/zkroot/topo/partition_0。 SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "topic1", "/zkkafkaspout" , "kafkaspout"); // 配置KafkaBolt中的kafka.broker.properties(可以参考kafka java编程) Config conf = new Config(); Map<String, String> map = new HashMap<String, String>(); // 配置Kafka broker地址 map.put("metadata.broker.list", "master:9092"); // serializer.class为消息的序列化类 map.put("serializer.class", "kafka.serializer.StringEncoder"); conf.put("kafka.broker.properties", map); // 配置KafkaBolt生成的topic conf.put("topic", "topic2"); //默认情况下,spout下会发射域名为bytes的binary数据,如果有需要,可以通过设置schema进行修改。 spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme()); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new KafkaSpout(spoutConfig)); builder.setBolt("bolt", new SenqueceBolt()).shuffleGrouping("spout"); builder.setBolt("kafkabolt", new KafkaBolt<String, Integer>()).shuffleGrouping("bolt"); if (args != null && args.length > 0) { conf.setNumWorkers(3); StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); } else { LocalCluster cluster = new LocalCluster(); cluster.submitTopology("Topo", conf, builder.createTopology()); Utils.sleep(100000); cluster.killTopology("Topo"); cluster.shutdown(); } } }
消息处理
package com.kafka_storm; import java.io.UnsupportedEncodingException; import java.util.List; import backtype.storm.spout.Scheme; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; /** * 使用KafkaSpout时需要子集实现Scheme接口,它主要负责从消息流中解析出需要的数据 * @author lenovo * */ public class MessageScheme implements Scheme { /* (non-Javadoc) * @see backtype.storm.spout.Scheme#deserialize(byte[]) */ public List<Object> deserialize(byte[] ser) { try { String msg = new String(ser, "UTF-8"); return new Values(msg); } catch (UnsupportedEncodingException e) { } return null; } /* (non-Javadoc) * @see backtype.storm.spout.Scheme#getOutputFields() */ public Fields getOutputFields() { // TODO Auto-generated method stub return new Fields("msg"); } }
bolt
package com.kafka_storm; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; public class SenqueceBolt extends BaseBasicBolt{ /* (non-Javadoc) * @see backtype.storm.topology.IBasicBolt#execute(backtype.storm.tuple.Tuple, backtype.storm.topology.BasicOutputCollector) */ public void execute(Tuple input, BasicOutputCollector collector) { // TODO Auto-generated method stub String word = (String) input.getValue(0); String out = "I'm " + word + "!"; System.out.println("out=" + out); collector.emit(new Values(out)); } /* (non-Javadoc) * @see backtype.storm.topology.IComponent#declareOutputFields(backtype.storm.topology.OutputFieldsDeclarer) */ public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("message")); } }
在集群上运行
我们要将引入的第三方包全部放到storm的lib包下面,包括kafka、zookeeper的,否则会报缺失jar包的错
storm jar StormKafkaDemo.jar com.kafka_storm.StormKafkaTopo StormKafkaDemo
开始总体测试:
向flume监控的文件输入数据,在storm的log日志里查看输出,当然我们也可以在kafka里查看,因为我将结果输出到kafka里了,topic为topic2。
日志里结果如下:
以上是“flume+kafka+storm运行的示例分析”这篇文章的所有内容,感谢各位的阅读!希望分享的内容对大家有帮助,更多相关知识,欢迎关注亿速云行业资讯频道!
原创文章,作者:carmelaweatherly,如若转载,请注明出处:https://blog.ytso.com/230393.html