strom简介
官方网址:http://storm.apache.org/
是一个免费,开源的分布式实时计算系统,使用它可以轻松实现数据流的实时处理,Strom很简单,可以用任何编程语言
storm用例:实时在线分析 机器学习,连续计算,分布式RPC,ETL等。
Strom的特点:快速:基准时钟在超过一百万元组(可以理解为数据包)每秒处理的每个节点
简单的设置:有可扩展性,容错性,保证了数据的处理能力,并且易于设置和操作
storm实时流式计算系统
storm集群与hadoop集群(MapReduce)对比
MapReduce是批处理流程 //hadoop处理海量历史任务,不能做到实时
storm没有缓冲区原数据源源不断的进入处理系统,这是流处理 //实时流计算,一直运行直到停止。
Topology(拓扑)与 Mapreduce
一个关键的区别是: 一个MapReduce job 最终会结束, 而一个topology 永远会运行(除非你手动kill 掉)
Nimbus(作业控制和资源管理 master进程) 与ResourManager
在Storm 的集群里面有两种节点: 控制节点(master node)和工作节点(worker node)。控制节点上面运行一个叫Nimbus 后台程序,它的作用类似Hadoop 里面的JobTracker //JobTracker是整个集群中唯一的全局管理者,涉及的功能包括作业控制和资源管理。
Nimbus 负责在集群里面分发代码,分配计算任务给机器, 并且监控状态。
Supervisor (worker 进程)与NodeManager(YarnChild)
每一个工作节点上面运行一个叫做Supervisor 的节点。Supervisor 会监听分配给它那台机器的工作,根据需要启动/关闭工作进程。每一个工作进程执行一个topology 的一个子集;一个运行的topology 由运行在很多机器上的很多工作进程组成。
storm实时流式计算的工作机制
2个角色
nimbus是集群的主节点:负责在集群里面分发代码,分配计算任务给机器, 并且监控状态。//作业控制和资源管理
supervisor是集群的从节点:每一个工作节点上面运行一个叫做Supervisor 的节点,每一个supervisor里面会有worker进程在服务器上运行着,这些worker是真正干活的。
nimbus和supervisor直接并没有直接的联系,而是需要第三方工具zookeeper实现的
第一个supervisor里面的worker会调用我们写的一个类比如叫(采集水这个类),处理好了之后,会再次在这个worker里面封装成一定的数据包的格式发出去,发给下一个worker,下一个worker会去处理上一个worker传给他的结果,去调用我们写的另一个逻辑(调用我们写的类 过滤逻辑)然后,就是在第二个worker里面去处理,然后再封装成一定的数据包的格式发出去,发给下一个worker。
下一个worker也是不知道自己怎么办,而是调用我们程序自己写的逻辑(比如调用沉淀这个类),处理完的数据再次的封装成一个数据包传给下一个worker。
最后一个处理步骤,会把处理的结果我们源源不断的放在一个内存数据库中,(处理结果的使用者)谁要用就可以直接的去使用数据。
小结:
整个处理流程的组织协调不用用户去关系,用户只需要去定义每一个步骤中的具体的业务处理逻辑
具体执行任务的角色是worker,worker执行任务时具体的行为则由我们定义的业务逻辑决定。
storm处理数据流程小结
1、客户端client把任务(topology)提交给nimbus
2、nimbus会把任务分配的一些信息放在zookeeper上面;
3、supervisor会通过zookeeper领取到任务
4、supervisor再分配给worker去运行我们的任务
官方解释如下://这里的基本概念不懂的见下文
也可以叫做Topology运行机制
(1)Storm 提交后,会把代码首先存放到Nimbus 节点的inbox 目录下,之后,会把当前Storm运行的配置生成一个stormconf.ser 文件放到Nimbus 节点的stormdist 目录中,在此目录中同时还有序列化之后的Topology 代码文件;
(2)在设定Topology 所关联的Spouts 和Bolts 时,可以同时设置当前Spout 和Bolt 的executor数目和task 数目,默认情况下,一个Topology 的task 的总和是和executor 的总和一致的。之后,系统根据worker 的数目,尽量平均的分配这些task 的执行。worker 在哪个supervisor节点上运行是由storm 本身决定的;
(3)任务分配好之后,Nimbes 节点会将任务的信息提交到zookeeper 集群,同时在zookeeper集群中会有workerbeats 节点,这里存储了当前Topology 的所有worker 进程的心跳信息;
(4)Supervisor 节点会不断的轮询zookeeper 集群,在zookeeper 的assignments 节点中保存了所有Topology 的任务分配信息、代码存储目录、任务之间的关联关系等,Supervisor 通过轮询此节点的内容,来领取自己的任务,启动worker 进程运行;
(5)一个Topology 运行之后,就会不断的通过Spouts 来发送Stream 流,通过Bolts 来不断的处理接收到的Stream 流,Stream 流是×××的。
最后一步会不间断的执行,除非手动结束Topology。
有几点需要说明的地方:
(1)每个组件(Spout 或者Bolt)的构造方法和declareOutputFields 方法都只被调用一次。
(2)open 方法、prepare 方法的调用是多次的。入口函数中设定的setSpout 或者setBolt 里的并行度参数指的是executor 的数目,是负责运行组件中的task 的线程的数目,此数目是多少,上述的两个方法就会被调用多少次,在每个executor 运行的时候调用一次。相当于一个线程的构造方法。
(3)nextTuple 方法、execute 方法是一直被运行的,nextTuple 方法不断的发射Tuple,Bolt的execute 不断的接收Tuple 进行处理。只有这样不断地运行,才会产生×××的Tuple 流,体现实时性。相当于线程的run 方法。
(4)在提交了一个topology 之后,Storm 就会创建spout/bolt 实例并进行序列化。之后,将序列化的component 发送给所有的任务所在的机器(即Supervisor 节点),在每一个任务上反序列化component。
(5)Spout 和Bolt 之间、Bolt 和Bolt 之间的通信,是通过zeroMQ 的消息队列实现的。
(6)上图没有列出ack 方法和fail 方法,在一个Tuple 被成功处理之后,需要调用ack 方法来标记成功,否则调用fail 方法标记失败,重新处理这个Tuple。
终止Topology
通过在Nimbus 节点利用如下命令来终止一个Topology 的运行:
bin/storm kill topologyName
kill 之后,可以通过UI 界面查看topology 状态,会首先变成KILLED 状态,在清理完本地目录和zookeeper 集群中的和当前Topology 相关的信息之后,此Topology 就会彻底消失。
小结zookeeper在storm中的作用
1、nimbus会把任务分配的一些信息放在zookeeper上面;
2、supervisor会通过zookeeper领取到任务
3、numbus需要通过zookeeper去感知supervisor的健康状态
Topology的概念类似于MapReduce中提交的一个任务 job
每台supervisor上会有多个worker进程
每个worker进程中运行着若干个executor线程
每个executor中运行着若干个相同的task
部署storm集群
strom里面处了Nimbus Supervisor 还需要依赖zookeeper,所以在安装Strom的时候确保zookeeper安装了
storm的配置与部署
下载storm,然后上传到linux中
我们解压后到conf目录下修改配置
cd conf/
vi storm.yaml
告诉zookeeper在那几台机器上部署了
storm.zookeeper.servers:
- "hadoop-server-00"
- "hadoop-server-01"
- "hadoop-server-02"
告诉Strom nimbus在那台主机上
nimbus.host: "hadoop-server-00"
保存退出
supervisor是不需要去指定的,他的数量是可以动态的去增减
然后把他分发到每台机器上去
scp -r apache-storm-0.9.2-incubating/ hadoop-server-01:/usr/local/apps/
scp -r apache-storm-0.9.2-incubating/ hadoop-server-02:/usr/local/apps/
启动storm要先启动zookeeper
进入zookeeper的bin目录下去启动zookeeper
./zkCli.sh start
./zkCli.sh status(查看他的状态)
启动Strom
bin目录上
./storm nimbus(那台机器上配置了nimbus就在那台机器上启动nimbus)
在另外的两台机器上去启动Supervisor
01机器上的bin目录上
./storm Supervisor
02机器上的bin目录上
./storm Supervisor
可以通过jps来看进程数
storm也是可以通过网页来看的,但是必须要启动打开网页的外部服务的进程命令,也必须在启动nimbus的这台机器上去启动这个进程
启动外部服务的进程命令是cd app/(strom安装包)/bin/storm ui //直接执行这个命令
jps查看进程 ./strom ui
ui的进程叫core
这样我们就可以通过网页来看Strom的状态
HTTP://hadoop-server-00:8080
小结: //这里是后台启动
在nimbus 主机上
//启动协调管理nimbus
./storm nimbus 1>/dev/null 2>&1 &
//启动web 管理界面启动后可以通过nimbus 主机名:8080 端口进行仿问
./storm ui 1>/dev/null 2>&1 &
在supervisor 主机上
./storm supervisor 1>/dev/null 2>&1 &
slots 代表:槽位,也就是work进程,supervisor内启动的进程,默认启动4个。当你的机器的内核非常好的时候,可以修改配置来增加槽位数
可以知道那个worker的数量 如果不指点默认为4个
(在配置之前需要把进程都停掉,按ctrl+c就可以停掉进程了)
在配置项vi storm.yaml 里面最后增加(要顶格写)
supervisor.slots.ports:
-6701
-6702
-6703
-6704
-6705
-6706
//这些数字表示worker显示的端口
保存退出
完后,我们要把这个配置文件分发到另外两台机器上去
scp storm.yaml hadoop-server-01:/usr/local/apps/strom(安装包)/conf/
scp storm.yaml hadoop-server-02:/usr/local/apps/strom(安装包)/conf/
这样每个最大的worker数量为6
启动Strom为后台进程
在00机器上
bin/storm nimbus 1>/dev/null 2>&1 & (就是启动nimbus 1到dev下的null目录中【标准输出从定性到这个文件中】 把2也从定性到1所去的地方,最后&表示为启动一个后台进程)
在00机器上
bin/storm supervisor 1>/dev/null 2>&1 &
注意:如果有错误退出,我们可以看看日志文件
cd logs/
ll
less supervisor.log
在00机器上
bin/storm ui 1>/dev/null 2>&1 &
(为了在网页中可以观察,我们必须在启动nimbus的这台机器上去启动ui)
我们切换到zookeeper下去打开zookeeper的客户端
cd /apps/zookeeper(安装包)/bin
./zkCli.sh
就会发现一个storm的节点
ls /strom
就会看到Strom下的节点
ls /strom/supervisor
就会看到supervisor下的节点,每个supervisor就会有一个相应的id和网页上的id是一一对应的
配置小结:
Storm 相关配置项
在storm.yaml 中常用的几个选项
storm.zookeeper.root
Storm 在zookeeper 集群中的根目录,默认是“/”
topology.workers
每个Topology 运行时的worker 的默认数目,若在代码中设置,则此选项值被覆盖
storm.zookeeper.servers
zookeeper 集群的节点列表
storm.local.dir
Storm 用于存储jar 包和临时文件的本地存储目录
ui.port
Storm 集群的UI 地址端口号,默认是8080
nimbus.host:
Nimbus 节点的host
supervisor.slots.ports
Supervisor 节点的worker 占位槽,集群中的所有Topology 公用这些槽位数,即使提交时设置了较大数值的槽位数,系统也会按照当前集群中实际剩余的槽位数来进行分配,当所有的槽位数都分配完时,新提交的Topology 只能等待,系统会一直监测是否有空余的槽位空出来,如果有,就再次给新提交的Topology 分配。
storm的编程基本概念
topology:拓扑也叫一个任务,只不过一旦启动起来就永不停歇,和mapreduce里的job类似只不过job处理完一个任务后就自动停止了
topology内部还分为spouts和bolts
spouts:拓扑的消息源,类似于mapreduce中的map,为后续的处理流程读取数据源(拿数据)
bolts:拓扑的处理逻辑单元(在spouts之后的组件叫bolts),bolts可以有很多级,分别处理不同的功能,类似于mapreduce的reduce只不过bolts组件可以有任意多级(处理数据)
tuple:消息元组//是作为spouts往bolts之间传递数据,封装数据之后叫做tuple,tuple框架来实现spouts往bolts之间的数据传递
tuple里面可以传递多个filed,每个filed可以定义一个名称。
//spouts、bolts组件之间传递数据必须封装在tuple中,tuple可以哦实现定义schema,规定有哪些字段。
组件与组件之间数据传递的路线//叫做streaming
stream:流 //数据的流向
stream grouping:流里面的分组策略也可以叫做数据流向的策略,可以理解为MapReduce中的shuffle阶段,指的是在stream中两头的运行实例之间数据的分发规则,
类比mapreduce中的maptask–>reduce task之间的partition(划分)策略(有很多策略)
tasks:任务处理单元
executor:工作进程(是在workers的线程)
workers:工作进程(是一个多线程的程序)
tasks在executor里面 executor在workers里面
configuration:topology的配置
编程的时候要导入storm的jar包 我们在用集群区工作的时候,每一个集群机器都应该创建一个storm分析之后的目录
3台机器,就应该在3台机器上去创建
我们编好java程序后打成jar包,长传到linux机器上面去,
其实storm和mapreduce程序的编写差不多
执行storm的命令为
在bin目录下
./storm jar ~/phonetopo.jar 客户端主类 参数
~/phonetopo.jar:表示,用户主目录下的phonetopo.jar
参数为:集群提交的时候,给他的名称
启动后我们可以通过命令来查看
bin/strom list
程序会一直运行下去,实时在线分析
我们用命令去关闭程序
bin/storm kill phone-topo(phone-topo:客户端给的名字)
原创文章,作者:carmelaweatherly,如若转载,请注明出处:https://blog.ytso.com/194663.html