

1. Overview

Reposted from: A Brief Analysis of the Flink 1.12.2 Source Code: TaskExecutor

TaskExecutor is the concrete implementation of the TaskManager.

2. TaskExecutorGateway

First, let's look at the operations that the TaskExecutor exposes through its RPC gateway, TaskExecutorGateway. They are listed below.

2.1. Class diagram

2.2. Interface list

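| Name | Description |
|---|---|
| requestSlot(SlotID slotId, JobID jobId, AllocationID allocationId, ResourceProfile resourceProfile, String targetAddress, ResourceManagerId resourceManagerId, @RpcTimeout Time timeout) | Requests a slot from the TaskManager. |
| requestTaskBackPressure | Requests back-pressure information for a task. |
| submitTask(TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, @RpcTimeout Time timeout) | [Core] Submits a task. |
| updatePartitions | Updates the partition information of a task. |
| releaseOrPromotePartitions | Releases or promotes intermediate result partitions in batch. |
| releaseClusterPartitions | Releases all cluster partitions belonging to any of the given data sets. |
| triggerCheckpoint | Triggers a checkpoint for the given task. The checkpoint is identified by its checkpoint ID and checkpoint timestamp. |
| confirmCheckpoint | Confirms the checkpoint of the given task. The checkpoint is identified by its checkpoint ID and checkpoint timestamp. |
| abortCheckpoint | Aborts a checkpoint. |
| cancelTask | Cancels a task. |
| heartbeatFromJobManager | Heartbeat request from the JobManager. |
| heartbeatFromResourceManager | Heartbeat request from the ResourceManager. |
| disconnectJobManager | Disconnects the given JobManager from the TaskManager. |
| disconnectResourceManager | Disconnects the ResourceManager from the TaskManager. |
| freeSlot | Frees a slot. |
| requestFileUploadByType | Requests that the file of the given type be uploaded to the cluster's BlobServer. |
| requestFileUploadByName | Requests that the file with the given name be uploaded to the cluster's BlobServer. |
| requestMetricQueryServiceAddress | Returns the address of the metric query service on the TaskManager. |
| canBeReleased | Checks whether the task executor can be released. It cannot be released while there are unconsumed result partitions. |
| requestLogList | Requests the names of the historical log files on the TaskManager. |
| sendOperatorEventToTask | Sends an operator event to a task. |
| requestThreadDump | Requests a thread dump of the TaskManager. |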
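Every gateway method is an RPC: it returns a CompletableFuture right away, and a failure is reported by completing that future exceptionally rather than by throwing to the caller. The sketch below illustrates this convention with a hypothetical, heavily simplified gateway (MiniTaskExecutorGateway and InMemoryTaskExecutor, using String IDs and a String "ACK" in place of Acknowledge); it is not Flink's actual interface.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Hypothetical, simplified gateway: every call returns a future instead of blocking. */
interface MiniTaskExecutorGateway {
    CompletableFuture<String> requestSlot(int slotNumber, String allocationId);

    CompletableFuture<String> freeSlot(String allocationId);
}

/** In-memory implementation that only illustrates the "fail via the future" convention. */
class InMemoryTaskExecutor implements MiniTaskExecutorGateway {
    private final Map<Integer, String> slots = new HashMap<>(); // slot number -> allocation ID

    @Override
    public CompletableFuture<String> requestSlot(int slotNumber, String allocationId) {
        String current = slots.putIfAbsent(slotNumber, allocationId);
        if (current != null && !current.equals(allocationId)) {
            // Report the error through the future, in the spirit of FutureUtils.completedExceptionally.
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(
                    new IllegalStateException("Slot " + slotNumber + " is occupied by " + current));
            return failed;
        }
        // A repeated request for the same allocation ID is acknowledged again.
        return CompletableFuture.completedFuture("ACK");
    }

    @Override
    public CompletableFuture<String> freeSlot(String allocationId) {
        slots.values().removeIf(allocationId::equals);
        return CompletableFuture.completedFuture("ACK");
    }
}
```

Because callers only ever receive futures, they can chain follow-up work (for example with whenComplete) instead of wrapping each call in try/catch.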

3. Code walkthrough

3.1. Fields

3.1.1. Services

   // HA
    /** The access to the leader election and retrieval services. */
    private final HighAvailabilityServices haServices;

    // Services used by the TaskExecutor, e.g. IOManager, ShuffleEnvironment, KvStateService
    private final TaskManagerServices taskExecutorServices;

    /**
     * The task manager configuration.
     * */
    private final TaskManagerConfiguration taskManagerConfiguration;

    /** The fatal error handler to use in case of a fatal error. */
    private final FatalErrorHandler fatalErrorHandler;

    // The BLOB cache provides access to the BLOB services for permanent and transient BLOBs.
    private final BlobCacheService blobCacheService;

    private final LibraryCacheManager libraryCacheManager;

    /** The address to metric query service on this Task Manager. */
    @Nullable private final String metricQueryServiceAddress;


3.1.2. TaskManager services


    /** The connection information of this task manager. */
    private final UnresolvedTaskManagerLocation unresolvedTaskManagerLocation;

    private final TaskManagerMetricGroup taskManagerMetricGroup;

    /** The state manager for this task, providing state managers per slot. */
    private final TaskExecutorLocalStateStoresManager localStateStoresManager;

    /** Information provider for external resources. */
    private final ExternalResourceInfoProvider externalResourceInfoProvider;

    /** The network component in the task manager. */
    private final ShuffleEnvironment<?, ?> shuffleEnvironment;

    /** The kvState registration service in the task manager. */
    private final KvStateService kvStateService;

    private final Executor ioExecutor;


3.1.3. Task slot table

    private final TaskSlotTable<Task> taskSlotTable;

    private final JobTable jobTable;

    private final JobLeaderService jobLeaderService;

    private final LeaderRetrievalService resourceManagerLeaderRetriever;


3.1.4. ResourceManager-related

    // ResourceManager-related
    @Nullable private ResourceManagerAddress resourceManagerAddress;

    @Nullable private EstablishedResourceManagerConnection establishedResourceManagerConnection;

    @Nullable private TaskExecutorToResourceManagerConnection resourceManagerConnection;

    @Nullable private UUID currentRegistrationTimeoutId;

    private Map<JobID, Collection<CompletableFuture<ExecutionState>>>
            taskResultPartitionCleanupFuturesPerJob = new HashMap<>(8);


3.1.5. Other


    // Hardware description
    private final HardwareDescription hardwareDescription;

    // Memory configuration
    private final TaskExecutorMemoryConfiguration memoryConfiguration;

    // File cache
    private FileCache fileCache;

    // JobManager heartbeats
    /** The heartbeat manager for job manager in the task manager. */
    private final HeartbeatManager<AllocatedSlotReport, TaskExecutorToJobManagerHeartbeatPayload>
            jobManagerHeartbeatManager;

    // ResourceManager heartbeats
    /** The heartbeat manager for resource manager in the task manager. */
    private final HeartbeatManager<Void, TaskExecutorHeartbeatPayload>
            resourceManagerHeartbeatManager;

    // Partition tracking
    private final TaskExecutorPartitionTracker partitionTracker;

    // Back pressure
    private final BackPressureSampleService backPressureSampleService;


3.2. Core methods

3.2.1. requestSlot

The SlotManager inside the ResourceManager calls the requestSlot interface to request a slot from the TaskExecutor.

org.apache.flink.runtime.taskexecutor.TaskExecutor#requestSlot

    @Override
    public CompletableFuture<Acknowledge> requestSlot(
            final SlotID slotId,
            final JobID jobId,
            final AllocationID allocationId,
            final ResourceProfile resourceProfile,
            final String targetAddress,
            final ResourceManagerId resourceManagerId,
            final Time timeout) {
        // TODO: Filter invalid requests from the resource manager by using the
        // instance/registration Id

        // Log the request, e.g.:
        // Receive slot request 3755cb8f9962a9a7738db04f2a02084c for job
        // 694474d11da6100e82744c9e47e2f511 from resource manager with leader id
        // 00000000000000000000000000000000.

        log.info(
                "Receive slot request {} for job {} from resource manager with leader id {}.",
                allocationId,
                jobId,
                resourceManagerId);


        // Check that the TaskExecutor is connected to this ResourceManager
        if (!isConnectedToResourceManager(resourceManagerId)) {
            final String message =
                    String.format(
                            "TaskManager is not connected to the resource manager %s.",
                            resourceManagerId);
            log.debug(message);
            return FutureUtils.completedExceptionally(new TaskManagerException(message));
        }

        try {
            // [Key] Allocate the slot
            allocateSlot(slotId, jobId, allocationId, resourceProfile);
        } catch (SlotAllocationException sae) {
            return FutureUtils.completedExceptionally(sae);
        }

        final JobTable.Job job;

        try {
            // Get or create the JobTable.Job
            job =
                    jobTable.getOrCreateJob(
                            jobId, () -> registerNewJobAndCreateServices(jobId, targetAddress));
        } catch (Exception e) {
            // free the allocated slot
            try {
                taskSlotTable.freeSlot(allocationId);
            } catch (SlotNotFoundException slotNotFoundException) {
                // slot no longer existent, this should actually never happen, because we've
                // just allocated the slot. So let's fail hard in this case!
                onFatalError(slotNotFoundException);
            }

            // release local state under the allocation id.
            localStateStoresManager.releaseLocalStateForAllocationId(allocationId);

            // sanity check
            if (!taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
                onFatalError(new Exception("Could not free slot " + slotId));
            }

            return FutureUtils.completedExceptionally(
                    new SlotAllocationException("Could not create new job.", e));
        }

        if (job.isConnected()) {
            // [Important] Offer the slots to the JobManager
            offerSlotsToJobManager(jobId);
        }

        return CompletableFuture.completedFuture(Acknowledge.get());
    }
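Stripped of Flink's types, requestSlot follows a fixed order: reject requests from a ResourceManager the TaskExecutor is not connected to, allocate the slot, get or create the job entry, roll the allocation back if that fails, and offer the slots at once if the job's JobManager is already connected. The sketch below mirrors that order with hypothetical names (RequestSlotSketch, String IDs, and Sets standing in for the TaskSlotTable and JobTable); it illustrates the control flow only and is not Flink's code.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;

/** Hypothetical sketch of the control flow of TaskExecutor#requestSlot. */
class RequestSlotSketch {
    final Map<Integer, String> slotToAllocation = new HashMap<>(); // TaskSlotTable stand-in
    final Set<String> jobs = new HashSet<>();                      // JobTable stand-in
    final Set<String> connectedJobs = new HashSet<>();             // jobs with a connected JobManager
    final Set<String> offeredJobs = new HashSet<>();               // records offerSlotsToJobManager calls
    String connectedResourceManagerId = "rm-1";

    CompletableFuture<String> requestSlot(
            int slot, String jobId, String allocationId, String rmId, Predicate<String> createJob) {
        // 1. Only accept requests from the ResourceManager we are connected to.
        if (!rmId.equals(connectedResourceManagerId)) {
            return failed(new IllegalStateException("Not connected to resource manager " + rmId));
        }
        // 2. Allocate the slot (a repeated request for the same allocation ID is fine).
        String current = slotToAllocation.putIfAbsent(slot, allocationId);
        if (current != null && !current.equals(allocationId)) {
            return failed(new IllegalStateException("Slot " + slot + " already allocated"));
        }
        // 3. Get or create the job entry; on failure, roll back the allocation.
        if (!jobs.contains(jobId)) {
            if (!createJob.test(jobId)) {
                slotToAllocation.remove(slot);
                return failed(new IllegalStateException("Could not create new job."));
            }
            jobs.add(jobId);
        }
        // 4. If the JobManager is already connected, offer the slots immediately.
        if (connectedJobs.contains(jobId)) {
            offeredJobs.add(jobId);
        }
        return CompletableFuture.completedFuture("ACK");
    }

    private static CompletableFuture<String> failed(Throwable t) {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.completeExceptionally(t);
        return f;
    }
}
```

If the JobManager is not connected yet, nothing is offered here; in the real code the offer happens later, once the job leader connection is established.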

org.apache.flink.runtime.taskexecutor.TaskExecutor#allocateSlot(slotId, jobId, allocationId, resourceProfile);


    private void allocateSlot(
            SlotID slotId, JobID jobId, AllocationID allocationId, ResourceProfile resourceProfile)
            throws SlotAllocationException {

        // Sample values observed in a debugger:
        //    slotId = {SlotID@6055} "container_1619273419318_0032_01_000002_0"
        //        resourceId = {ResourceID@6114} "container_1619273419318_0032_01_000002"
        //        slotNumber = 0
        //    jobId = {JobID@6056} "05fdf1bc744b274be1525c918c1ad378"
        //    allocationId = {AllocationID@6057} "a9ce7abc6f1d6f264dbdce5564efcb76"
        //    resourceProfile = {ResourceProfile@6058} "ResourceProfile{UNKNOWN}"
        //        cpuCores = null
        //        taskHeapMemory = null
        //        taskOffHeapMemory = null
        //        managedMemory = null
        //        networkMemory = null
        //        extendedResources = {HashMap@6116}  size = 0



        //    taskSlotTable = {TaskSlotTableImpl@6077}
        //    numberSlots = 4
        //        defaultSlotResourceProfile = {ResourceProfile@6124} "ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=96.000mb (100663293 bytes), taskOffHeapMemory=0 bytes, managedMemory=128.000mb (134217730 bytes), networkMemory=32.000mb (33554432 bytes)}"
        //        cpuCores = {CPUResource@6139} "Resource(CPU: 1.0000000000000000)"
        //        taskHeapMemory = {MemorySize@6140} "100663293 bytes"
        //        taskOffHeapMemory = {MemorySize@6141} "0 bytes"
        //        managedMemory = {MemorySize@6142} "134217730 bytes"
        //        networkMemory = {MemorySize@6143} "32 mb"
        //        extendedResources = {HashMap@6144}  size = 0
        //    memoryPageSize = 32768
        //    timerService = {TimerService@6125}
        //    taskSlots = {HashMap@6126}  size = 0
        //    allocatedSlots = {HashMap@6127}  size = 0
        //    taskSlotMappings = {HashMap@6128}  size = 0
        //    slotsPerJob = {HashMap@6129}  size = 0
        //    slotActions = {TaskExecutor$SlotActionsImpl@6130}
        //    state = {TaskSlotTableImpl$State@6131} "RUNNING"
        //    budgetManager = {ResourceBudgetManager@6132}
        //    closingFuture = {CompletableFuture@6133} "java.util.concurrent.CompletableFuture@9a6e076[Not completed]"
        //    mainThreadExecutor = {RpcEndpoint$MainThreadExecutor@6096}
        //    memoryVerificationExecutor = {ThreadPoolExecutor@6076} "java.util.concurrent.ThreadPoolExecutor@da5c1a9[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]"

        if (taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
            // Perform the allocation via TaskSlotTableImpl#allocateSlot
            if (taskSlotTable.allocateSlot(
                    slotId.getSlotNumber(),
                    jobId,
                    allocationId,
                    resourceProfile,
                    taskManagerConfiguration.getTimeout())) {
                // Logs, e.g.: "Allocated slot for 3755cb8f9962a9a7738db04f2a02084c."
                log.info("Allocated slot for {}.", allocationId);
            } else {
                log.info("Could not allocate slot for {}.", allocationId);
                throw new SlotAllocationException("Could not allocate slot.");
            }
        } else if (!taskSlotTable.isAllocated(slotId.getSlotNumber(), jobId, allocationId)) {
            final String message =
                    "The slot " + slotId + " has already been allocated for a different job.";

            log.info(message);

            final AllocationID allocationID =
                    taskSlotTable.getCurrentAllocation(slotId.getSlotNumber());
            throw new SlotOccupiedException(
                    message, allocationID, taskSlotTable.getOwningJob(allocationID));
        }
    }
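allocateSlot distinguishes three cases: the slot is free (allocate it), the slot already belongs to the same job and allocation ID (a repeated request, treated as success), or it belongs to another allocation (throw SlotOccupiedException carrying the current allocation and its owning job). A minimal sketch of that decision, assuming a toy slot table keyed by slot number; SlotTableSketch and OccupiedException are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical exception mirroring SlotOccupiedException: reports who holds the slot. */
class OccupiedException extends Exception {
    final String currentAllocationId;
    final String owningJobId;

    OccupiedException(String message, String currentAllocationId, String owningJobId) {
        super(message);
        this.currentAllocationId = currentAllocationId;
        this.owningJobId = owningJobId;
    }
}

/** Toy slot table: slot number -> {jobId, allocationId}. */
class SlotTableSketch {
    private final Map<Integer, String[]> allocations = new HashMap<>();

    boolean isSlotFree(int slot) {
        return !allocations.containsKey(slot);
    }

    boolean isAllocated(int slot, String jobId, String allocationId) {
        String[] entry = allocations.get(slot);
        return entry != null && entry[0].equals(jobId) && entry[1].equals(allocationId);
    }

    /** Mirrors the three branches of TaskExecutor#allocateSlot. */
    void allocateSlot(int slot, String jobId, String allocationId) throws OccupiedException {
        if (isSlotFree(slot)) {
            // Case 1: free slot -> allocate it.
            allocations.put(slot, new String[] {jobId, allocationId});
        } else if (!isAllocated(slot, jobId, allocationId)) {
            // Case 3: held by a different allocation -> report the current owner.
            String[] current = allocations.get(slot);
            throw new OccupiedException(
                    "The slot " + slot + " has already been allocated for a different job.",
                    current[1],
                    current[0]);
        }
        // Case 2: same job and allocation ID -> nothing to do (idempotent retry).
    }
}
```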

org.apache.flink.runtime.taskexecutor.TaskExecutor#offerSlotsToJobManager(jobId);


    // ------------------------------------------------------------------------
    //  Internal job manager connection methods
    // ------------------------------------------------------------------------

    private void offerSlotsToJobManager(final JobID jobId) {
        // Offer the slots to the JobManager via internalOfferSlotsToJobManager
        jobTable.getConnection(jobId).ifPresent(this::internalOfferSlotsToJobManager);
    }


    private void internalOfferSlotsToJobManager(JobTable.Connection jobManagerConnection) {
        // Get the JobID
        final JobID jobId = jobManagerConnection.getJobId();

        // Check whether any slots have been allocated for this job
        if (taskSlotTable.hasAllocatedSlots(jobId)) {

            // Offer reserved slots to the leader of job 694474d11da6100e82744c9e47e2f511.
            log.info("Offer reserved slots to the leader of job {}.", jobId);

            // Get the JobMaster gateway
            final JobMasterGateway jobMasterGateway = jobManagerConnection.getJobManagerGateway();

            // Get all TaskSlots allocated to jobId
            final Iterator<TaskSlot<Task>> reservedSlotsIterator =
                    taskSlotTable.getAllocatedSlots(jobId);

            // Get the JobMasterId
            final JobMasterId jobMasterId = jobManagerConnection.getJobMasterId();

            // The reserved slots to offer
            final Collection<SlotOffer> reservedSlots = new HashSet<>(2);

            while (reservedSlotsIterator.hasNext()) {
                SlotOffer offer = reservedSlotsIterator.next().generateSlotOffer();
                reservedSlots.add(offer);
            }

            // offerSlots
            // Offers the given slots to the job manager.
            // The response contains the set of accepted slots.

            // JobMaster#offerSlots
            CompletableFuture<Collection<SlotOffer>> acceptedSlotsFuture =
                    jobMasterGateway.offerSlots(
                            getResourceID(), reservedSlots, taskManagerConfiguration.getTimeout());

            // Handle the response asynchronously on the main thread: deal with failures,
            // or mark the accepted slots as active
            acceptedSlotsFuture.whenCompleteAsync(
                    handleAcceptedSlotOffers(jobId, jobMasterGateway, jobMasterId, reservedSlots),
                    getMainThreadExecutor());
        } else {
            log.debug("There are no unassigned slots for the job {}.", jobId);
        }
    }
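From the caller's side the offer does not block: offerSlots returns a future with the accepted offers, and the result is handled with whenCompleteAsync on the endpoint's main-thread executor, which keeps slot-table updates on the same single thread as the other RPC handlers. The sketch below imitates that pattern with a single-threaded ExecutorService standing in for getMainThreadExecutor() and a hypothetical fakeOfferSlots method standing in for JobMasterGateway#offerSlots. In this sketch, accepted offers are marked active and the remaining ones are freed; error handling is omitted.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical sketch of offering reserved slots and handling the reply on a "main thread". */
class SlotOfferSketch {
    final Set<String> activeSlots = new HashSet<>();
    final Set<String> freedSlots = new HashSet<>();
    // Stand-in for the RPC endpoint's main-thread executor: one thread serializes state changes.
    final ExecutorService mainThread = Executors.newSingleThreadExecutor();

    /** Stand-in for JobMasterGateway#offerSlots: accepts every offer not named "rejected-*". */
    static CompletableFuture<Collection<String>> fakeOfferSlots(Collection<String> offers) {
        return CompletableFuture.supplyAsync(() -> {
            Set<String> accepted = new HashSet<>();
            for (String offer : offers) {
                if (!offer.startsWith("rejected-")) {
                    accepted.add(offer);
                }
            }
            return accepted;
        });
    }

    CompletableFuture<Void> offerSlots(Collection<String> reservedSlots) {
        Set<String> pending = new HashSet<>(reservedSlots);
        return fakeOfferSlots(pending)
                .thenAcceptAsync(accepted -> {
                    // Runs on the main thread: accepted slots become active ...
                    for (String slot : accepted) {
                        activeSlots.add(slot);
                        pending.remove(slot);
                    }
                    // ... and slots the JobManager did not accept are freed.
                    freedSlots.addAll(pending);
                }, mainThread);
    }
}
```

The caller must shut down mainThread when finished, since a single-thread executor otherwise keeps the JVM alive.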

3.2.2. freeSlot

Frees the slot with the given allocation ID.

    @Override
    public CompletableFuture<Acknowledge> freeSlot(
            AllocationID allocationId, Throwable cause, Time timeout) {
        freeSlotInternal(allocationId, cause);

        return CompletableFuture.completedFuture(Acknowledge.get());
    }
	
	
    private void freeSlotInternal(AllocationID allocationId, Throwable cause) {
        checkNotNull(allocationId);

        log.debug("Free slot with allocation id {} because: {}", allocationId, cause.getMessage());

        try {
            final JobID jobId = taskSlotTable.getOwningJob(allocationId);

            // Free the slot and get its index (-1 if the slot could not be freed)
            final int slotIndex = taskSlotTable.freeSlot(allocationId, cause);

            if (slotIndex != -1) {

                if (isConnectedToResourceManager()) {
                    // The slot was freed: get the ResourceManager gateway and tell the RM about it
                    ResourceManagerGateway resourceManagerGateway =
                            establishedResourceManagerConnection.getResourceManagerGateway();
                    // Notify the RM that the slot is available again
                    resourceManagerGateway.notifySlotAvailable(
                            establishedResourceManagerConnection.getTaskExecutorRegistrationId(),
                            new SlotID(getResourceID(), slotIndex),
                            allocationId);
                }

                if (jobId != null) {
                    closeJobManagerConnectionIfNoAllocatedResources(jobId);
                }
            }
        } catch (SlotNotFoundException e) {
            log.debug("Could not free slot for allocation id {}.", allocationId, e);
        }

        // Release the local state stored for this allocation
        localStateStoresManager.releaseLocalStateForAllocationId(allocationId);
    }
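Note the order in freeSlotInternal: TaskSlotTable#freeSlot returns the index of the freed slot or -1, only a real index triggers notifySlotAvailable towards the ResourceManager, and the local state for the allocation is released in every case. A simplified sketch of that bookkeeping, with hypothetical names (FreeSlotSketch, an rmNotifications list standing in for the RM gateway), assuming a slot that still contains tasks cannot be freed yet.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of the bookkeeping in TaskExecutor#freeSlotInternal. */
class FreeSlotSketch {
    final Map<String, Integer> allocationToSlot = new HashMap<>();
    final Set<String> slotsWithRunningTasks = new HashSet<>();
    final Set<String> localState = new HashSet<>();
    final List<String> rmNotifications = new ArrayList<>();
    boolean connectedToResourceManager = true;

    /** Returns the freed slot index, or -1 if the slot could not be freed yet. */
    int freeSlotInTable(String allocationId) {
        Integer index = allocationToSlot.get(allocationId);
        // Unknown allocation: Flink throws SlotNotFoundException here; the sketch returns -1.
        if (index == null || slotsWithRunningTasks.contains(allocationId)) {
            return -1;
        }
        allocationToSlot.remove(allocationId);
        return index;
    }

    void freeSlotInternal(String allocationId) {
        int slotIndex = freeSlotInTable(allocationId);
        if (slotIndex != -1 && connectedToResourceManager) {
            // Tell the ResourceManager the slot is available again.
            rmNotifications.add("slot " + slotIndex + " available (was " + allocationId + ")");
        }
        // Local state for this allocation is released regardless of the outcome.
        localState.remove(allocationId);
    }
}
```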


3.2.3. submitTask

At its core, submitTask constructs a Task and hands it to a dedicated thread for execution.


    // ----------------------------------------------------------------------
    // Task lifecycle RPCs
    // Task submission
    // ----------------------------------------------------------------------

    @Override
    public CompletableFuture<Acknowledge> submitTask(
            TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, Time timeout) {

        try {
            final JobID jobId = tdd.getJobId();
            final ExecutionAttemptID executionAttemptID = tdd.getExecutionAttemptId();

            final JobTable.Connection jobManagerConnection =
                    jobTable.getConnection(jobId)
                            .orElseThrow(
                                    () -> {
                                        final String message =
                                                "Could not submit task because there is no JobManager "
                                                        + "associated for the job "
                                                        + jobId
                                                        + '.';

                                        log.debug(message);
                                        return new TaskSubmissionException(message);
                                    });

            if (!Objects.equals(jobManagerConnection.getJobMasterId(), jobMasterId)) {
                final String message =
                        "Rejecting the task submission because the job manager leader id "
                                + jobMasterId
                                + " does not match the expected job manager leader id "
                                + jobManagerConnection.getJobMasterId()
                                + '.';

                log.debug(message);
                throw new TaskSubmissionException(message);
            }

            if (!taskSlotTable.tryMarkSlotActive(jobId, tdd.getAllocationId())) {
                final String message =
                        "No task slot allocated for job ID "
                                + jobId
                                + " and allocation ID "
                                + tdd.getAllocationId()
                                + '.';
                log.debug(message);
                throw new TaskSubmissionException(message);
            }

            // re-integrate offloaded data:
            try {
                tdd.loadBigData(blobCacheService.getPermanentBlobService());
            } catch (IOException | ClassNotFoundException e) {
                throw new TaskSubmissionException(
                        "Could not re-integrate offloaded TaskDeploymentDescriptor data.", e);
            }

            // deserialize the pre-serialized information
            final JobInformation jobInformation;
            final TaskInformation taskInformation;
            try {
                jobInformation =
                        tdd.getSerializedJobInformation()
                                .deserializeValue(getClass().getClassLoader());
                taskInformation =
                        tdd.getSerializedTaskInformation()
                                .deserializeValue(getClass().getClassLoader());
            } catch (IOException | ClassNotFoundException e) {
                throw new TaskSubmissionException(
                        "Could not deserialize the job or task information.", e);
            }

            if (!jobId.equals(jobInformation.getJobId())) {
                throw new TaskSubmissionException(
                        "Inconsistent job ID information inside TaskDeploymentDescriptor ("
                                + tdd.getJobId()
                                + " vs. "
                                + jobInformation.getJobId()
                                + ")");
            }

            TaskMetricGroup taskMetricGroup =
                    taskManagerMetricGroup.addTaskForJob(
                            jobInformation.getJobId(),
                            jobInformation.getJobName(),
                            taskInformation.getJobVertexId(),
                            tdd.getExecutionAttemptId(),
                            taskInformation.getTaskName(),
                            tdd.getSubtaskIndex(),
                            tdd.getAttemptNumber());

            InputSplitProvider inputSplitProvider =
                    new RpcInputSplitProvider(
                            jobManagerConnection.getJobManagerGateway(),
                            taskInformation.getJobVertexId(),
                            tdd.getExecutionAttemptId(),
                            taskManagerConfiguration.getTimeout());

            final TaskOperatorEventGateway taskOperatorEventGateway =
                    new RpcTaskOperatorEventGateway(
                            jobManagerConnection.getJobManagerGateway(),
                            executionAttemptID,
                            (t) -> runAsync(() -> failTask(executionAttemptID, t)));

            TaskManagerActions taskManagerActions = jobManagerConnection.getTaskManagerActions();
            CheckpointResponder checkpointResponder = jobManagerConnection.getCheckpointResponder();
            GlobalAggregateManager aggregateManager =
                    jobManagerConnection.getGlobalAggregateManager();

            LibraryCacheManager.ClassLoaderHandle classLoaderHandle =
                    jobManagerConnection.getClassLoaderHandle();
            ResultPartitionConsumableNotifier resultPartitionConsumableNotifier =
                    jobManagerConnection.getResultPartitionConsumableNotifier();
            PartitionProducerStateChecker partitionStateChecker =
                    jobManagerConnection.getPartitionStateChecker();

            final TaskLocalStateStore localStateStore =
                    localStateStoresManager.localStateStoreForSubtask(
                            jobId,
                            tdd.getAllocationId(),
                            taskInformation.getJobVertexId(),
                            tdd.getSubtaskIndex());

            final JobManagerTaskRestore taskRestore = tdd.getTaskRestore();

            // Construct the TaskStateManager
            final TaskStateManager taskStateManager =
                    new TaskStateManagerImpl(
                            jobId,
                            tdd.getExecutionAttemptId(),
                            localStateStore,
                            taskRestore,
                            checkpointResponder);

            MemoryManager memoryManager;
            try {
                memoryManager = taskSlotTable.getTaskMemoryManager(tdd.getAllocationId());
            } catch (SlotNotFoundException e) {
                throw new TaskSubmissionException("Could not submit task.", e);
            }

            // Construct a new Task
            Task task =
                    new Task(
                            jobInformation,
                            taskInformation,
                            tdd.getExecutionAttemptId(),
                            tdd.getAllocationId(),
                            tdd.getSubtaskIndex(),
                            tdd.getAttemptNumber(),
                            tdd.getProducedPartitions(),
                            tdd.getInputGates(),
                            tdd.getTargetSlotNumber(),
                            memoryManager,
                            taskExecutorServices.getIOManager(),
                            taskExecutorServices.getShuffleEnvironment(),
                            taskExecutorServices.getKvStateService(),
                            taskExecutorServices.getBroadcastVariableManager(),
                            taskExecutorServices.getTaskEventDispatcher(),
                            externalResourceInfoProvider,
                            taskStateManager,
                            taskManagerActions,
                            inputSplitProvider,
                            checkpointResponder,
                            taskOperatorEventGateway,
                            aggregateManager,
                            classLoaderHandle,
                            fileCache,
                            taskManagerConfiguration,
                            taskMetricGroup,
                            resultPartitionConsumableNotifier,
                            partitionStateChecker,
                            getRpcService().getExecutor());

            taskMetricGroup.gauge(MetricNames.IS_BACKPRESSURED, task::isBackPressured);

            // Received task
            //      Window(
            //          TumblingProcessingTimeWindows(5000),
            //          ProcessingTimeTrigger,
            //          ReduceFunction$1, PassThroughWindowFunction
            //      ) ->
            //      Sink: Print to Std. Out (1/1)#0 (141dd597dc560a831b2b4bc195943f0b),
            //
            // deploy into slot with allocation id
            //      3755cb8f9962a9a7738db04f2a02084c.

            log.info(
                    "Received task {} ({}), deploy into slot with allocation id {}.",
                    task.getTaskInfo().getTaskNameWithSubtasks(),
                    tdd.getExecutionAttemptId(),
                    tdd.getAllocationId());

            boolean taskAdded;

            try {
                taskAdded = taskSlotTable.addTask(task);
            } catch (SlotNotFoundException | SlotNotActiveException e) {
                throw new TaskSubmissionException("Could not submit task.", e);
            }

            if (taskAdded) {
                // Start the task thread
                task.startTaskThread();

                setupResultPartitionBookkeeping(
                        tdd.getJobId(), tdd.getProducedPartitions(), task.getTerminationFuture());
                return CompletableFuture.completedFuture(Acknowledge.get());
            } else {
                final String message =
                        "TaskManager already contains a task for id " + task.getExecutionId() + '.';

                log.debug(message);
                throw new TaskSubmissionException(message);
            }
        } catch (TaskSubmissionException e) {
            return FutureUtils.completedExceptionally(e);
        }
    }
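Reduced to its control flow, submitTask is a chain of guards followed by "construct and start": a JobManager connection must exist for the job, its leader ID must match, the slot must be markable as active, and no task with the same execution ID may already be registered; only then is the task's thread started. The sketch below keeps just those guards; SubmitTaskSketch, its String IDs, and a plain Runnable in place of Flink's Task are hypothetical simplifications.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch of the guard chain in TaskExecutor#submitTask. */
class SubmitTaskSketch {
    final Map<String, String> jobToLeaderId = new HashMap<>(); // JobTable stand-in
    final Set<String> allocatedSlots = new HashSet<>();        // allocation IDs with a slot
    final Set<String> activeSlots = new HashSet<>();
    final Map<String, Thread> tasks = new HashMap<>();          // executionId -> task thread

    CompletableFuture<String> submitTask(
            String jobId, String leaderId, String allocationId, String executionId, Runnable work) {
        try {
            String expectedLeader = jobToLeaderId.get(jobId);
            if (expectedLeader == null) {
                throw new IllegalStateException("No JobManager associated for the job " + jobId);
            }
            if (!expectedLeader.equals(leaderId)) {
                throw new IllegalStateException("Job manager leader id does not match");
            }
            // tryMarkSlotActive: only an allocated slot can become active.
            if (!allocatedSlots.contains(allocationId)) {
                throw new IllegalStateException("No task slot allocated for " + allocationId);
            }
            activeSlots.add(allocationId);
            // addTask: reject a second task with the same execution ID.
            if (tasks.containsKey(executionId)) {
                throw new IllegalStateException("Already contains a task for id " + executionId);
            }
            Thread thread = new Thread(work, "task-" + executionId);
            tasks.put(executionId, thread);
            thread.start(); // corresponds to task.startTaskThread()
            return CompletableFuture.completedFuture("ACK");
        } catch (IllegalStateException e) {
            // Like the TaskSubmissionException branch: fail the returned future.
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            return failed;
        }
    }
}
```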

3.2.4. updatePartitions

For each PartitionInfo, the task's partition information is updated through the shuffle environment (org.apache.flink.runtime.io.network.NettyShuffleEnvironment#updatePartitionInfo).


    @Override
    public CompletableFuture<Acknowledge> updatePartitions(
            final ExecutionAttemptID executionAttemptID,
            Iterable<PartitionInfo> partitionInfos,
            Time timeout) {
        // Look up the task
        final Task task = taskSlotTable.getTask(executionAttemptID);

        if (task != null) {

            // Iterate over the partition infos
            for (final PartitionInfo partitionInfo : partitionInfos) {
                // Run asynchronously because it might be blocking
                FutureUtils.assertNoException(
                        CompletableFuture.runAsync(
                                () -> {
                                    try {
                                        // Update the partition info
                                        if (!shuffleEnvironment.updatePartitionInfo(
                                                executionAttemptID, partitionInfo)) {
                                            log.debug(
                                                    "Discard update for input gate partition {} of result {} in task {}. "
                                                            + "The partition is no longer available.",
                                                    partitionInfo
                                                            .getShuffleDescriptor()
                                                            .getResultPartitionID(),
                                                    partitionInfo.getIntermediateDataSetID(),
                                                    executionAttemptID);
                                        }
                                    } catch (IOException | InterruptedException e) {
                                        log.error(
                                                "Could not update input data location for task {}. Trying to fail task.",
                                                task.getTaskInfo().getTaskName(),
                                                e);
                                        task.failExternally(e);
                                    }
                                },
                                getRpcService().getExecutor()));
            }
            return CompletableFuture.completedFuture(Acknowledge.get());
        } else {
            log.debug(
                    "Discard update for input partitions of task {}. Task is no longer running.",
                    executionAttemptID);
            return CompletableFuture.completedFuture(Acknowledge.get());
        }
    }
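Each partition update is dispatched with CompletableFuture.runAsync on the RPC service's executor because updatePartitionInfo may block, and the RPC itself is acknowledged immediately; an IOException or InterruptedException fails the task rather than the RPC. A minimal sketch of that pattern; PartitionUpdateSketch, the BlockingUpdater interface, and the failedWith field are hypothetical, and the futures are returned only so the example can be tested.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch of the asynchronous per-partition updates in updatePartitions. */
class PartitionUpdateSketch {
    /** Stand-in for ShuffleEnvironment#updatePartitionInfo, which may block. */
    interface BlockingUpdater {
        boolean update(String partition) throws IOException, InterruptedException;
    }

    // Records the error that "failed" the task, in the role of Task#failExternally.
    final AtomicReference<Throwable> failedWith = new AtomicReference<>();

    List<CompletableFuture<Void>> updatePartitions(
            List<String> partitions, BlockingUpdater updater, Executor executor) {
        List<CompletableFuture<Void>> updates = new ArrayList<>();
        for (String partition : partitions) {
            // One asynchronous update per partition, so a blocking call cannot stall the RPC.
            updates.add(CompletableFuture.runAsync(() -> {
                try {
                    // Returns false if the partition is gone; the update is then discarded.
                    updater.update(partition);
                } catch (IOException | InterruptedException e) {
                    failedWith.compareAndSet(null, e); // fail the task, not the RPC
                }
            }, executor));
        }
        // The real RPC acknowledges immediately without waiting for these futures.
        return updates;
    }
}
```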


3.2.5. triggerCheckpoint

Ultimately, this calls the task's triggerCheckpointBarrier method to trigger the checkpoint.

    @Override
    public CompletableFuture<Acknowledge> triggerCheckpoint(
            ExecutionAttemptID executionAttemptID,
            long checkpointId,
            long checkpointTimestamp,
            CheckpointOptions checkpointOptions) {
        log.debug(
                "Trigger checkpoint {}@{} for {}.",
                checkpointId,
                checkpointTimestamp,
                executionAttemptID);

        // Get the CheckpointType
        final CheckpointType checkpointType = checkpointOptions.getCheckpointType();

        if (checkpointType.getPostCheckpointAction() == PostCheckpointAction.TERMINATE
                && !(checkpointType.isSynchronous() && checkpointType.isSavepoint())) {
            throw new IllegalArgumentException(
                    "Only synchronous savepoints are allowed to advance the watermark to MAX.");
        }

        // Look up the task
        final Task task = taskSlotTable.getTask(executionAttemptID);

        if (task != null) {
            // Trigger the checkpoint via the task's triggerCheckpointBarrier method
            task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions);

            return CompletableFuture.completedFuture(Acknowledge.get());
        } else {
            final String message =
                    "TaskManager received a checkpoint request for unknown task "
                            + executionAttemptID
                            + '.';

            log.debug(message);
            return FutureUtils.completedExceptionally(
                    new CheckpointException(
                            message, CheckpointFailureReason.TASK_CHECKPOINT_FAILURE));
        }
    }
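The guard at the start of triggerCheckpoint rejects any checkpoint type whose post-checkpoint action is TERMINATE unless it is both synchronous and a savepoint; the error message ties this to advancing the watermark to MAX. The sketch below restates the rule with a hypothetical, simplified CheckpointKind enum in place of Flink's CheckpointType (the CHECKPOINT_TERMINATE constant exists only to show a rejected combination).

```java
/** Hypothetical, simplified stand-in for Flink's CheckpointType. */
enum CheckpointKind {
    CHECKPOINT(false, false, false),
    SAVEPOINT(true, false, false),
    SYNC_SAVEPOINT(true, true, false),
    SYNC_SAVEPOINT_TERMINATE(true, true, true),
    // Invalid combination, kept only to show that the guard rejects it.
    CHECKPOINT_TERMINATE(false, false, true);

    final boolean savepoint;
    final boolean synchronous;
    final boolean terminate; // stands for PostCheckpointAction.TERMINATE

    CheckpointKind(boolean savepoint, boolean synchronous, boolean terminate) {
        this.savepoint = savepoint;
        this.synchronous = synchronous;
        this.terminate = terminate;
    }
}

class CheckpointGuard {
    /** Same condition as the check at the top of TaskExecutor#triggerCheckpoint. */
    static void validate(CheckpointKind kind) {
        if (kind.terminate && !(kind.synchronous && kind.savepoint)) {
            throw new IllegalArgumentException(
                    "Only synchronous savepoints are allowed to advance the watermark to MAX.");
        }
    }

    static boolean isAllowed(CheckpointKind kind) {
        try {
            validate(kind);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }
}
```

Unlike the unknown-task branch, which returns an exceptionally completed future, this guard throws directly from the RPC method.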

3.2.6. confirmCheckpoint

Notifies the task that the checkpoint has completed, via the task's notifyCheckpointComplete method.


    @Override
    public CompletableFuture<Acknowledge> confirmCheckpoint(
            ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp) {
        log.debug(
                "Confirm checkpoint {}@{} for {}.",
                checkpointId,
                checkpointTimestamp,
                executionAttemptID);

        final Task task = taskSlotTable.getTask(executionAttemptID);

        if (task != null) {
            // Notify the task that the checkpoint has completed
            task.notifyCheckpointComplete(checkpointId);

            return CompletableFuture.completedFuture(Acknowledge.get());
        } else {
            final String message =
                    "TaskManager received a checkpoint confirmation for unknown task "
                            + executionAttemptID
                            + '.';

            log.debug(message);
            return FutureUtils.completedExceptionally(
                    new CheckpointException(
                            message,
                            CheckpointFailureReason.UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE));
        }
    }
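confirmCheckpoint, like triggerCheckpoint and abortCheckpoint, has a lookup-or-fail shape: find the Task by its ExecutionAttemptID in the TaskSlotTable, forward the notification, and otherwise return an exceptionally completed future (a CheckpointException with a failure reason in the real code) instead of throwing. A generic sketch of that shape; CheckpointNotifier and its String-keyed task registry are hypothetical, and an IllegalStateException stands in for CheckpointException.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongConsumer;

/** Hypothetical sketch of the lookup-or-fail pattern used by the checkpoint RPCs. */
class CheckpointNotifier {
    // executionAttemptId -> callback standing in for Task#notifyCheckpointComplete
    final Map<String, LongConsumer> tasks = new ConcurrentHashMap<>();

    CompletableFuture<String> confirmCheckpoint(String executionAttemptId, long checkpointId) {
        LongConsumer task = tasks.get(executionAttemptId);
        if (task != null) {
            task.accept(checkpointId);
            return CompletableFuture.completedFuture("ACK");
        }
        // Unknown task: report through the future rather than throwing.
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException(
                "TaskManager received a checkpoint confirmation for unknown task "
                        + executionAttemptId + '.'));
        return failed;
    }
}
```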

3.2.7. abortCheckpoint

Aborts a checkpoint by calling the task's notifyCheckpointAborted method.


    @Override
    public CompletableFuture<Acknowledge> abortCheckpoint(
            ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp) {
        log.debug(
                "Abort checkpoint {}@{} for {}.",
                checkpointId,
                checkpointTimestamp,
                executionAttemptID);

        final Task task = taskSlotTable.getTask(executionAttemptID);

        if (task != null) {
            // Notify the task that the checkpoint was aborted
            task.notifyCheckpointAborted(checkpointId);

            return CompletableFuture.completedFuture(Acknowledge.get());
        } else {
            final String message =
                    "TaskManager received an aborted checkpoint for unknown task "
                            + executionAttemptID
                            + '.';

            log.debug(message);
            return FutureUtils.completedExceptionally(
                    new CheckpointException(
                            message,
                            CheckpointFailureReason.UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE));
        }
    }
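
confirmCheckpoint and abortCheckpoint differ only in the method they call on the task and in the text of their log and error messages. A minimal sketch of how that duplication could be factored out with `java.util.function.ObjLongConsumer`; `NotificationSketch`, `FakeTask` and `notifyTask` are hypothetical names, and this is not how Flink 1.12 itself structures the code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.ObjLongConsumer;

// Hypothetical refactoring sketch; not Flink's TaskExecutor.
public class NotificationSketch {
    // Hypothetical stand-in for Flink's Task that records the last notification it received.
    static class FakeTask {
        long lastCompleted = -1;
        long lastAborted = -1;

        void notifyCheckpointComplete(long id) { lastCompleted = id; }

        void notifyCheckpointAborted(long id) { lastAborted = id; }
    }

    final Map<String, FakeTask> tasks = new HashMap<>();

    // Shared lookup + dispatch: only the action and the error text vary.
    private CompletableFuture<Boolean> notifyTask(
            String attemptId, long checkpointId, ObjLongConsumer<FakeTask> action, String what) {
        FakeTask task = tasks.get(attemptId);
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        if (task == null) {
            result.completeExceptionally(new IllegalStateException(
                    "TaskManager received " + what + " for unknown task " + attemptId + '.'));
        } else {
            action.accept(task, checkpointId);
            result.complete(true);
        }
        return result;
    }

    CompletableFuture<Boolean> confirmCheckpoint(String attemptId, long checkpointId) {
        return notifyTask(attemptId, checkpointId,
                FakeTask::notifyCheckpointComplete, "a checkpoint confirmation");
    }

    CompletableFuture<Boolean> abortCheckpoint(String attemptId, long checkpointId) {
        return notifyTask(attemptId, checkpointId,
                FakeTask::notifyCheckpointAborted, "an aborted checkpoint");
    }
}
```

Keeping two explicit methods, as Flink does, costs some duplication but lets each one use its own CheckpointFailureReason and log text without extra parameters.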

3.2.8. heartbeatFromJobManager

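Handles a heartbeat request from the JobManager. The request carries an AllocatedSlotReport as its payload, which is passed on to jobManagerHeartbeatManager.requestHeartbeat.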

    @Override
    public void heartbeatFromJobManager(
            ResourceID resourceID, AllocatedSlotReport allocatedSlotReport) {
        jobManagerHeartbeatManager.requestHeartbeat(resourceID, allocatedSlotReport);
    }
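
The AllocatedSlotReport lets the JobManager tell the TaskExecutor which slots it currently believes are allocated. Assuming the TaskExecutor uses this report to reconcile its own slot table (this sketch does not reproduce Flink's actual reconciliation code), the core of such a check is two set differences. `SlotReconcileSketch` and its methods are hypothetical, and slot IDs are simplified to strings.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: compare the slots this TaskExecutor holds for a job
// with the slot IDs a JobMaster reports in its heartbeat payload.
public class SlotReconcileSketch {

    // Slots the TaskExecutor thinks are allocated to the job but the JobMaster no longer lists.
    static Set<String> unknownToJobMaster(Set<String> localSlots, Set<String> reportedSlots) {
        Set<String> result = new HashSet<>(localSlots);
        result.removeAll(reportedSlots);
        return result;
    }

    // Slots the JobMaster still lists but the TaskExecutor no longer holds.
    static Set<String> missingLocally(Set<String> localSlots, Set<String> reportedSlots) {
        Set<String> result = new HashSet<>(reportedSlots);
        result.removeAll(localSlots);
        return result;
    }
}
```

Slots in the first set are candidates for freeing on the TaskExecutor side; slots in the second set point to a mismatch the JobMaster would need to hear about.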

3.2.9. heartbeatFromResourceManager

Handles a heartbeat request from the ResourceManager. Unlike the JobManager heartbeat, no payload is passed (null).

    @Override
    public void heartbeatFromResourceManager(ResourceID resourceID) {
        resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null);
    }
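
Both heartbeat RPCs simply forward to a heartbeat manager via requestHeartbeat. The idea underneath is a timeout monitor: remember when each peer was last heard from and treat it as lost once a timeout has elapsed. A minimal sketch with an injected clock; `HeartbeatTimeoutSketch` and its methods are hypothetical and much simpler than Flink's HeartbeatManager, which differs in detail.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical heartbeat timeout tracker; not Flink's HeartbeatManager.
public class HeartbeatTimeoutSketch {
    private final long timeoutMillis;
    private final LongSupplier clock; // injected so the behaviour is testable
    private final Map<String, Long> lastHeard = new HashMap<>();

    public HeartbeatTimeoutSketch(long timeoutMillis, LongSupplier clock) {
        this.timeoutMillis = timeoutMillis;
        this.clock = clock;
    }

    // Called whenever a heartbeat arrives from a peer (JobManager or ResourceManager).
    public void onHeartbeat(String resourceId) {
        lastHeard.put(resourceId, clock.getAsLong());
    }

    // A peer counts as timed out if it was never heard from, or not within timeoutMillis.
    public boolean isTimedOut(String resourceId) {
        Long last = lastHeard.get(resourceId);
        return last == null || clock.getAsLong() - last > timeoutMillis;
    }
}
```

In production code the clock would be something like `System::currentTimeMillis`; injecting it keeps the timeout logic testable without sleeping.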

3.2.10. requestFileUploadByType

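Requests a file upload by file type: LOG and STDOUT are mapped to the corresponding paths from taskManagerConfiguration, and the file is then uploaded via requestFileUploadByFilePath.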

    @Override
    public CompletableFuture<TransientBlobKey> requestFileUploadByType(
            FileType fileType, Time timeout) {
        final String filePath;
        switch (fileType) {
            case LOG:
                filePath = taskManagerConfiguration.getTaskManagerLogPath();
                break;
            case STDOUT:
                filePath = taskManagerConfiguration.getTaskManagerStdoutPath();
                break;
            default:
                filePath = null;
        }
        return requestFileUploadByFilePath(filePath, fileType.toString());
    }
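
The switch above uses `null` to mean "no file configured for this type". A minimal sketch of the same mapping using an `EnumMap` and `Optional` instead, which makes the "not configured" case explicit in the return type; `FileTypePathSketch` and its two-value `FileType` enum are hypothetical and only mirror the cases shown in the original code.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of mapping a file type to a configured path.
public class FileTypePathSketch {
    enum FileType { LOG, STDOUT }

    private final Map<FileType, String> paths = new EnumMap<>(FileType.class);

    FileTypePathSketch(String logPath, String stdoutPath) {
        // Only register non-null paths; a missing entry means "not configured".
        if (logPath != null) {
            paths.put(FileType.LOG, logPath);
        }
        if (stdoutPath != null) {
            paths.put(FileType.STDOUT, stdoutPath);
        }
    }

    // Empty instead of null when the type has no configured path.
    Optional<String> pathFor(FileType type) {
        return Optional.ofNullable(paths.get(type));
    }
}
```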

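3.2.11. requestFileUploadByName

Requests a file upload by file name: the name is resolved against the TaskManager log directory (taskManagerLogDir), and the file is then uploaded via requestFileUploadByFilePath.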

    @Override
    public CompletableFuture<TransientBlobKey> requestFileUploadByName(
            String fileName, Time timeout) {
        final String filePath;
        final String logDir = taskManagerConfiguration.getTaskManagerLogDir();

        if (StringUtils.isNullOrWhitespaceOnly(logDir)
                || StringUtils.isNullOrWhitespaceOnly(fileName)) {
            filePath = null;
        } else {
            // Build the path from taskManagerLogDir + the file name (getName() drops directory components)
            filePath = new File(logDir, new File(fileName).getName()).getPath();
        }
        return requestFileUploadByFilePath(filePath, fileName);
    }
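
The `new File(fileName).getName()` call matters: it keeps only the last path component of the requested name, so a request such as `../../etc/passwd` is resolved to a file directly under the log directory rather than a path outside it. A minimal sketch of that resolution step; `LogFileResolveSketch` and `isBlank` are hypothetical, with `isBlank` approximating Flink's `StringUtils.isNullOrWhitespaceOnly`.

```java
import java.io.File;

// Hypothetical helper mirroring the resolution step in requestFileUploadByName.
public class LogFileResolveSketch {
    // Returns null when either input is missing, like the original method.
    static String resolve(String logDir, String fileName) {
        if (isBlank(logDir) || isBlank(fileName)) {
            return null;
        }
        // getName() keeps only the last path component, so prefixes like "../../" are dropped.
        return new File(logDir, new File(fileName).getName()).getPath();
    }

    // Approximation of StringUtils.isNullOrWhitespaceOnly.
    private static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }
}
```

With Unix-style separators, `resolve("/var/log/flink", "../../etc/passwd")` yields `/var/log/flink/passwd`. Note that a bare `..` as the whole file name is still kept by `getName()`, so this step alone is not a complete path-traversal defence.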


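3.2.12. sendOperatorEventToTask

Looks up the Task by its ExecutionAttemptID and delivers the serialized OperatorEvent to it. If the task is not running on this TaskManager, the returned future fails with a TaskNotRunningException.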


    @Override
    public CompletableFuture<Acknowledge> sendOperatorEventToTask(
            ExecutionAttemptID executionAttemptID,
            OperatorID operatorId,
            SerializedValue<OperatorEvent> evt) {

        log.debug("Operator event for {} - {}", executionAttemptID, operatorId);

        // Look up the Task
        final Task task = taskSlotTable.getTask(executionAttemptID);
        if (task == null) {
            return FutureUtils.completedExceptionally(
                    new TaskNotRunningException(
                            "Task " + executionAttemptID + " not running on TaskManager"));
        }

        try {
            // Deliver the OperatorEvent to the task
            task.deliverOperatorEvent(operatorId, evt);

            return CompletableFuture.completedFuture(Acknowledge.get());
        } catch (Throwable t) {
            ExceptionUtils.rethrowIfFatalError(t);
            return FutureUtils.completedExceptionally(t);
        }
    }
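
The catch block follows a common rule: JVM-fatal errors are rethrown so the process can fail fast, while every other failure is reported to the caller through an exceptionally completed future. A minimal sketch of that rule; `DeliverEventSketch`, `EventSink` and `rethrowIfFatal` are hypothetical, and treating every `VirtualMachineError` as fatal is an assumption of this sketch. Flink's `ExceptionUtils.rethrowIfFatalError` applies its own definition.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of "rethrow fatal errors, turn everything else into a failed future".
public class DeliverEventSketch {

    interface EventSink {
        void deliver(String event) throws Exception;
    }

    // Assumption: every VirtualMachineError (e.g. InternalError) counts as fatal here.
    static void rethrowIfFatal(Throwable t) {
        if (t instanceof VirtualMachineError) {
            throw (VirtualMachineError) t;
        }
    }

    static CompletableFuture<Boolean> send(EventSink sink, String event) {
        try {
            sink.deliver(event);
            return CompletableFuture.completedFuture(true);
        } catch (Throwable t) {
            rethrowIfFatal(t);
            // Non-fatal: report the failure to the caller instead of throwing.
            CompletableFuture<Boolean> failed = new CompletableFuture<>();
            failed.completeExceptionally(t);
            return failed;
        }
    }
}
```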

3.2.13. requestThreadDump

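Requests a thread dump: the JVM thread information returned by JvmUtils.createThreadDump() is converted into ThreadDumpInfo entries (thread name plus the thread's string representation).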


    @Override
    public CompletableFuture<ThreadDumpInfo> requestThreadDump(Time timeout) {
        final Collection<ThreadInfo> threadDump = JvmUtils.createThreadDump();

        final Collection<ThreadDumpInfo.ThreadInfo> threadInfos =
                threadDump.stream()
                        .map(
                                threadInfo ->
                                        ThreadDumpInfo.ThreadInfo.create(
                                                threadInfo.getThreadName(), threadInfo.toString()))
                        .collect(Collectors.toList());

        return CompletableFuture.completedFuture(ThreadDumpInfo.create(threadInfos));
    }
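
The JDK exposes the same kind of information through `java.lang.management.ThreadMXBean`. A minimal sketch that collects (thread name, thread text) pairs in the same shape as the ThreadDumpInfo.ThreadInfo entries above; `ThreadDumpSketch` is hypothetical, and whether Flink's JvmUtils uses exactly these `dumpAllThreads` flags is an assumption here.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch: (thread name, stack text) pairs via the JDK's ThreadMXBean.
public class ThreadDumpSketch {

    static List<Map.Entry<String, String>> dump() {
        // true, true: include locked monitors and ownable synchronizers in each ThreadInfo.
        ThreadInfo[] infos = ManagementFactory.getThreadMXBean().dumpAllThreads(true, true);
        return Arrays.stream(infos)
                .map(info -> (Map.Entry<String, String>)
                        new AbstractMap.SimpleImmutableEntry<>(info.getThreadName(), info.toString()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        for (Map.Entry<String, String> entry : dump()) {
            System.out.println("== " + entry.getKey());
            System.out.print(entry.getValue());
        }
    }
}
```

Keep in mind that `ThreadInfo.toString()` prints only a limited number of stack frames per thread, so deep stacks appear truncated in this representation.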
    
