AirFlow 常见问题

@

文章目录[隐藏]

  • 修改前: self.pickle = dag
  • AirFlow 常见问题

    安装问题

    1、安装出现ERROR “python setup.py xxx” 。
    问题:

    第一需要你更新 pip 版本需要使用'pip install --upgrade pip' command.

    第二是 setuptools 版本太旧,所以出现以下问题Command "python setup.py egg_info" failed with error code 1 in /tmp/pip-build-G9yO9Z/tldr/,也是需要你更新

    
    File "/tmp/pip-build-G9yO9Z/tldr/setuptools_scm-3.3.3-py2.7.egg/setuptools_scm/integration.py", line 9, in version_keyword
    File "/tmp/pip-build-G9yO9Z/tldr/setuptools_scm-3.3.3-py2.7.egg/setuptools_scm/version.py", line 66, in _warn_if_setuptools_outdated
    setuptools_scm.version.SetuptoolsOutdatedWarning: your setuptools is too old (<12)
    ----------------------------------------

    Command "python setup.py egg_info" failed with error code 1 in /tmp/pip-build-G9yO9Z/tldr/
    You are using pip version 8.1.2, however version 19.2.1 is available.
    You should consider upgrading via the 'pip install --upgrade pip' command.

    ##### 解决方法:
    > (一)使用“pip install—upgrade pip”命令进行pip版本升级。
    >          [xiaokang@localhost ~]$ sudo pip install --upgrade pip
    > (二)使用“ pip install --upgrade setuptools”命令进行setuptools 版本升级。
    >          [xiaokang@localhost ~]$ sudo pip install --upgrade setuptools
    >  解决完以上问题你就可以成功安装上之前要安装的软件了
    
    #### 2、ERROR: Cannot uninstall 'enum34' 。
    ##### 问题:
    ```python
            在安装Airflow的时候,出现如下错误:
            ERROR: Cannot uninstall 'enum34'. It is a distutils installed project and thus we cannot accurately determine which files belong to it which would lead to only a partial uninstall.
    解决方法:

    sudo pip install --ignore-installed enum34
    当出现其他无法升级的错误时,可以采用以下命令格式进行强制升级:
       sudo pip install --ignore-installed +模块名

    3、安装软件报错,提示软件包找不到
    问题:ERROR: Command errored out with exit status 1:

    AirFlow 常见问题

        ERROR: Command errored out with exit status 1:
         command: /usr/bin/python -c 'import sys, setuptools, tokenize; sys.argv[0] = '"'"'/tmp/pip-install-oZ2zgF/flask-appbuilder/setup.py'"'"'; __file__='"'"'/tmp/pip-install-oZ2zgF/flask-appbuilder/setup.py'"'"';f=getattr(tokenize, '"'"'open'"'"', open)(__file__);code=f.read().replace('"'"'/r/n'"'"', '"'"'/n'"'"');f.close();exec(compile(code, __file__, '"'"'exec'"'"'))' egg_info --egg-base /tmp/pip-install-oZ2zgF/flask-appbuilder/pip-egg-info
             cwd: /tmp/pip-install-oZ2zgF/flask-appbuilder/
        Complete output (3 lines):
        /usr/lib64/python2.7/distutils/dist.py:267: UserWarning: Unknown distribution option: 'long_description_content_type'
          warnings.warn(msg)
        error in Flask-AppBuilder setup command: 'install_requires' must be a string or list of strings containing valid project/version requirement specifiers
        ----------------------------------------
    ERROR: Command errored out with exit status 1: python setup.py egg_info Check the logs for full command output.
    
    解决方法:

    检查安装命令,一般此类问题是因为安装包找不到才会出现的错误。

    4、提示找不到Python.h 这个文件或目录
    问题:src/spt_python.h:14:20: fatal error: Python.h: No such file or directory

    AirFlow 常见问题

        ERROR: Command errored out with exit status 1:
         command: /usr/bin/python -u -c 'import sys, setuptools, tokenize; sys.argv[0] = '"'"'/tmp/pip-install-YmiKzY/setproctitle/setup.py'"'"'; __file__='"'"'/tmp/pip-install-YmiKzY/setproctitle/setup.py'"'"';f=getattr(tokenize, '"'"'open'"'"', open)(__file__);code=f.read().replace('"'"'/r/n'"'"', '"'"'/n'"'"');f.close();exec(compile(code, __file__, '"'"'exec'"'"'))' install --record /tmp/pip-record-XTav9_/install-record.txt --single-version-externally-managed --compile
             cwd: /tmp/pip-install-YmiKzY/setproctitle/
        Complete output (15 lines):
        running install
        running build
        running build_ext
        building 'setproctitle' extension
        creating build
        creating build/temp.linux-x86_64-2.7
        creating build/temp.linux-x86_64-2.7/src
        gcc -pthread -fno-strict-aliasing -O2 -g -pipe -Wall -Wp,-D_FORTIFY_SOURCE=2 -fexceptions -fstack-protector-strong --param=ssp-buffer-size=4 -grecord-gcc-switches -m64 -mtune=generic -D_GNU_SOURCE -fPIC -fwrapv -DNDEBUG -O2 -g -pipe -Wall -Wp,-D_FORTIFY_SOURCE=2 -fexceptions -fstack-protector-strong --param=ssp-buffer-size=4 -grecord-gcc-switches -m64 -mtune=generic -D_GNU_SOURCE -fPIC -fwrapv -fPIC -DHAVE_SYS_PRCTL_H=1 -DSPT_VERSION=1.1.10 -I/usr/include/python2.7 -c src/setproctitle.c -o build/temp.linux-x86_64-2.7/src/setproctitle.o
        In file included from src/spt.h:15:0,
                         from src/setproctitle.c:14:
        src/spt_python.h:14:20: fatal error: Python.h: No such file or directory
         #include <Python.h>
                            ^
        compilation terminated.
        error: command 'gcc' failed with exit status 1
        ----------------------------------------
    ERROR: Command errored out with exit status 1: /usr/bin/python -u -c 'import sys, setuptools, tokenize; sys.argv[0] = '"'"'/tmp/pip-install-YmiKzY/setproctitle/setup.py'"'"'; __file__='"'"'/tmp/pip-install-YmiKzY/setproctitle/setup.py'"'"';f=getattr(tokenize, '"'"'open'"'"', open)(__file__);code=f.read().replace('"'"'/r/n'"'"', '"'"'/n'"'"');f.close();exec(compile(code, __file__, '"'"'exec'"'"'))' install --record /tmp/pip-record-XTav9_/install-record.txt --single-version-externally-managed --compile Check the logs for full command output.
    解决方法:

    因为缺少python的开发包,yum install python-devel 安装即可解决

     
     

    dag 问题

    1、bash_command='/root/touch.sh' 执行命令错误 。
    问题:

    AirFlow 常见问题

    [2019-12-19 15:15:15,523] {taskinstance.py:1058} ERROR - bash /root/touch.sh
    Traceback (most recent call last):
      File "/usr/lib/python2.7/site-packages/airflow/models/taskinstance.py", line 915, in _run_raw_task
        self.render_templates(context=context)
      File "/usr/lib/python2.7/site-packages/airflow/models/taskinstance.py", line 1267, in render_templates
        self.task.render_template_fields(context)
      File "/usr/lib/python2.7/site-packages/airflow/models/baseoperator.py", line 689, in render_template_fields
        self._do_render_template_fields(self, self.template_fields, context, jinja_env, set())
      File "/usr/lib/python2.7/site-packages/airflow/models/baseoperator.py", line 696, in _do_render_template_fields
        rendered_content = self.render_template(content, context, jinja_env, seen_oids)
      File "/usr/lib/python2.7/site-packages/airflow/models/baseoperator.py", line 723, in render_template
        return jinja_env.get_template(content).render(**context)
      File "/usr/lib64/python2.7/site-packages/jinja2/environment.py", line 830, in get_template
        return self._load_template(name, self.make_globals(globals))
      File "/usr/lib64/python2.7/site-packages/jinja2/environment.py", line 804, in _load_template
        template = self.loader.load(self, name, globals)
      File "/usr/lib64/python2.7/site-packages/jinja2/loaders.py", line 113, in load
        source, filename, uptodate = self.get_source(environment, name)
      File "/usr/lib64/python2.7/site-packages/jinja2/loaders.py", line 187, in get_source
        raise TemplateNotFound(template)
    TemplateNotFound: bash /root/touch.sh
    解决方法:在执行的命令后面多加一个空格

      由于airflow使用了jinja2作为模板引擎导致的一个陷阱,当使用bash命令的时候,尾部必须加一个空格

    airflow

    1、启动worker 时报错
    问题:
    Running a worker with superuser privileges when the
    worker accepts messages serialized with pickle is a very bad idea!
    
    If you really want to continue then you have to set the C_FORCE_ROOT
    environment variable (but please think about this before you do).
    

    AirFlow 常见问题

    解决方法:

    在/etc/profile 内添加 export C_FORCE_ROOT="True"

    2、airflow怎么批量unpause大量的dag任务

    普通少量任务可以通过命令airflow unpause dag_id命令来启动,或者在web界面点击启动按钮实现,但是当任务过多的时候,一个个任务去启动就比较麻烦。其实dag信息是存储在数据库中的,可以通过批量修改数据库信息来达到批量启动dag任务的效果。假如是用mysql作为sql_alchemy_conn,那么只需要登录airflow数据库,然后更新表dag的is_paused字段为0即可启动dag任务。

    示例: update dag set is_paused = 0 where dag_id like "benchmark%";

    3、airflow的scheduler进程在执行一个任务后就挂起进入假死状态

    出现这个情况的一般原因是scheduler调度器生成了任务,但是无法发布出去。而日志中又没有什么错误信息。

    可能原因是Borker连接依赖库没安装:
    如果是redis作为broker则执行pip install apache‐airflow[redis]
    如果是rabbitmq作为broker则执行pip install apache-airflow[rabbitmq]
    还有要排查scheduler节点是否能正常访问rabbitmq。

    4、当定义的dag文件过多的时候,airflow的scheduler节点运行效率缓慢

    airflow的scheduler默认是起两个线程,可以通过修改配置文件airflow.cfg改进:

    [scheduler]
    # The scheduler can run multiple threads in parallel to schedule dags.
    # This defines how many threads will run.
    #默认是2这里改为100
    max_threads = 100
    5、airflow日志级别更改
    vi airflow.cfg
    
    [core]
    #logging_level = INFO
    logging_level = WARNING

    NOTSET < DEBUG < INFO < WARNING < ERROR < CRITICAL
    如果把log的级别设置为INFO, 那么小于INFO级别的日志都不输出, 大于等于INFO级别的日志都输出。也就是说,日志级别越高,打印的日志越不详细。默认日志级别为WARNING。
    注意: 如果将logging_level改为WARNING或以上级别,则不仅仅是日志,命令行输出明细也会同样受到影响,也只会输出大于等于指定级别的信息,所以如果命令行输出信息不全且系统无错误日志输出,那么说明是日志级别过高导致的。

    6、AirFlow: jinja2.exceptions.TemplateNotFound

    这是由于airflow使用了jinja2作为模板引擎导致的一个陷阱,当使用bash命令的时候,尾部必须加一个空格:

    • Described here : see below. You need to add a space after the script name in cases where you are directly calling a bash scripts in the bash_command attribute of BashOperator - this is because the Airflow tries to apply a Jinja template to it, which will fail.
    t2 = BashOperator(
    task_id='sleep',
    bash_command="/home/batcher/test.sh", // This fails with `Jinja template not found` error
    #bash_command="/home/batcher/test.sh ", // This works (has a space after)
    dag=dag)
    7、AirFlow: Task is not able to be run

    任务执行一段时间后突然无法执行,后台worker日志显示如下提示:

    [2018-05-25 17:22:05,068] {jobs.py:2508} INFO - Task is not able to be run

    查看任务对应的执行日志:

    cat /home/py/airflow-home/logs/testBashOperator/print_date/2018-05-25T00:00:00/6.log
    ...
    [2018-05-25 17:22:05,067] {models.py:1190} INFO - Dependencies not met for &lt;TaskInstance: testBashOperator.print_date 2018-05-25 00:00:00 [success]&gt;,
    dependency 'Task Instance State' FAILED: Task is in the 'success' state which is not a valid state for execution. The task must be cleared in order to be run.

    根据错误提示,说明依赖任务状态失败,针对这种情况有两种解决办法:

    使用airflow run运行task的时候指定忽略依赖task:

    $ airflow run -A dag_id task_id execution_date

    使用命令airflow clear dag_id进行任务清理:

    $ airflow clear -u testBashOperator
    8、CELERY: PRECONDITION_FAILED - inequivalent arg 'x-expires' for queue 'celery@xxxx.celery.pidbox' in vhost ''在升级celery 4.x以后使用rabbitmq为broker运行任务抛出如下异常:
    [2018-06-29 09:32:14,622: CRITICAL/MainProcess] Unrecoverable error: PreconditionFailed(406, "PRECONDITION_FAILED - inequivalent arg 'x-expires' for queue 'celery@PQ
    SZ-L01395.celery.pidbox' in vhost '/': received the value '10000' of type 'signedint' but current is none", (50, 10), 'Queue.declare')
    Traceback (most recent call last):
        File "c:/programdata/anaconda3/lib/site-packages/celery/worker/worker.py", line 205, in start
    self.blueprint.start(self)
    .......
        File "c:/programdata/anaconda3/lib/site-packages/amqp/channel.py", line 277, in _on_close
            reply_code, reply_text, (class_id, method_id), ChannelError,
    amqp.exceptions.PreconditionFailed: Queue.declare: (406) PRECONDITION_FAILED - inequivalent arg 'x-expires' for queue 'celery@PQSZ-L01395.celery.pidbox' in vhost '/'
    : received the value '10000' of type 'signedint' but current is none

    出现该错误的原因一般是因为rabbitmq的客户端和服务端参数不一致导致的,将其参数保持一致即可。
    比如这里提示是x-expires 对应的celery中的配置是control_queue_expires。因此只需要在配置文件中加上control_queue_expires = None即可。

    在celery 3.x中是没有这两项配置的,在4.x中必须保证这两项配置的一致性,不然就会抛出如上的异常。

    我这里遇到的了两个rabbitmq的配置与celery配置的映射关系如下表:

    rabbitmq celery4.x
    x-expires control_queue_expires
    x-message-ttl control_queue_ttl
    9、CELERY: The AMQP result backend is scheduled for deprecation in version 4.0 and removal in version v5.0.Please use RPC backend or a persistent backend

    celery升级到4.x之后运行抛出如下异常:

    /anaconda/anaconda3/lib/python3.6/site-packages/celery/backends/amqp.py:67: CPendingDeprecationWarning:
            The AMQP result backend is scheduled for deprecation in version 4.0 and removal in version v5.0. Please use RPC backend or a persistent backend.
        alternative='Please use RPC backend or a persistent backend.')

    原因解析:
    在celery 4.0中 rabbitmq 配置result_backbend方式变了:
    以前是跟broker一样:result_backend = 'amqp://guest:guest@localhost:5672//'
    现在对应的是rpc配置:result_backend = 'rpc://'

    参考链接:http://docs.celeryproject.org/en/latest/userguide/configuration.html#std:setting-event_queue_prefix

    10、CELERY: ValueError('not enough values to unpack (expected 3, got 0)',)

    windows上运行celery 4.x抛出以下错误:

    [2018-07-02 10:54:17,516: ERROR/MainProcess] Task handler raised error: ValueError('not enough values to unpack (expected 3, got 0)',)
    Traceback (most recent call last):
        ......
        tasks, accept, hostname = _loc
    ValueError: not enough values to unpack (expected 3, got 0)

    celery 4.x暂时不支持windows平台,如果为了调试目的的话,可以通过替换celery的线程池实现以达到在windows平台上运行的目的:

    pip install eventlet
    celery -A &lt;module&gt; worker -l info -P eventlet

    参考链接:
    https://stackoverflow.com/questions/45744992/celery-raises-valueerror-not-enough-values-to-unpack
    https://blog.csdn.net/qq_30242609/article/details/79047660

    11、Airflow: ERROR - 'DisabledBackend' object has no attribute '_get_task_meta_for'

    airflow运行中抛出以下异常:

    Traceback (most recent call last):
    File "/anaconda/anaconda3/lib/python3.6/site-packages/airflow/executors/celery_executor.py", line 83, in sync
    ......
    return self._maybe_set_cache(self.backend.get_task_meta(self.id))
    File "/anaconda/anaconda3/lib/python3.6/site-packages/celery/backends/base.py", line 307, in get_task_meta
    meta = self._get_task_meta_for(task_id)
    AttributeError: 'DisabledBackend' object has no attribute '_get_task_meta_for'
    [2018-07-04 10:52:14,746] {celery_executor.py:101} ERROR - Error syncing the celery executor, ignoring it:
    [2018-07-04 10:52:14,746] {celery_executor.py:102} ERROR - 'DisabledBackend' object has no attribute '_get_task_meta_for'

    这种错误有两种可能原因:

    1. CELERY_RESULT_BACKEND属性没有配置或者配置错误;
    2. celery版本太低,比如airflow 1.9.0要使用celery4.x,所以检查celery版本,保持版本兼容;
    12、airflow.exceptions.AirflowException dag_id could not be found xxxx. Either the dag did not exist or it failed to parse

    查看worker日志airflow-worker.err

    airflow.exceptions.AirflowException: dag_id could not be found: bmhttp. Either the dag did not exist or it failed to parse.
    [2018-07-31 17:37:34,191: ERROR/ForkPoolWorker-6] Task airflow.executors.celery_executor.execute_command[181c78d0-242c-4265-aabe-11d04887f44a] raised unexpected: AirflowException('Celery command failed',)
    Traceback (most recent call last):
    File "/anaconda/anaconda3/lib/python3.6/site-packages/airflow/executors/celery_executor.py", line 52, in execute_command
    subprocess.check_call(command, shell=True)
    File "/anaconda/anaconda3/lib/python3.6/subprocess.py", line 291, in check_call
    raise CalledProcessError(retcode, cmd)
    subprocess.CalledProcessError: Command 'airflow run bmhttp get_op1 2018-07-26T06:28:00 --local -sd /home/ignite/airflow/dags/BenchMark01.py' returned non-zero exit status 1.

    通过异常日志中的Command信息得知, 调度节点在生成任务消息的时候同时也指定了要执行的脚本的路径(通过ds参数指定),也就是说调度节点(scheduler)和工作节点(worker)相应的dag脚本文件必须置于相同的路径下面,不然就会出现以上错误。

    参考链接:https://stackoverflow.com/questions/43235130/airflow-dag-id-could-not-be-found

    13、airlfow 的 REST API调用返回 Airflow 404 = lots of circles

    出现这个错误的原因是因为URL中未提供origin参数,这个参数用于重定向,例如调用airflow的/run接口,可用示例如下所示:

    http://localhost:8080/admin/airflow/run?dag_id=example_hello_world_dag&task_id=sleep_task&execution_date=20180807&ignore_all_deps=true&origin=/admin

    14、Broker与Executor选择

    请务必使用RabbitMQ+CeleryExecutor, 毕竟这个也是Celery官方推荐的做法, 这样就可以使用一些很棒的功能, 比如webui上点击错误的Task然后ReRun

    15、pkg_resources.DistributionNotFound: The 'setuptools==0.9.8' distribution was not found and is required by the application

    AirFlow 常见问题

    pip install distribution

    16、Supervisor

    在使用supervisor的启动worker,server,scheduler的时候, 请务必给配置的supervisor任务加上

    environment=AIRFLOW_HOME=xxxxxxxxxx

    主要原因在于如果你的supervisor是通过调用一个自定义的脚本来运行的, 在启动worker的时候会另外启动一个serve_log服务, 如果没有设置正确的环境变量, serve_log 会在默认的AIRFLOW_HOME里找日志, 导致无法在webui里查看日志

    17、Serve_log

    如果在多个机器上部署了worker, 那么你需要iptables开启那些机器的8793端口, 这样webui才能查看跨机器worker的任务日志

    18、AMPQ库

    celery提供了两种库来实现amqp, 一种是默认的kombu, 另外一个是librabbitmq, 后者是对其c模块的绑定, 在1.8.1版本中, 使用的kombu的时候会出现scheduler自动断掉的问题, 这个应该是其对应版本4.0.2的问题, 当切成librabbitmq的时候, server 与 scheduler运行正常, 但是worker的从来不consume任务, 最后查出原因: Celery4.0.2的协议发生了变化但是librabbitmq还没有对应修改, 解决方法是, 修改源码里的 executors/celery_executor.py文件然后加入参数

    CELERY_TASK_PROTOCOL = 1
    19、RabbitMQ连接卡死

    运行一段时间过后, 由于网络问题导致所有任务都在queued状态, 除非把worker重启才能生效, 查资料有人说是clelery的broker pool有问题, 继续给celery_executor.py加入参数

    BROKER_POOL_LIMIT=0 //不使用连接池

    另外这样只会减少卡死的几率, 最好使用crontab定时重启worker

    20、特定任务只在特殊机器上运行

    可以给DAG中的task指定一个queue, 然后在特定的机器上运行 airflow worker -q=QUEUE_NAME 即可实现

    21、RabbitMQ中的queue数量过多问题

    celery为了让scheduler知道每个task的结果并且知道结果的时间为 O(1) , 那么唯一的解决方式就是给每一个任务创建一个UUID的queue, 默认这个queue的过期时间是1天, 可以通过更改celery_executor.py的参数来调节这个过期时间

    CELERY_TASK_RESULT_EXPIRES = time in seconds
    22、airflow worker 角色不能使用根用户启动

    原因:不能用根用户启动的根本原因,在于airflow的worker直接用的celery,而celery 源码中有参数默认不能使用ROOT启动,否则将报错 .

    C_FORCE_ROOT = os.environ.get('C_FORCE_ROOT', False)
    ROOT_DISALLOWED = """/
    Running a worker with superuser privileges when the
    worker accepts messages serialized with pickle is a very bad idea!
    
    If you really want to continue then you have to set the C_FORCE_ROOT
    environment variable (but please think about this before you do).
    
    User information: uid={uid} euid={euid} gid={gid} egid={egid}
    """
    
    ROOT_DISCOURAGED = """/
    You're running the worker with superuser privileges: this is
    absolutely not recommended!
    
    Please specify a different user using the --uid option.
    
    User information: uid={uid} euid={euid} gid={gid} egid={egid}
    """

    解决方案一:修改airlfow源码,在celery_executor.py中强制设置C_FORCE_ROOT

    from celery import Celery, platforms
    在app = Celery(…)后新增
    platforms.C_FORCE_ROOT = True
    重启即可

    解决方案二:在容器初始化环境变量的时候,设置C_FORCE_ROOT参数,以零侵入的方式解决问题

    强制celery worker运行采用root模式
    export C_FORCE_ROOT=True
    23、docker in docker

    在dags中以docker方式调度任务时,为了container的轻量话,不做重型的docker pull等操作,我们利用了docker cs架构的设计理念,只需要将宿主机的/var/run/docker.sock文件挂载到容器目录下即可 docker in docker 资料 :https://link.zhihu.com/?target=http://wangbaiyuan.cn/docker-in-docker.html#prettyPhoto

    24、多个worker节点进行调度反序列化dag执行的时候,报找不到module的错误

    当时考虑到文件更新的一致性,采用所有worker统一执行master下发的序列化dag的方案,而不依赖worker节点上实际的dag文件,开启这一特性操作如下

    worker节点上: airflow worker -cn=ip@ip -p //-p为开关参数,意思是以master序列化的dag作为执行文件,而不是本地dag目录中的文件
    master节点上: airflow scheduler -p

    错误原因: 远程的worker节点上不存在实际的dag文件,反序列化的时候对于当时在dag中定义的函数或对象找不到module_name
    解决方案一:在所有的worker节点上同时发布dags目录,缺点是dags一致性成问题
    解决方案二:修改源码中序列化与反序列化的逻辑,主体思路还是替换掉不存在的module为main。修改如下:

    
    //models.py 文件,对 class DagPickle(Base) 定义修改
    import dill
    class DagPickle(Base):
    id = Column(Integer, primary_key=True)
    # 修改前: pickle = Column(PickleType(pickler=dill))
    pickle = Column(LargeBinary)
    created_dttm = Column(UtcDateTime, default=timezone.utcnow)
    pickle_hash = Column(Text)

    tablename = "dag_pickle"
    def init(self, dag):
    self.dag_id = dag.dag_id
    if hasattr(dag, 'template_env'):
    dag.template_env = None
    self.pickle_hash = hash(dag)
    raw = dill.dumps(dag)

    修改前: self.pickle = dag

    reg_str = 'unusualprefix/w*{0}'.format(dag.dag_id)
    result = re.sub(str.encode(reg_str), b'main', raw)
    self.pickle =result

    //cli.py 文件反序列化逻辑 run(args, dag=None) 函数
    // 直接通过dill来反序列化二进制文件,而不是通过PickleType 的result_processor做中转
    修改前: dag = dag_pickle.pickle
    修改后:dag = dill.loads(dag_pickle.pickle)

    >  解决方案三:源码零侵入,使用python的types.FunctionType重新创建一个不带module的function,这样序列化与反序列化的时候不会有问题

    new_func = types.FunctionType((lambda df: df.iloc[:, 0].size == xx).code, {})

    
    #### 25、在master节点上,通过webserver无法查看远程执行的任务日志
    >  原因:由于airflow在master查看task执行日志是通过各个节点的http服务获取的,但是存入task_instance表中的host_name不是ip,可见获取hostname的方式有问题.
    >  解决方案:修改airflow/utils/net.py 中get_hostname函数,添加优先获取环境变量中设置的hostname的逻辑
    ```python
    //models.py TaskInstance
    self.hostname = get_hostname()
    //net.py 在get_hostname里面加入一个获取环境变量的逻辑
    import os
    def get_hostname():
    """
    Fetch the hostname using the callable from the config or using
    `socket.getfqdn` as a fallback.
    """
    # 尝试获取环境变量
    if 'AIRFLOW_HOST_NAME' in os.environ:
    return os.environ['AIRFLOW_HOST_NAME']
    # First we attempt to fetch the callable path from the config.
    try:
    callable_path = conf.get('core', 'hostname_callable')
    except AirflowConfigException:
    callable_path = None
    
    # Then we handle the case when the config is missing or empty. This is the
    # default behavior.
    if not callable_path:
    return socket.getfqdn()
    
    # Since we have a callable path, we try to import and run it next.
    module_path, attr_name = callable_path.split(':')
    module = importlib.import_module(module_path)
    callable = getattr(module, attr_name)
    return callable()
    

    原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/tech/aiops/183502.html

    (0)
    上一篇 2021年11月2日 03:41
    下一篇 2021年11月2日 03:45

    相关推荐

    发表回复

    登录后才能评论