Flink On YARN


0. 环境准备

本实验基于以下 8 台测试机器进行:

IP hostname
10.4.79.90 hadoop-1
10.4.79.8 hadoop-2
10.4.79.6 hadoop-3
10.4.79.58 hadoop-4
10.4.79.38 hadoop-5
10.4.79.96 hadoop-6
10.4.79.62 hadoop-7
10.4.79.92 hadoop-8

首先确认每个机器都安装了如下软件:

  • JAVA >= 1.8.x
  • SSH,并确保所有集群节点之间可互相 SSH免密登录

为集群每个节点配置 hostname:vi /etc/hosts

10.4.79.90 hadoop-1
10.4.79.8  hadoop-2
10.4.79.6  hadoop-3
10.4.79.58 hadoop-4
10.4.79.38 hadoop-5
10.4.79.96 hadoop-6
10.4.79.62 hadoop-7
10.4.79.92 hadoop-8

1. YARN 环境搭建

YARN 环境搭建请移步至 hadoop 环境搭建

1. 准备

a. 测试 Yarn 环境是否已经准备好接受一个 Flink Application 了: yarn top

Flink On YARN

b. 下载 Flink 并解压缩

c. 系统环境添加配置项:

export HADOOP_CLASSPATH=`hadoop classpath`
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop

2. 以 Application 模式提交任务

./bin/flink run-application -t yarn-application ./examples/streaming/TopSpeedWindowing.jar

启动成功后,会返回一个 JobID,如下图所示:

flink on yarn

启动日志中有一部分关键信息:

The configured JobManager memory is 1600 MB. YARN will allocate 2048 MB to make up an integer multiple of its minimum allocation memory (1024 MB, configured via yarn.scheduler.minimum-allocation-mb). The extra 448 MB may not be used by Flink.
The configured TaskManager memory is 1728 MB. YARN will allocate 2048 MB to make up an integer multiple of its minimum allocation memory (1024 MB, configured via yarn.scheduler.minimum-allocation-mb). The extra 320 MB may not be used by Flink.

因为我们前文在 yarn-site.xml 中配置 yarn 的每个任务最小内存分配单元(yarn.scheduler.minimum-allocation-mb)是 1024MB,而我们在 Flink/conf/flink-conf.yaml 中配置的 jobmanager 内存使用大小(jobmanager.memory.process.size:)是 1600m,所以 yarn 会分配给 jobmanager 共计 2×1024=2048MB,Flink 只使用了其中的 1600MB,剩余 448MB,这些空间就浪费了。

同理,taskmanager 的内存大小配置是 1728MB,yarn 会分配给 taskmanager 共计 2×1024=2048MB,Flink 只使用了其中的 1728MB,剩余的 320MB 空间就浪费了。

因此我们将 Flink 的 JobManager (jobmanager.memory.process.size) 和 TaskManager (taskmanager.memory.process.size) 大小都配置为 2048M。

重新提交任务,启动成功后,可以在 yarn 管理页面看到正在运行的 Application:

yarn-application

点击任务 ID 跳转至任务详情页:

Flink On YARN

点击 ApplicationMaster 就跳转到了我们熟悉的 Flink 管理页面:

Flink On YARN

点击 AttempID 可以查看任务具体运行在哪个节点:

Flink On YARN

可以看到刚刚提交的 Flink 任务占用了两个 Yarn Container,分别在 hadoop-3 和 hadoop-7,一个 Container 用于运行 JobManager,一个 Container 用于运行 TaskManager。

3. 日志查看

登陆到 hadoop-3 查看后台日志:

cd /opt/hadoop-2.10.0/logs/userlogs/{applicationid}/{containerid}

Flink On YARN

可以看出 hadoop-3 是一个 TaskManager。

登陆到 hadoop-7 查看后台日志:

cd /opt/hadoop-2.10.0/logs/userlogs/{applicationid}/{containerid}

Flink On YARN

可以看出 hadoop-7 是一个 JobManager。

4. 日志归档

在 flink/conf 目录下已经提供了 log4j.properties 和 logback.xml,说明可以 log4j 和 logback 两种方案任选其一。
如果选择 logback 方案,需要移除(重命名)log4j.properties。

另外 官方文档 中还特别提示:

Flink On YARN

5. 遇到的问题:

a. 启动任务报错:

Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration ‘classloader.check-leaked-classloader’.

启动任务报错

解决办法: 在 flink 配置文件 flink-conf.yaml 设置禁用 classloader.check-leaked-classloader

classloader.check-leaked-classloader: false

b. 无法提交第二个任务,状态一直处于 ACCEPTED:

Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster

无法提交第二个任务

似乎是资源不够用了。

  • 查看 CPU 和内存资源占用情况

    Total Resources Used Resources
    <memory:64 GB, vCores:32> <memory:4 GB, vCores:2>

    CPU 和内存资源是充足的。

  • 查看调度器的使用情况

    可以看到,集群中使用的是 Capacity Scheduler 调度器,也就是所谓的容量调度,这种方案更适合多租户安全地共享大型集群,以便在分配的容量限制下及时分配资源。采用队列的概念,任务提交到队列,队列可以设置资源的占比,并且支持层级队列、访问控制、用户限制、预定等等配置。但是,对于资源的分配占比调优需要更多的经验处理。但它不会出现在使用 FIFO Scheduler 时会出现的有大任务独占资源,会导致其他任务一直处于 pending 状态的问题。

  • 查看任务队列的使用情况

    Flink On YARN

    从上图中可以看出用户队列没有做租户划分,用的都是 default 队列,从图中可以看出使用的容量也只有 6.3%,队列中最多可存放 10000 个 application,而实际的远远少于 10000,貎似这里也看不出来什么问题。

    但是有一处限制需要特别注意:Configured Max Application Master Limit,这是集群中可用于运行 application master 的资源比例上限,通常用于限制并发运行的应用程序数目,默认值为 10%。

    每个 Flink Application 拥有一个 JobManager,每个 JobManager 分配 2GB 内存,集群中共有 64GB 内存,64 x 0.1 = 6.4GB,因此最多可以提交 3 个 Flink Application,提交第 4 个的时候会超过资源限制无法提交!

  • 解决验证

    修改 hadoop/etc/hadoop/capacity-scheduler.xml

    <property>
        <name>yarn.scheduler.capacity.maximum-am-resource-percent</name>
        <value>0.5</value>
    </property>
    

    按照这个配置,最多可以运行 64GB x 0.5 / 2GB = 16 个 Flink Application。

    修改完成后,刷新一下 yarn 队列:

    yarn rmadmin -refreshQueues
    

    继续提交任务,成功!

参考

  1. 官方文档 – Flink On YARN
  2. 官方文档 – Flink Logging
  3. flink on yarn 模式下提示yarn资源不足问题分析

原创文章,作者:,如若转载,请注明出处:https://blog.ytso.com/tech/pnotes/269388.html

(0)
上一篇 2022年6月21日 11:16
下一篇 2022年6月21日 11:22

相关推荐

发表回复

登录后才能评论