一.flume介绍
flume 是一个cloudera提供的 高可用高可靠,分布式的海量日志收集聚合传输系统。Flume支持日志系统中定制各类数据发送方,用于收集数据。同时flume提供对数据进行简单处理,并写到各种数据接收方(可定制)的能力。
二.功能介绍
日志收集
Flume最早是Cloudera提供的日志收集系统,目前是Apache下的一个孵化项目,Flume支持在日志系统中定制各类数据发送方,用于收集数据。
流程:恒生数据接收中间件––file.txt 哪个端口进行监控 — 数据监控—接收数据—-内存—存储本地硬盘
Flume—对哪个ip 哪个端口进行监控 — 数据监控—接收数据—-内存—存储本地硬盘
数据处理
Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。 Flume提供了从Console(控制台)、RPC(Thrift-RPC)、Text(文件)、Tail(UNIX tail)、Syslog(Syslog日志系统,支持TCP和UDP等2种模式),exec(命令执行)等数据源上收集数据的能力。
三.flume原理图结构
Flume逻辑上分三层架构:Agent,Collector,Storage。
Flume OG采用了多Master的方式。为了保证配置数据的一致性,Flume引入了ZooKeeper,用于保存配置数据,ZooKeeper本身可保证配置数据的一致性和高可用,另外,在配置数据发生变化时,ZooKeeper可以通知Flume Master节点。Flume Master间使用gossip协议同步数据。
FLUM OG 的特点是:
FLUM OG 有三种角色的节点:代理节点(agent)、收集节点(collector)、主节点(master)。
agent 从各个数据源收集日志数据,将收集到的数据集中到 Collector,然后由收集节点汇总存入 HDFS。master 负责管理 agent,collector 的活动。
agent、collector 都称为 node,node 的角色根据配置的不同分为 logical node(逻辑节点)、physical node(物理节点)。
agent、collector 由 source、sink 组成,代表在当前节点数据是从 source 传送到 sink。
Flume-NG架构介绍
Flume NG最明显的改动就是取消了集中管理配置的 Master 和 Zookeeper,变为一个纯粹的传输工具。Flume NG另一s个主要的不同点是读入数据和写出数据现在由不同的工作线程处理(称为Runner)。在 Flume NG 中,读入线程同样做写出工作(除了故障重试)。如果写出慢的话(不是完全失败),它将阻塞 Flume 接收数据的能力。这种异步的设计使读入线程可以顺畅的工作而无需关注下游的任何问题。
FLUME NG 的特点是:
NG 只有一种角色的节点:代理节点(agent)。
没有 collector、master 节点,这是核心组件最核心的变化。
去除了 physical nodes、logical nodes 的概念和相关内容。
agent 节点的组成也发生了变化。Flume NG的 agent 由 source、sink、Channel 组成。
四.flume三大组件介绍(agent,channel,sink)
Flume以Agent为最小的独立运行单位。Agent是Flume中产生数据流的地方,一个Agent就是一个JVM。单Agent由Source、Sink和Channel三大组件构成
Source:完成对日志数据的收集,分成 transtion 和 event 打入到Channel之中。
Channel:主要提供一个队列的功能,对source提供中的数据进行简单的缓存。
Sink:取出Channel中的数据,进行相应的存储文件系统,数据库,或者提交到远程服务器。
对现有程序改动最小的使用方式是使用是直接读取程序原来记录的日志文件,基本可以实现无缝接入,不需要对现有程序进行任何改动。
2 Source
flume有许多类型的Source,见官网用户手册:
http://flume.apache.org/FlumeUserGuide.html#flume-sources
归纳整理一下flume的 agent source列表如下
Source类型 |
说明 |
Avro Source |
支持Avro协议(实际上是Avro RPC),提供一个Avro的接口,需要往设置的地址和端口发送Avro消息,Source就能接收到,如:Log4j Appender通过Avro Source将消息发送到Agent |
Thrift Source |
支持Thrift协议,提供一个Thrift接口,类似Avro |
Exec Source |
Source启动的时候会运行一个设置的UNIX命令(比如 cat file),该命令会不断地往标准输出(stdout)输出数据,这些数据就会被打包成Event,进行处理 |
JMS Source |
从JMS系统(消息、主题)中读取数据,类似ActiveMQ |
Spooling Directory Source |
监听某个目录,该目录有新文件出现时,把文件的内容打包成Event,进行处理 |
Netcat Source |
监控某个端口,将流经端口的每一个文本行数据作为Event输入 |
Sequence Generator Source |
序列生成器数据源,生产序列数据 |
Syslog Sources |
读取syslog数据,产生Event,支持UDP和TCP两种协议 |
HTTP Source |
基于HTTP POST或GET方式的数据源,支持JSON、BLOB表示形式 |
Legacy Sources |
兼容老的Flume OG中Source(0.9.x版本) |
自定义Source |
使用者通过实现Flume提供的接口来定制满足需求的Source。 |
对于直接读取文件Source, 主要有两种方式:
ü Exec source
可通过写Unix command的方式组织数据,最常用的就是tail -F [file]。
可以实现实时传输,但在flume不运行和脚本错误时,会丢数据,也不支持断点续传功能。因为没有记录上次文件读到的位置,从而没办法知道,下次再读时,从什么地方开始读。特别是在日志文件一直在增加的时候。flume的source挂了。等flume的source再次开启的这段时间内,增加的日志内容,就没办法被source读取到了。不过flume有一个execStream的扩展,可以自己写一个监控日志增加情况,把增加的日志,通过自己写的工具把增加的内容,传送给flume的node。再传送给sink的node。要是能在tail类的source中能支持,在node挂掉这段时间的内容,等下次node开启后在继续传送,那就更完美了。
ü Spooling Directory Source
SpoolSource:是监测配置的目录下新增的文件,并将文件中的数据读取出来,可实现准实时。需要注意两点:
1、拷贝到spool目录下的文件不可以再打开编辑。
2、spool目录下不可包含相应的子目录。在实际使用的过程中,可以结合log4j使用,使用log4j的时候,将log4j的文件分割机制设为1分钟一次,将文件拷贝到spool的监控目录。log4j有一个TimeRolling的插件,可以把log4j分割的文件到spool目录。基本实现了实时的监控。Flume在传完文件之后,将会修改文件的后缀,变为.COMPLETED(后缀也可以在配置文件中灵活指定)
注:ExecSource,SpoolSource对比
ExecSource可以实现对日志的实时收集,但是存在Flume不运行或者指令执行出错时,将无法收集到日志数据,无法何证日志数据的完整性。SpoolSource虽然无法实现实时的收集数据,但是可以使用以分钟的方式分割文件,趋近于实时。如果应用无法实现以分钟切割日志文件的话,可以两种收集方式结合使用。
2 Channel
当前有几个 Channel 可供选择,分别是 Memory Channel, JDBC Channel , File Channel,Psuedo Transaction Channel。比较常见的是前三种 Channel。
v Memory Channel 可以实现高速的吞吐,但是无法保证数据的完整性。
v Memory Recover Channel 在官方文档的建议上已经建义使用File Channel来替换。
v File Channel保证数据的完整性与一致性。在具体配置File Channel时,建议File Channel设置的目录和程序日志文件保存的目录设成不同的磁盘,以便提高效率。
File Channel 是一个持久化的隧道(Channel),它持久化所有的事件,并将其存储到磁盘中。因此,即使 Java 虚拟机当掉,或者操作系统崩溃或重启,再或者事件没有在管道中成功地传递到下一个代理(agent),这一切都不会造成数据丢失。Memory Channel 是一个不稳定的隧道,其原因是由于它在内存中存储所有事件。如果 Java 进程死掉,任何存储在内存的事件将会丢失。另外,内存的空间收到 RAM大小的限制,而 File Channel 这方面是它的优势,只要磁盘空间足够,它就可以将所有事件数据存储到磁盘上。
Flume Channel 支持的类型:
Channel类型 |
说明 |
Memory Channel |
Event数据存储在内存中 |
JDBC Channel |
Event数据存储在持久化存储中,当前Flume Channel内置支持Derby |
File Channel |
Event数据存储在磁盘文件中 |
Spillable Memory Channel |
Event数据存储在内存中和磁盘上,当内存队列满了,会持久化到磁盘文件(当前试验性的,不建议生产环境使用) |
Pseudo Transaction Channel |
测试用途 |
Custom Channel |
自定义Channel实现 |
2 Sink
Sink在设置存储数据时,可以向文件系统中,数据库中,Hadoop中储数据,在日志数据较少时,可以将数据存储在文件系统中,并且设定一定的时间间隔保存数据。在日志数据较多时,可以将相应的日志数据存储到Hadoop中,便于日后进行相应的数据分析。
Flume Sink支持的类型
Sink类型 |
说明 |
HDFS Sink |
数据写入HDFS |
Logger Sink |
数据写入日志文件 |
Avro Sink |
数据被转换成Avro Event,然后发送到配置的RPC端口上 |
Thrift Sink |
数据被转换成Thrift Event,然后发送到配置的RPC端口上 |
IRC Sink |
数据在IRC上进行回放 |
File Roll Sink |
存储数据到本地文件系统 |
Null Sink |
丢弃到所有数据 |
HBase Sink |
数据写入HBase数据库 |
Morphline Solr Sink |
数据发送到Solr搜索服务器(集群) |
ElasticSearch Sink |
数据发送到Elastic Search搜索服务器(集群) |
Kite Dataset Sink |
写数据到Kite Dataset,试验性质的 |
Custom Sink |
自定义Sink实现 |
Flume提供了大量内置的Source、Channel和Sink类型。不同类型的Source,Channel和Sink可以自由组合。组合方式基于用户设置的配置文件,非常灵活。比如:Channel可以把事件暂存在内存里,也可以持久化到本地硬盘上。Sink可以把日志写入HDFS, HBase,甚至是另外一个Source等等。Flume支持用户建立多级流,也就是说,多个Agent可以协同工作,并且支持Fan-in、Fan-out、Contextual Routing、Backup Routes
Collector
Flume NG中已经没有Collector的概念了,Collector的作用是将多个Agent的数据汇总后,加载到Storage中。
2.2.3 Storage
Storage是存储系统,可以是一个普通File,也可以是HDFS,HIVE,HBase等。
2.2.4 Master
针对于OG版本。
Master是管理协调Agent和Collector的配置等信息,是Flume集群的控制器。
在Flume中,最重要的抽象是data flow(数据流),data flow描述了数据从产生,传输、处理并最终写入目标的一条路径
对于Agent数据流配置就是从哪得到数据,把数据发送到哪个Collector。
对于Collector是接收Agent发过来的数据,把数据发送到指定的目标机器上。
五.flume特性介绍
(1) 可靠性
当节点出现故障时,日志能够被传送到其他节点上而不会丢失。Flume提供了三种级别的可靠性保障,从强到弱依次分别为:
end-to-end(收到数据agent首先将event写到磁盘上,当数据传送成功后,再删除;如果数据发送失败,可以重新发送。)
Store on failure(这也是scribe采用的策略,当数据接收方crash时,将数据写到本地,待恢复后,继续发送)
Best effort(数据发送到接收方后,不会进行确认)。
(2) 可扩展性
Flume采用了三层架构,分别为agent,collector和storage,每一层均可以水平扩展。其中,所有agent和collector由master统一管理,这使得系统容易监控和维护,且master允许有多个(使用ZooKeeper进行管理和负载均衡),这就避免了单点故障问题。
(3) 可管理性
所有agent和colletor由master统一管理,这使得系统便于维护。多master情况,Flume利用ZooKeeper和gossip,保证动态配置数据的一致性。用户可以在master上查看各个数据源或者数据流执行情况,且可以对各个数据源配置和动态加载。Flume提供了web 和shell script command两种形式对数据流进行管理。
(4) 功能可扩展性
用户可以根据需要添加自己的agent,collector或者storage。此外,Flume自带了很多组件,包括各种agent(file, syslog等),collector和storage(file,HDFS等)。
学习总结:
Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(比如文本、HDFS、Hbase等)的能力。
Flume的数据流由事件(Event)贯穿始终。事件是Flume的基本数据单位,它携带日志数据(字节数组形式)并且携带有头信息,这些Event由Agent外部的Source生成,当Source捕获事件后会进行特定的格式化,然后Source会把事件推入(单个或多个)Channel中。你可以把Channel看作是一个缓冲区,它将保存事件直到Sink处理完该事件。Sink负责持久化日志或者把事件推向另一个Source。
当节点出现故障时,日志能够被传送到其他节点上而不会丢失。Flume提供了三种级别的可靠性保障,从强到弱依次分别为:
end-to-end:收到数据agent首先将event写到磁盘上,当数据传送成功后,再删除;如果数据发送失败,可以重新发送
Store on failure:这也是scribe采用的策略,当数据接收方crash时,将数据写到本地,待恢复后,继续发送
Best effort:数据发送到接收方后,不会进行确认。
————————————-++++++++++++++++——————————-
一. flume环境搭建安装部分介绍
flume官网: http://flume.apache.org/download.html
建议大家使用1.6版本的,1.7版本目前存在一些小问题需要解决。后续官方可能会修复。
集群中设置3个节点,每台都安装flume.
下载后解压: tar -zxvf apache-flume-1.7.0-bin.tar.gz -C /opt/modules/flume
本次默认已经安装hadoop和jdk。所以忽略安装步骤。
修改配置文件: cp flume-env.sh.template flume-env.sh
vim flume-env.sh
export JAVA_HOME=/usr/local/java_1.7.0_25
设置系统环境变量:
export FLUME_HOME=/opt/modules/flume
export PATH=$PATH:$FLUME_HOME/bin:
保存退出后。source /etc/profile 立即生效。
测试: 在终端输入 flume-ng version
出现以上提示说明flume搭建成功。
可以把flume分发到其他节点。
scp -r /opt/modules/flume/* root@slave1:/opt/modlues/flume
scp -r /opt/modules/flume/* root@slave2:/opt/modlues/flume
二. 采集数据测试。
1) Avro
Avro可以发送一个给定的文件给Flume,Avro 源使用AVRO RPC机制。
创建agent配置文件
# > vi /opt/modules/flume/conf/avro.conf
添加以下内容:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
对以上内容解释:
指定名称:a1是我们要启动的Agent名字
a1.sources = r1命名Agent的sources为r1
a1.sinks = k1命名Agent的sinks为k1
a1.channels = c1命名Agent的channels 为c1
# Describe configure the source
a1.sources.r1.type = avro指定r1的类型为AVRO
a1.sources.r1.bind = 0.0.0.0 将Source与IP地址绑定(这里指本机)
a1.sources.r1.port = 4141指定通讯端口为4141
# Describe the sink
a1.sinks.k1.type = logger指定k1的类型为Logger(不产生实体文件,只在控制台显示)
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
配置说明
指定Channel的类型为Memory
设置Channel的最大存储event数量为1000
每次最大可以source中拿到或者送到sink中的event数量也是100
这里还可以设置Channel的其他属性:
a1.channels.c1.keep-alive=1000event添加到通道中或者移出的允许时间(秒)
a1.channels.c1.byteCapacity = 800000event的字节量的限制,只包括eventbody
a1.channels.c1.byteCapacityBufferPercentage = 20
event的缓存比例为20%(800000的20%),即event的最大字节量为800000*120%
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
将source、sink分别与Channel c1绑定
启动flume agent a1
# > flume-ng agent -c . -f /home/bigdata/flume/conf/avro.conf -n a1 -Dflume.root.logger=INFO,console
-c:使用配置文件所在目录(这里指默认路径,即$FLUME_HOME/conf)
-f:flume定义组件的配置文件
-n:启动Agent的名称,该名称在组件配置文件中定义
-Dflume.root.logger:flume自身运行状态的日志,按需配置,详细信息,控制台打印
在节点2启动flume 进程
flume-ng agent -c . -f /opt/modules/flume/conf/avro.conf -n a1 -Dflume.root.logger=INFO,console //启动命令
创建文件发送过去
echo “china 51cto”>>/home/avro_log
flume-ng avro-client -c . -H master -p 4141 -F /home/avro_log
注:Flume框架对Hadoop和zookeeper的依赖只是在jar包上,并不要求flume启动时必须将Hadoop和zookeeper服务也启动。
就先讲解一个模式吧。其他的数据采集方式后续分享。
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/197023.html