一、Flink环境搭建
1.1 flink部署方式
Flink可以选择的部署方式有:
Local、Standalone(资源利用率低)、Yarn、Mesos、Docker、Kubernetes、AWS。
我们主要对Standalone模式和Yarn模式下的Flink集群部署进行分析。
Standalone模式常用于单机进行程序测试,Yarn模式常用于实际线上生产环境。
1.2 集群规划
1、集群规划
节点名称 | master(jobManager) | worker(taskManager) | zookeeper |
---|---|---|---|
bigdata11 | √ | √ | |
bigdata21 | √ | √ | √ |
bigdata31 | √ | √ |
(注:zookeeper只是用于实现master HA的必要组件,如果不需要master HA,则zookeeper可以去掉。)
2、软件版本
jdk | 1.8 |
---|---|
scala | 2.11.8 |
hadoop | 2.8 |
zookeeper | 3.4.10 |
flink | 1.6.1 |
3、基础环境
安装好jdk、scala、hadoop(hdfs+yarn都要部署好)、zookeeper,部署方法看之前的相关文章。而且要注意的是,节点之间要配置好ssh秘钥免登陆。
1.3 Standalone模式安装
1、解压程序:
tar -zxvf flink-1.6.1-bin-hadoop28-scala_2.11.tgz -C /opt/module/修改配置文件
2、修改配置文件
配置master节点地址:
[root@bigdata11 conf]$ sudo vi masters
bigdata11:8081
配置worker节点地址:
[root@bigdata11 conf]$ sudo vi slaves
bigdata12
bigdata13
修改flink工作参数:
[root@bigdata11 conf]$ sudo vi flink-conf.yaml
taskmanager.numberOfTaskSlots:2 //52行
jobmanager.rpc.address: bigdata11 //33行 指定jobmanager 的rpc地址
可选配置:
•每个JobManager(jobmanager.heap.mb)的可用内存量,
•每个TaskManager(taskmanager.heap.mb)的可用内存量,
•每台机器(taskManager)的可用的slot数量(taskmanager.numberOfTaskSlots),
•每个job的并行度(parallelism.default)
•临时目录(taskmanager.tmp.dirs)
3、配置环境变量
vim /etc/profile.d/flink.sh
export FLINK_HOME=/opt/module/flink-1.6.1
export PATH=$PATH:$FLINK_HOME/bin
然后source /etc/profile.d/flink.sh 启用环境变量
4、拷贝配置好的/opt/module/flink-1.6.1到其他节点
使用scp或者rsync
scp -r /opt/module/flink-1.6.1 bigdata12:`pwd`
scp -r /opt/module/flink-1.6.1 bigdata13:`pwd`
同时配置好其他两台的环境变量
5、启动flink集群
[root@bigdata11 flink-1.6.1]$ ./bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host bigdata11.
Starting taskexecutor daemon on host bigdata12.
Starting taskexecutor daemon on host bigdata13.
使用jps可以在对应的节点上查看对应的进程
StandloneSessionClusterEntrypoint 这是jobmanager进程
TaskManagerRunner 这是taskmanager进程
6、web UI 查看
http://bigdata11:8081
7、运行测试任务
flink run -m bigdata11:8081 ./examples/batch/WordCount.jar --input /opt/module/datas/word.txt --output /tmp/word.output
8、增减节点到集群中
增加/减少jobmanager节点:
bin/jobmanager.sh ((start|start-foreground) [host] [webui-port])|stop|stop-all
增加/减少taskmanager节点(需要到当前节点去启动):
bin/taskmanager.sh start|start-foreground|stop|stop-all
1.4 standalone模式jobManager HA
首先,我们需要知道 Flink 有两种部署的模式,分别是 Standalone 以及 Yarn Cluster 模式。对于 Standalone 来说,Flink 必须依赖于 Zookeeper 来实现 JobManager 的 HA(Zookeeper 已经成为了大部分开源框架 HA 必不可少的模块)。在 Zookeeper 的帮助下,一个 Standalone 的 Flink 集群会同时有多个活着的 JobManager,其中只有一个处于工作状态,其他处于 Standby 状态。当工作中的 JobManager 失去连接后(如宕机或 Crash),Zookeeper 会从 Standby 中选举新的 JobManager 来接管 Flink 集群。
对于 Yarn Cluaster 模式来说,Flink 就要依靠 Yarn 本身来对 JobManager 做 HA 了。其实这里完全是 Yarn 的机制。对于 Yarn Cluster 模式来说,JobManager 和 TaskManager 都是被 Yarn 启动在 Yarn 的 Container 中。此时的 JobManager,其实应该称之为 Flink Application Master。也就说它的故障恢复,就完全依靠着 Yarn 中的 ResourceManager(和 MapReduce 的 AppMaster 一样)。由于完全依赖了 Yarn,因此不同版本的 Yarn 可能会有细微的差异。这里不再做深究。
1、修改配置文件
conf/flink-conf.yaml
注释掉
#jobmanager.rpc.address: bigdata11
修改下面的配置
high-availability: zookeeper //73行 指定高可用方式为zookeeper
#指定高可用模式中zookeeper的地址列表 //88行
high-availability.zookeeper.quorum:bigdata11:2181,bigdata12:2181,bigdata13:2181
#指定将jobmanager状态数据持久化保存到hdfs中
high-availability.storageDir: hdfs:///flink/ha/
#JobManager元数据保存在文件系统storageDir中,只有指向此状态的指针存储在ZooKeeper中(必须) //没有
high-availability.zookeeper.path.root: /flink
#根ZooKeeper节点,在该节点下放置所有集群节点(推荐),这是集群节点信息保存位置
high-availability.cluster-id:/flinkCluster
#自定义集群(推荐),这里是检查点和保存点的配置,保存在hdfs中,非必须
state.backend: filesystem
state.checkpoints.dir: hdfs:///flink/checkpoints
state.savepoints.dir: hdfs:///flink/checkpoints
conf/masters
将主备jobmanager地址都写到该配置文件中。
bigdata11:8081
bigdata12:8081
conf/zoo.cfg
server.1=bigdata11:2888:3888
server.2=bigdata12:2888:3888
server.3=bigdata13:2888:3888
修改完后同步配置到其他所有节点中。
2、启动集群
先启动好zookeeper服务。
然后启动hdfs服务。
最后启动flink集群。 start-cluster.sh
1.5 yarn模式安装
部署步骤和上面standalone基本一样,这里不重复。还要添加以下配置:
配置好hadoop(hdfs和yarn)环境,同时配置好HADOOP_HOME这个环境变量。
接着在yarn下启动jobmanager和taskmanager。
/opt/module/flink-1.6.1/bin/yarn-session.sh -n 2 -s 4 -jm 1024 -tm 1024 -nm test -d
其中:
-n(--container):TaskManager的数量。
-s(--slots): 每个TaskManager的slot数量,默认一个slot一个core,默认每个taskmanager的slot的个数为1,有时可以多一些taskmanager,做冗余。
-jm:JobManager的内存(单位MB)。
-tm:每个taskmanager的内存(单位MB)。
-nm:yarn 的appName(现在yarn的ui上的名字)。
-d:后台执行
会自动根据 conf/ 下的配置文件启动对应的jobmanager和taskmanager的
启动完成后,可以到yarn 的web页面查看到刚才提交会话任务:
http://bigdata11:8088
同时可以在提交session的节点上使用jps查看对应的进程:
YarnSessionClusterEntrypoint 这个就是刚刚提交的yarn-session维持的session进程
提交测试任务到yarn中的flink集群运行
./bin/flink run ./examples/batch/WordCount.jar --input 输出数据路径
--output 输出数据路径
可以手动使用 -m jobManagerAddress 指定jobmanager地址,但是flink client可以自动根据flink的配置文件获取到jobmanager地址,所以可以不用指定
提交任务之后,可以在yarn的web页面中查看到相关的任务信息
原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/198352.html