Table of contents

  • 1. Scheduler start and stop commands
    • 1.1 Scheduler start command
    • 1.2 Scheduler stop command
  • 2. Scheduler source code
    • 2.1 cli.scheduler(): receive the airflow scheduler command from the command line
    • 2.2 BaseJob.run(): add a SchedulerJob record to the job table and invoke the subclass logic
    • 2.3 SchedulerJob._execute(): the concrete execution logic of SchedulerJob
      • 2.3.1 list_py_file_paths(self.subdir): find the DAG files under the given path
      • 2.3.2 Create a DagFileProcessorAgent to parse the discovered DAG files
      • 2.3.3 SchedulerJob._execute_helper(): the main loop of the scheduler
        • 2.3.3.1 self.executor.start(): start the task executor
        • 2.3.3.2 self.reset_state_for_orphaned_tasks(): reset tis in given states after the scheduler starts
        • 2.3.3.3 self.processor_agent.start(): start the DagFileProcessorManager and begin parsing DAG files in a loop
        • 2.3.3.4 The core code: the scheduler's while loop
          • 2.3.3.4.1 self._get_simple_dags(): collect the DAG file parsing results
          • 2.3.3.4.2 SimpleDagBag(simple_dags): wrap all the collected simple_dags into a SimpleDagBag
          • 2.3.3.4.3 self._validate_and_run_task_instances: validate and run tis
          • 2.3.3.4.4 Periodically run the SchedulerJob heartbeat method
          • 2.3.3.4.5 self._processor_poll_interval: polling interval
          • 2.3.3.4.6 self.num_runs: exit the while loop once all DAG files have been processed the given number of times
          • 2.3.3.4.7 Enforce the minimum while-loop polling interval
        • 2.3.3.5 self.processor_agent.terminate(): send a termination signal to the DagFileProcessorManager
        • 2.3.3.6 models.DAG.deactivate_stale_dags(execute_start_time): deactivate stale DAGs
        • 2.3.3.7 self.executor.end(): shut down the executor
      • 2.3.4 self.processor_agent.end(): shut down the DagFileProcessorManager

1. Scheduler start and stop commands

1.1 Scheduler start command

The Airflow scheduler is usually started with a command like the following:

airflow scheduler \
--pid /data/bdetl/airflow/pids/airflow-scheduler.pid \
--stdout /data/bdetl/logs/airflow/airflow-scheduler.out \
--stderr /data/bdetl/logs/airflow/airflow-scheduler.out \
-l /data/bdetl/logs/airflow/airflow-scheduler.log \
-D

For more options, see the scheduler parameters below:

  • -sd, --subdir: look for DAG files under the given path. Defaults to '[AIRFLOW_HOME]/dags', where [AIRFLOW_HOME] is the value set for 'AIRFLOW_HOME' in 'airflow.cfg'.
  • -r, --run-duration: how long, in seconds, the scheduler loop runs before exiting.
  • -n, --num_runs: how many times all DAG files are parsed and scheduled before the scheduler exits.
  • -p, --do_pickle: whether to send DAG objects to the worker nodes in pickled (serialized) form for execution.

1.2 Scheduler stop command

cat /data/bdetl/airflow/pids/airflow-scheduler.pid | xargs kill -15

Running the command above kills the scheduler process and removes the airflow-scheduler.pid file.
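
For completeness, here is a minimal Python equivalent of that shell one-liner. The pid file path is the one assumed in section 1.1 and should be adjusted to your deployment:

import os
import signal

# Assumption: same pid file path as in the start command above.
PID_FILE = "/data/bdetl/airflow/pids/airflow-scheduler.pid"

with open(PID_FILE) as f:
    pid = int(f.read().strip())

# kill -15 sends SIGTERM, which lets the scheduler shut down gracefully;
# the daemon context then takes care of removing the pid file.
os.kill(pid, signal.SIGTERM)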

2. Scheduler source code

In the rest of this article:

ti stands for task_instance, i.e. a task instance;

tis stands for task_instances;

The code is based on Airflow 1.10.11.

2.1 cli.scheduler(): receive the airflow scheduler command from the command line

Based on the given arguments, it creates a SchedulerJob and then calls the job's run() method.

cli.scheduler()

@cli_utils.action_logging
def scheduler(args):
    py2_deprecation_waring()
    print(settings.HEADER)
    # Create a SchedulerJob
    job = jobs.SchedulerJob(
        dag_id=args.dag_id,
        subdir=process_subdir(args.subdir),
        run_duration=args.run_duration,
        num_runs=args.num_runs,
        do_pickle=args.do_pickle)
    # Daemon mode
    if args.daemon:
        # Set up the pid file and the log paths
        pid, stdout, stderr, log_file = setup_locations("scheduler",
                                                        args.pid,
                                                        args.stdout,
                                                        args.stderr,
                                                        args.log_file)
        handle = setup_logging(log_file)
        stdout = open(stdout, 'w+')
        stderr = open(stderr, 'w+')

        ctx = daemon.DaemonContext(
            pidfile=TimeoutPIDLockFile(pid, -1),
            files_preserve=[handle],
            stdout=stdout,
            stderr=stderr,
        )
        # Call the SchedulerJob's run() method
        with ctx:
            job.run()

        stdout.close()
        stderr.close()
    else:
        signal.signal(signal.SIGINT, sigint_handler)
        signal.signal(signal.SIGTERM, sigint_handler)
        signal.signal(signal.SIGQUIT, sigquit_handler)
        job.run()

2.2 BaseJob.run(): add a SchedulerJob record to the job table and invoke the subclass logic

Calling job.run() above executes the run() method of BaseJob, the parent class of SchedulerJob:

BaseJob.run()

    def run(self):
        Stats.incr(self.__class__.__name__.lower() + '_start', 1, 1)
        # Adding an entry in the DB
        with create_session() as session:
            self.state = State.RUNNING
            # Add a SchedulerJob record in RUNNING state to the DB
            session.add(self)
            session.commit()
            id_ = self.id
            make_transient(self)
            self.id = id_

            try:
                # Call the _execute() method implemented by the subclass
                self._execute()
                # In case of max runs or max duration
                self.state = State.SUCCESS
            except SystemExit:
                # In case of ^C or SIGTERM
                self.state = State.SUCCESS
            except Exception:
                self.state = State.FAILED
                raise
            finally:
                # After the job finishes, fill in end_date and update the record
                self.end_date = timezone.utcnow()
                session.merge(self)
                session.commit()

        Stats.incr(self.__class__.__name__.lower() + '_end', 1, 1)

As the code shows, this method mainly creates a scheduler job record in the job table:

  • If _execute() (which contains a while loop) finishes normally, the SchedulerJob state is SUCCESS;
  • If the program is stopped manually while _execute() is running (Ctrl-C or kill -15 pid), the SchedulerJob state is also SUCCESS;
  • If an exception is raised while _execute() is running, the SchedulerJob state is FAILED;
  • Finally, the SchedulerJob's end_date is filled in and the record in the DB is updated.

2.3 SchedulerJob._execute(): the concrete execution logic of SchedulerJob

The self._execute() call above dispatches to the following method of the subclass:

SchedulerJob._execute()

    def _execute(self):
        self.log.info("Starting the scheduler")

        # DAGs can be pickled for easier remote execution by some executors
        pickle_dags = False
        if self.do_pickle and self.executor.__class__ not in \
                (executors.LocalExecutor, executors.SequentialExecutor):
            pickle_dags = True

        self.log.info("Running execute loop for %s seconds", self.run_duration)
        self.log.info("Processing each file at most %s times", self.num_runs)

        # Build up a list of Python files that could contain DAGs
        self.log.info("Searching for files in %s", self.subdir)
        # Look for DAG files under the given self.subdir path
        known_file_paths = list_py_file_paths(self.subdir)
        self.log.info("There are %s files in %s", len(known_file_paths), self.subdir)

        # When using sqlite, we do not use async_mode
        # so the scheduler job and DAG parser don't access the DB at the same time.
        async_mode = not self.using_sqlite

        # AIRFLOW SETTINGS: timeout for a DagFileProcessor while it processes a DAG file; the processing process is killed once it exceeds this
        processor_timeout_seconds = conf.getint('core', 'dag_file_processor_timeout')
        processor_timeout = timedelta(seconds=processor_timeout_seconds)

        """
        新建一个file processor agent:
          dag_directory:默认的dag文件路径或用户指定的dags文件路径self.subdir
          file_paths:dags文件夹下的dag文件路径list
          max_runs:scheduler解析dag文件的次数,默认为-1,表示一直解析
          processor_factory:用于创建DagFileProcessor进程(AbstractDagFileProcessor子类)
          processor_timeout:DagFileProcessor进程超时时间
          async_mode:是否使用异步模式启动DagFileProcessorManager,如果db不是sqlite,则默认使用异步模式
        """
        self.processor_agent = DagFileProcessorAgent(self.subdir,
                                                     known_file_paths,
                                                     self.num_runs,
                                                     type(self)._create_dag_file_processor,
                                                     processor_timeout,
                                                     self.dag_ids,
                                                     pickle_dags,
                                                     async_mode)

        try:
            self._execute_helper()
        except Exception:
            self.log.exception("Exception when executing execute_helper")
        finally:
            self.processor_agent.end()
            self.log.info("Exited execute loop")

_execute() is the scheduler's main method; it runs the main logic of the scheduling system and consists of the following parts:

2.3.1 list_py_file_paths(self.subdir): find the DAG files under the given path

# Build up a list of Python files that could contain DAGs
self.log.info("Searching for files in %s", self.subdir)
# Look for DAG files under the given self.subdir path
known_file_paths = list_py_file_paths(self.subdir)
self.log.info("There are %s files in %s", len(known_file_paths), self.subdir)

The list_py_file_paths(self.subdir) method walks the self.subdir folder, looks for DAG files in it, and returns the list of matching file paths.

The subsequent steps then parse the DAG files that were found.
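
As a rough illustration only (not the actual Airflow implementation, which also handles .zip packages and .airflowignore rules), list_py_file_paths behaves roughly like this sketch:

import os

def list_py_file_paths_sketch(directory, safe_mode=True):
    """Walk the DAGs folder and return the .py files that look like DAG files."""
    file_paths = []
    for root, _, files in os.walk(directory, followlinks=True):
        for name in files:
            if not name.endswith('.py'):
                continue
            path = os.path.join(root, name)
            if safe_mode:
                # Heuristic: only keep files that mention both "airflow" and "DAG".
                with open(path, 'rb') as f:
                    content = f.read()
                if not (b'airflow' in content and b'DAG' in content):
                    continue
            file_paths.append(path)
    return file_paths

# e.g. known_file_paths = list_py_file_paths_sketch('/data/bdetl/airflow/dags')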

2.3.2 Create a DagFileProcessorAgent to parse the discovered DAG files

# AIRFLOW SETTINGS: timeout for a DagFileProcessor while it processes a DAG file; the processing process is killed once it exceeds this
processor_timeout_seconds = conf.getint('core', 'dag_file_processor_timeout')
processor_timeout = timedelta(seconds=processor_timeout_seconds)

"""
新建一个file processor agent:
  dag_directory:默认的dags文件路径或用户指定的dags文件路径
  file_paths:dags文件夹下的py文件路径list
  max_runs:scheduler解析py文件的次数,默认为-1,表示一致解析
  processor_factory:用于创建DagFileProcessor进程(AbstractDagFileProcessor子类)的方法
  processor_timeout:DagFileProcessor进程超时时间
  async_mode:是否使用异步模式启动DagFileProcessorManager,如果db不是sqlite,则默认使用异步模式
"""
self.processor_agent = DagFileProcessorAgent(self.subdir,
                                             known_file_paths,
                                             self.num_runs,
                                             type(self)._create_dag_file_processor,
                                             processor_timeout,
                                             self.dag_ids,
                                             pickle_dags,
                                             async_mode)

The code above involves an Airflow configuration option: the timeout for a DagFileProcessor while it processes a DAG file; the processing process is killed once it exceeds this. The airflow.cfg entry is:

# How long before timing out a DagFileProcessor, which processes a dag file
dag_file_processor_timeout = 50
  • DagFileProcessorAgent

This is the agent in charge of DAG file processing; it is responsible for all DAG-parsing-related work during scheduling. The DagFileProcessorAgent creates a child process of the scheduler, the DagFileProcessorManager, and the DagFileProcessorManager in turn creates one DagFileProcessor process per DAG file to process the file and collect its parsing result. While DAG files are being parsed, it uses inter-process communication to report the processing results back to the scheduler main process.

The relationship between the DagFileProcessorAgent, the DagFileProcessorManager, the DagFileProcessor processes and the corresponding DAG files is illustrated by the sketch below:
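
A minimal, self-contained sketch of that process layout (illustrative only, not Airflow code): the agent lives in the scheduler process and talks to a manager child process over a multiprocessing Pipe; in Airflow the manager would additionally fork one DagFileProcessor per DAG file:

import multiprocessing

def manager_loop(signal_conn, file_paths):
    # Stand-in for DagFileProcessorManager: in Airflow this would fork a
    # DagFileProcessor per file and send back SimpleDags plus statistics.
    for path in file_paths:
        signal_conn.send({"file": path, "simple_dags": []})  # placeholder result
    signal_conn.send("done")

if __name__ == "__main__":
    # Bidirectional pipe between the scheduler (agent) side and the manager process.
    parent_conn, child_conn = multiprocessing.Pipe()
    manager = multiprocessing.Process(target=manager_loop,
                                      args=(child_conn, ["/dags/a.py", "/dags/b.py"]))
    manager.start()
    while True:
        msg = parent_conn.recv()      # the agent "harvests" results here
        if msg == "done":
            break
        print("harvested parsing result for", msg["file"])
    manager.join()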

2.3.3 SchedulerJob._execute_helper(): the main loop of the scheduler

The following is the core of the whole scheduler; the code is shown below:

_execute_helper()

def _execute_helper(self):
    """
    The actual scheduler loop. The main steps in the loop are:
        #. Harvest DAG parsing results through DagFileProcessorAgent
        #. Find and queue executable tasks
            #. Change task instance state in DB
            #. Queue tasks in executor
        #. Heartbeat executor
            #. Execute queued tasks in executor asynchronously
            #. Sync on the states of running tasks
    Following is a graphic representation of these steps.
    .. image:: ../docs/img/scheduler_loop.jpg
    :rtype: None
    """
    # Call start() on the configured executor
    self.executor.start()
    self.log.info("Resetting orphaned tasks for active dag runs")
    # When the scheduler starts, reset to None the tis in SCHEDULED or QUEUED state whose DagRun is RUNNING and not backfill-prefixed, so they can be scheduled again later
    self.reset_state_for_orphaned_tasks()
    
    # Start after resetting orphaned tasks to avoid stressing out DB.
    # Call processor_agent.start() to launch the DagFileProcessorManager it wraps and begin parsing DAG files in a loop
    self.processor_agent.start()
    
    execute_start_time = timezone.utcnow()
    # Last time that self.heartbeat() was called.
    last_self_heartbeat_time = timezone.utcnow()
    
    # For the execute duration, parse and schedule DAGs
    # Loop: receive parsing results from the DagFileProcessorManager and schedule the corresponding DAGs and tis. The while loop terminates when:
    # 1. run_duration is set and the loop has been running for run_duration seconds (default -1);
    # 2. num_runs is set and every DAG file has been processed num_runs times.
    while (timezone.utcnow() - execute_start_time).total_seconds() < \
            self.run_duration or self.run_duration < 0:
        self.log.debug("Starting Loop...")
        loop_start_time = time.time()
        if self.using_sqlite:
            self.processor_agent.heartbeat()
            # For the sqlite case w/ 1 thread, wait until the processor
            # is finished to avoid concurrent access to the DB.
            self.log.debug(
                "Waiting for processors to finish since we're using sqlite")
            self.processor_agent.wait_until_finished()
            
        # Collect the parsed DAG file information; this calls self.processor_agent.harvest_simple_dags()
        self.log.debug("Harvesting DAG parsing results")
        simple_dags = self._get_simple_dags()
        self.log.debug("Harvested {} SimpleDAGs".format(len(simple_dags)))
        # Send tasks for execution if available
        simple_dag_bag = SimpleDagBag(simple_dags)
        
        # Schedule the tis that were parsed:
        # 1. Process the file parsing results, put them into the executor's queued_tasks, and move the tis from SCHEDULED to QUEUED;
        # 2. Call the executor's heartbeat() to run the tis in queued_tasks asynchronously and sync their execution state;
        # 3. For tis left in the executor's queued_tasks because of constraints (pool, slots, concurrency, ...), clear queued_tasks and set the tis back to SCHEDULED;
        # 4. Handle the results of the tis that the executor ran asynchronously
        if not self._validate_and_run_task_instances(simple_dag_bag=simple_dag_bag):
            continue
            
        # Heartbeat the scheduler periodically
        # Time since the last scheduler heartbeat
        time_since_last_heartbeat = (timezone.utcnow() -
                                     last_self_heartbeat_time).total_seconds()
        # self.heartrate is the scheduler heartbeat rate specified in the config file
        if time_since_last_heartbeat > self.heartrate:
            self.log.debug("Heartbeating the scheduler")
            # Call heartbeat():
            # 1. Update the SchedulerJob heartbeat time in the job table with the current time;
            # 2. If the job state has been changed to SHUTDOWN, kill the current job.
            self.heartbeat()
            # Record the time of this heartbeat
            last_self_heartbeat_time = timezone.utcnow()
            
        is_unit_test = conf.getboolean('core', 'unit_test_mode')
        loop_end_time = time.time()
        # Duration of this while-loop iteration
        loop_duration = loop_end_time - loop_start_time
        self.log.debug(
            "Ran scheduling loop in %.2f seconds",
            loop_duration)
        
        if not is_unit_test:
            self.log.debug("Sleeping for %.2f seconds", self._processor_poll_interval)
            # If _processor_poll_interval is set, sleep for that long before the next iteration
            time.sleep(self._processor_poll_interval)
            
        if self.processor_agent.done:
            self.log.info("Exiting scheduler loop as all files"
                          " have been processed {} times".format(self.num_runs))
            # All files have been processed num_runs times; exit the while loop
            break
            
        if loop_duration < 1 and not is_unit_test:
            sleep_length = 1 - loop_duration
            self.log.debug(
                "Sleeping for {0:.2f} seconds to prevent excessive logging"
                .format(sleep_length))
            # If the iteration took less than 1 second, sleep (1 - loop_duration) seconds so each iteration takes at least 1 second
            sleep(sleep_length)
            
    # Stop any processors
    # The loop has finished; send a termination signal to the DagFileProcessorManager so it stops all DagFileProcessor processes.
    self.processor_agent.terminate()
    
    # Verify that all files were processed, and if so, deactivate DAGs that
    # haven't been touched by the scheduler as they likely have been
    # deleted.
    if self.processor_agent.all_files_processed:
        self.log.info(
            "Deactivating DAGs that haven't been touched since %s",
            execute_start_time.isoformat()
        )
        # Set is_active to False for DAG records in the dag table that were not touched
        models.DAG.deactivate_stale_dags(execute_start_time)
    
    # Call the executor's end() method to shut it down
    self.executor.end()
    settings.Session.remove()

2.3.3.1 self.executor.start(): start the task executor

The executor specified in BaseJob's constructor:

self.executor = executor or executors.get_default_executor()

The executor is obtained from the configuration file:

def get_default_executor():
    """Creates a new instance of the configured executor if none exists and returns it"""
    global DEFAULT_EXECUTOR

    if DEFAULT_EXECUTOR is not None:
        return DEFAULT_EXECUTOR
    # Read the executor name from the airflow.cfg configuration
    executor_name = conf.get('core', 'EXECUTOR')

    DEFAULT_EXECUTOR = _get_executor(executor_name)

    log.info("Using executor %s", executor_name)

    return DEFAULT_EXECUTOR

The airflow.cfg setting; in production this is usually set to CeleryExecutor:

# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor, KubernetesExecutor
executor = CeleryExecutor

CeleryExecutor's start() method only prints a log line showing how many processes the executor uses to sync task metadata:

def start(self):
    self.log.debug(
        'Starting Celery Executor using %s processes for syncing',
        self._sync_parallelism
    )

2.3.3.2 self.reset_state_for_orphaned_tasks(): reset tis in given states after the scheduler starts

When the scheduler starts, the tis in SCHEDULED or QUEUED state that belong to RUNNING DagRuns whose run_id does not start with the backfill prefix are reset to None, so the scheduler can schedule them normally later on.
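
The following is a hedged, plain-Python sketch of that reset logic; the real method works on the task_instance and dag_run tables through SQLAlchemy, and the dict-based rows here are just stand-ins:

RESETTABLE_STATES = {"scheduled", "queued"}

def reset_orphaned_tis(dag_runs, task_instances):
    # Only non-backfill DagRuns that are currently RUNNING are considered.
    active_runs = {
        (dr["dag_id"], dr["execution_date"])
        for dr in dag_runs
        if dr["state"] == "running" and not dr["run_id"].startswith("backfill_")
    }
    for ti in task_instances:
        if (ti["dag_id"], ti["execution_date"]) in active_runs \
                and ti["state"] in RESETTABLE_STATES:
            ti["state"] = None   # back to None so the scheduler can pick it up again
    return task_instances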

2.3.3.3 self.processor_agent.start(): start the DagFileProcessorManager and begin parsing DAG files in a loop

# Start after resetting orphaned tasks to avoid stressing out DB.
# Call processor_agent.start() to launch the DagFileProcessorManager and begin parsing DAG files in a loop
self.processor_agent.start()

The DagFileProcessorAgent.start() method:

def start(self):
    """
    Launch DagFileProcessorManager processor and start DAG parsing loop in manager.
    """
    if six.PY2:
        context = multiprocessing
    else:
        mp_start_method = self._get_multiprocessing_start_method()
        context = multiprocessing.get_context(mp_start_method)
        
    # Bidirectional communication pipe between the scheduler process and the DagFileProcessorManager
    self._parent_signal_conn, child_signal_conn = context.Pipe()
    
    # Create a process; target is the callable to run and args are the arguments passed to it
    self._process = context.Process(
        target=type(self)._run_processor_manager,
        args=(
            self._dag_directory,
            self._file_paths,
            self._max_runs,
            # getattr prevents error while pickling an instance method.
            # The processor factory, mainly used to create DagFileProcessor instances
            getattr(self, "_processor_factory"),
            self._processor_timeout,
            # The pipe end used by the scheduler process (DagFileProcessorAgent) to talk to the DagFileProcessorManager child process
            child_signal_conn,
            self._dag_ids,
            self._pickle_dags,
            self._async_mode,
        )
    )
    
    # Calling process.start() launches the child process, which runs the target
    self._process.start()
    self.log.info("Launched DagFileProcessorManager with pid: %s", self._process.pid)

This is how the DagFileProcessorAgent and the DagFileProcessorManager interact with each other.

2.3.3.4 The core code: the scheduler's while loop

The while loop in _execute_helper is the core of the whole scheduler: it takes the parsed DAG information and turns it into scheduling actions.

The main steps of the loop are:

  • Harvest DAG parsing results through the DagFileProcessorAgent

  • Find and queue executable tasks

    • Change the state of tis in the DB;
    • Queue the tasks in the executor
  • Heartbeat the executor

    • Run the queued tasks asynchronously in the executor (via the executor's trigger_tasks method)
    • Sync the state of the running tasks (via the executor's sync method)

2.3.3.4.1 self._get_simple_dags(): collect the DAG file parsing results

# Collect the DAG file parsing results; this calls self.processor_agent.harvest_simple_dags()
self.log.debug("Harvesting DAG parsing results")
simple_dags = self._get_simple_dags()
self.log.debug("Harvested {} SimpleDAGs".format(len(simple_dags)))

The self._get_simple_dags() method ultimately calls self.processor_agent.harvest_simple_dags():

  • The DagFileProcessorAgent keeps statistics about the DAG files, and other code reacts to the latest information stored in the agent;
  • The harvested simple_dags (only DAGs that can actually be executed are returned here, and their tis have been set to SCHEDULED in the DB) are later handed to the scheduler for task scheduling; a rough sketch of what a SimpleDag carries is shown below.
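
To give an idea of what comes back from the harvest, here is a rough sketch (not the actual class, which lives in airflow.utils.dag_processing) of the kind of metadata a SimpleDag carries: just enough to schedule task instances without re-importing the user's DAG file. The field names and values below are illustrative:

from collections import namedtuple

# Illustrative stand-in for the real SimpleDag.
SimpleDagSketch = namedtuple(
    "SimpleDagSketch",
    ["dag_id", "task_ids", "full_filepath", "concurrency", "is_paused", "pickle_id"],
)

simple_dag = SimpleDagSketch(
    dag_id="example_dag",
    task_ids=["extract", "load"],
    full_filepath="/data/bdetl/airflow/dags/example_dag.py",
    concurrency=16,
    is_paused=False,
    pickle_id=None,
)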

2.3.3.4.2 SimpleDagBag(simple_dags): wrap all the collected simple_dags into a SimpleDagBag

# Send tasks for execution if available
simple_dag_bag = SimpleDagBag(simple_dags)

SimpleDagBag

class SimpleDagBag(BaseDagBag):
    """
    A collection of SimpleDag objects with some convenience methods.
    """

    def __init__(self, simple_dags):
        """
        Constructor.

        :param simple_dags: SimpleDag objects that should be in this
        :type list(airflow.utils.dag_processing.SimpleDagBag)
        """
        self.simple_dags = simple_dags
        self.dag_id_to_simple_dag = {}

        for simple_dag in simple_dags:
            self.dag_id_to_simple_dag[simple_dag.dag_id] = simple_dag

    @property
    def dag_ids(self):
        """
        :return: IDs of all the DAGs in this
        :rtype: list[unicode]
        """
        return self.dag_id_to_simple_dag.keys()

    def get_dag(self, dag_id):
        """
        :param dag_id: DAG ID
        :type dag_id: unicode
        :return: if the given DAG ID exists in the bag, return the BaseDag
        corresponding to that ID. Otherwise, throw an Exception
        :rtype: airflow.utils.dag_processing.SimpleDag
        """
        if dag_id not in self.dag_id_to_simple_dag:
            raise AirflowException("Unknown DAG ID {}".format(dag_id))
        return self.dag_id_to_simple_dag[dag_id]

2.3.3.4.3 self._validate_and_run_task_instances: validate and run tis

SchedulerJob._validate_and_run_task_instances()

def _validate_and_run_task_instances(self, simple_dag_bag):
    if len(simple_dag_bag.simple_dags) > 0:
        try:
            # 1. Process the file parsing results, put them into the executor's queued_tasks and move the tis from SCHEDULED to QUEUED
            self._process_and_execute_tasks(simple_dag_bag)
        except Exception as e:
            self.log.error("Error queuing tasks")
            self.log.exception(e)
            return False

    # Call heartbeats
    self.log.debug("Heartbeating the executor")
    # 2. Call the executor's heartbeat:
    # 1) executor.trigger_tasks(open_slots) runs the tis in queued_tasks asynchronously;
    # 2) executor.sync() syncs the tis' metadata
    self.executor.heartbeat()

    # 3. For tis in the executor's queued_tasks that were not executed, clear queued_tasks and set their state back to SCHEDULED
    self._change_state_for_tasks_failed_to_execute()

    # Process events from the executor
    # 4. Handle the results of the tis that the executor executed asynchronously
    self._process_executor_events(simple_dag_bag)
    return True
  • self._process_and_execute_tasks(simple_dag_bag)

    • Handles task_instances whose dag_run is not in the running state: if a task_instance is in up_for_retry but its dag_run is not running, the task_instance is set to failed and is not scheduled again

    • If a task_instance is in scheduled/queued/up_for_reschedule but its dag_run is not running, its state is set to None and it is not scheduled again

    • Prepares the task_instances that meet the conditions for execution:

      • Finds the executable tis: SCHEDULED task_instances that satisfy the constraints (priority/concurrency/max_active_runs/pool limits);

        • The corresponding DAG is not paused, the dag_run is not a backfill, and the tis are in SCHEDULED state;
      • Changes the state of these executable task_instances to QUEUED in the DB;

      • Builds an airflow run XXX command for each of these task_instances in the executor and enqueues the commands (into the executor's queued_tasks dict);

        The entries of the queued_tasks dict look like this (see the sketch after this list):

        key: self.dag_id, self.task_id, self.execution_date, self.try_number

        value: (command, priority, queue, simple_task_instance)

  • self.executor.heartbeat()

    • Depending on the number of available slots, sends the airflow run xxx commands to the remote workers via Celery for execution;

    • Fetches the execution state of the tasks run on the remote workers and stores it in the executor on the scheduler node; the results are kept in the event_buffer dict:

      key: self.dag_id, self.task_id, self.execution_date, self.try_number

      value: State.FAILED or State.SUCCESS

      The task states in the event_buffer dict are consumed later by self._process_executor_events.

  • self._change_state_for_tasks_failed_to_execute()

    • For tis in the executor's queued_tasks that were not executed, finds the corresponding QUEUED tis in the DB, sets their state back to SCHEDULED, and clears the queued_tasks dict;
  • self._process_executor_events(simple_dag_bag)

    • If the executor reports FAILED or SUCCESS for a task it has run but the ti is still QUEUED in the DB, the task may have been killed externally, and the ti's state in the DB is set to FAILED.

      When using an external_task_sensor with a very short reschedule interval, this problem can occur.

      The main reason is that on the sensor's first run the condition is not met, so the ti is set to reschedule; the scheduler then parses the DAG file again and queues the ti very quickly, so its state becomes QUEUED. Since self._process_executor_events only runs at that point, a SUCCESS != QUEUED mismatch is detected and the ti's state in the DB ends up as FAILED.

      References:

      1. AIRFLOW-5071
      2. Thousands of Executor reports task instance X finished (success) although the task says its queued. Was the task killed externally?
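
Here is a hedged illustration of the two in-memory dicts mentioned above. The keys are the task-instance identity tuple described in the list; the value layouts are simplified stand-ins for what the executor actually stores:

from datetime import datetime

# (dag_id, task_id, execution_date, try_number) -- placeholder values
ti_key = ("example_dag", "extract", datetime(2020, 7, 1), 1)

# executor.queued_tasks: filled by the scheduler, drained by trigger_tasks()
queued_tasks = {
    ti_key: (
        ["airflow", "run", "example_dag", "extract", "2020-07-01T00:00:00"],  # command
        1,            # priority
        "default",    # queue
        None,         # simple_task_instance placeholder
    )
}

# executor.event_buffer: filled by sync() with the terminal state reported by
# the workers, later consumed by _process_executor_events()
event_buffer = {
    ti_key: "success",   # or "failed"
}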

2.3.3.4.4 Periodically run the SchedulerJob heartbeat method

# Heartbeat the scheduler periodically
# Time since the last scheduler heartbeat
time_since_last_heartbeat = (timezone.utcnow() -
                             last_self_heartbeat_time).total_seconds()
# self.heartrate is the scheduler heartbeat rate set by scheduler_heartbeat_sec in the config file
if time_since_last_heartbeat > self.heartrate:
    self.log.debug("Heartbeating the scheduler")
    # Call heartbeat():
    # 1. If the job state has been changed to SHUTDOWN, kill the current job;
    # 2. If the job heartbeat interval (job_heartbeat_sec) has not elapsed yet, sleep;
    # 3. Update the heartbeat time in the job table with the current time.
    self.heartbeat()
    # Record the time of this heartbeat
    last_self_heartbeat_time = timezone.utcnow()

The scheduler_heartbeat_sec option in airflow.cfg:

# The scheduler constantly tries to trigger new tasks (look at the
# scheduler section in the docs for more information). This defines
# how often the scheduler should run (in seconds).
scheduler_heartbeat_sec = 5

The job_heartbeat_sec option in airflow.cfg:

# Task instances listen for external kill signal (when you clear tasks
# from the CLI or the UI), this defines the frequency at which they should
# listen (in seconds).
job_heartbeat_sec = 5
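
Putting the comments above together, a condensed, hedged sketch of what BaseJob.heartbeat() does looks like this; the real method reads and updates the job table through SQLAlchemy, and the dict-based job here is a stand-in:

import time
from datetime import datetime

def heartbeat_sketch(job, job_heartbeat_sec=5):
    # 1. If someone flipped the job state to SHUTDOWN in the DB, kill this job.
    if job["state"] == "shutdown":
        raise SystemExit("job was externally shut down")

    # 2. Throttle: if we heartbeat faster than job_heartbeat_sec, sleep the rest.
    elapsed = (datetime.utcnow() - job["latest_heartbeat"]).total_seconds()
    if elapsed < job_heartbeat_sec:
        time.sleep(job_heartbeat_sec - elapsed)

    # 3. Record the new heartbeat time (persisted to the job table in Airflow).
    job["latest_heartbeat"] = datetime.utcnow()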

2.3.3.4.5 self._processor_poll_interval: polling interval

if not is_unit_test:
    self.log.debug("Sleeping for %.2f seconds", self._processor_poll_interval)
    # If _processor_poll_interval is set, sleep for that long
    time.sleep(self._processor_poll_interval)

The processor_poll_interval option in airflow.cfg:

# The number of seconds to wait between consecutive DAG file processing
processor_poll_interval = 1

2.3.3.4.6 self.num_runs: exit the while loop once all DAG files have been processed the given number of times

if self.processor_agent.done:
    self.log.info("Exiting scheduler loop as all files"
                  " have been processed {} times".format(self.num_runs))
    # All files have been processed num_runs times; exit the while loop
    break

The num_runs option in airflow.cfg:

# The number of times to try to schedule each DAG file
# -1 indicates unlimited number
num_runs = -1

2.3.3.4.7 Enforce the minimum while-loop polling interval

if loop_duration < 1 and not is_unit_test:
    sleep_length = 1 - loop_duration
    self.log.debug(
        "Sleeping for {0:.2f} seconds to prevent excessive logging"
        .format(sleep_length))
    # If the iteration took less than 1 second, sleep (1 - loop_duration) seconds so each iteration takes at least 1 second
    sleep(sleep_length)

2.3.3.5 self.processor_agent.terminate(): send a termination signal to the DagFileProcessorManager

# Stop any processors
# Send a termination signal to the DagFileProcessorManager so it stops all DagFileProcessor processes.
self.processor_agent.terminate()

The DagFileProcessorAgent.terminate() method:

def terminate(self):
    """
    Send termination signal to DAG parsing processor manager
    and expect it to terminate all DAG file processors.
    """
    if self._process and self._process.is_alive():
        self.log.info("Sending termination message to manager.")
        try:
            # Send the TERMINATE_MANAGER signal to the DagFileProcessorManager through the scheduler process's end of the pipe
            self._parent_signal_conn.send(DagParsingSignal.TERMINATE_MANAGER)
        except ConnectionError:
            pass

2.3.3.6 models.DAG.deactivate_stale_dags(execute_start_time): deactivate stale DAGs

# Verify that all files were processed, and if so, deactivate DAGs that
# haven't been touched by the scheduler as they likely have been
# deleted.
if self.processor_agent.all_files_processed:
    self.log.info(
        "Deactivating DAGs that haven't been touched since %s",
        execute_start_time.isoformat()
    )
    # Set is_active to False for DAG records in the dag table that were not touched
    models.DAG.deactivate_stale_dags(execute_start_time)
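
The idea behind deactivate_stale_dags, sketched in plain Python under the assumption that each DAG row keeps a last-touched timestamp (the real implementation updates the dag table through the ORM):

def deactivate_stale_dags_sketch(dag_rows, execute_start_time):
    # Any active DAG row the scheduler has not touched since the loop started is
    # assumed to have been deleted from the dags folder, so it is flagged inactive.
    for dag in dag_rows:
        if dag["is_active"] and dag["last_touched"] < execute_start_time:
            dag["is_active"] = False
    return dag_rows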

2.3.3.7 self.executor.end(): shut down the executor

# Call the executor's end() method to shut it down
self.executor.end()

The CeleryExecutor.end() method:

def end(self, synchronous=False):
    if synchronous:
        while any([
                task.state not in celery_states.READY_STATES
                for task in self.tasks.values()]):
            time.sleep(5)
    self.sync()

2.3.4 self.processor_agent.end(): shut down the DagFileProcessorManager

Shut down the DagFileProcessorManager, the scheduler's child process:

def end(self):
    """
    Terminate (and then kill) the manager process launched.
    :return:
    """
    if not self._process:
        self.log.warning('Ending without manager process.')
        return
    reap_process_group(self._process.pid, log=self.log)
    self._parent_signal_conn.close()
