Long Runner Pipeline Life Cycle

This document explains the pipeline life cycle from the point of view of a developer writing a pipeline controller.

A pipeline is a task whose task type category is Pipeline. The pipeline controller receives pipeline events, creates normal tasks connected to the pipeline task, observes task completion, and finally finishes the pipeline task.

API Reference

This guide is about lifecycle behavior. The generated API reference for the main pipeline types is:

Main Concepts

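Pipeline task: a task whose task type category is Pipeline.

Pipeline controller: the component that receives pipeline events, creates connected tasks, observes their completion, and finishes the pipeline task.

Connected pipeline task: a normal task created through monitorApi.createTask(pipelineTaskId, request) and connected to the pipeline task.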

Normal Pipeline Flow

  1. A client creates a pipeline task and starts it.
  2. The task controller marks the pipeline task as running and sends PipelineStarted to the actor group for that pipeline task type.

  3. The pipeline controller handles PipelineStarted.

The event contains:

- taskId: the pipeline task id.
- taskTypeHid: the pipeline task type HID.
- ownerContext: owner of the pipeline task.
- pipelineInput: input data from the pipeline task.

  4. The pipeline controller creates normal tasks with monitorApi.createTask(pipelineTaskId, request).

These tasks are connected to the pipeline task and can be started immediately through the TaskRequest.startImmediately flag.

  5. Normal tasks run through task executors.
  6. When a connected normal task finishes, the task controller sends PipelineTaskFinished to the pipeline controller group for the owning pipeline type.

The event contains:

- taskId: finished connected normal task id.
- taskTypeHid: finished connected normal task type HID.
- pipelineTaskId: owning pipeline task id.
- pipelineOwnerContext: owner of the pipeline task.
- taskResult: output, errors, progress, and finishedOk for the finished task.

  7. When the pipeline controller knows it will not create any more tasks for the pipeline, it calls monitorApi.pipelineIsDoneCreatingTasks(pipelineTaskId).

After this call, new connected tasks for this pipeline are rejected. This protects the pipeline from being finished while the controller might still create more work later.

  8. The task controller periodically scans pipelines that are done creating tasks. When all connected normal tasks for that pipeline have runningStatus == Finished, it sends PipelineReadyToFinish.

The event contains:

- taskId: pipeline task id.
- taskTypeHid: pipeline task type HID.
- ownerContext: owner of the pipeline task.

  9. The pipeline controller handles PipelineReadyToFinish, gathers any final results it needs, and calls monitorApi.finishPipeline(taskId, taskTypeHid, result).

  10. The task controller accepts finishPipeline only after the pipeline is ready to finish. It then stores the final pipeline result and marks the pipeline task finished.
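
The three events in the flow above can be pictured as a small sealed hierarchy. This is only a sketch: the field names come from the event descriptions, but every type shown here (String ids, the OwnerContext and TaskResult shapes, Any? for input and output) is an assumption and not the generated API. The helper pipelineIdOf is hypothetical; it highlights that taskId names the pipeline in PipelineStarted and PipelineReadyToFinish but names the connected task in PipelineTaskFinished.

```kotlin
// Hypothetical stand-ins; the real types live in the generated API reference.
data class OwnerContext(val ownerId: String)

data class TaskResult(
    val output: Any?,
    val errors: List<String>,
    val progress: Int,
    val finishedOk: Boolean,
)

sealed interface PipelineControllerEvent

data class PipelineStarted(
    val taskId: String,              // the pipeline task id
    val taskTypeHid: String,
    val ownerContext: OwnerContext,
    val pipelineInput: Any?,
) : PipelineControllerEvent

data class PipelineTaskFinished(
    val taskId: String,              // the finished connected task, not the pipeline
    val taskTypeHid: String,
    val pipelineTaskId: String,      // the owning pipeline task id
    val pipelineOwnerContext: OwnerContext,
    val taskResult: TaskResult,
) : PipelineControllerEvent

data class PipelineReadyToFinish(
    val taskId: String,              // the pipeline task id
    val taskTypeHid: String,
    val ownerContext: OwnerContext,
) : PipelineControllerEvent

// Returns the id of the pipeline that an event belongs to.
fun pipelineIdOf(event: PipelineControllerEvent): String = when (event) {
    is PipelineStarted -> event.taskId
    is PipelineTaskFinished -> event.pipelineTaskId
    is PipelineReadyToFinish -> event.taskId
}
```

A controller that keeps per-pipeline state can key it by pipelineIdOf(event) rather than by event.taskId directly.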

Reading Connected Task Results

Pipeline controllers can read finished connected task results with:


monitorApi.getPipelineTasksResults(
    pipelineTaskId = pipelineTaskId,
    taskTypeHids = listOf("some-task-type"),
)

If taskTypeHids is null or empty, results for all finished connected tasks are returned. Only tasks whose runningStatus == Finished are included.

This API is useful in PipelineReadyToFinish handlers, where all normal connected tasks are already finished.
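
The filtering rule described above can be modelled in a few lines. This is a sketch of the documented behavior over an in-memory list, not the actual implementation: ConnectedTask, RunningStatus values other than Finished, and selectFinished are hypothetical names introduced for illustration.

```kotlin
// Hypothetical in-memory model; only runningStatus, Finished and taskTypeHids
// are names taken from the guide.
enum class RunningStatus { Created, Running, Finished }

data class ConnectedTask(
    val taskId: String,
    val taskTypeHid: String,
    val runningStatus: RunningStatus,
)

// Mirrors the documented rule: a null or empty filter selects every task type,
// and only tasks with runningStatus == Finished are returned.
fun selectFinished(
    tasks: List<ConnectedTask>,
    taskTypeHids: List<String>?,
): List<ConnectedTask> =
    tasks.filter { task ->
        task.runningStatus == RunningStatus.Finished &&
            (taskTypeHids == null || taskTypeHids.isEmpty() || task.taskTypeHid in taskTypeHids)
    }
```

Under this rule, a call made before PipelineReadyToFinish simply omits tasks that are still running, so a handler for PipelineTaskFinished may see a partial result set.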

Multi-Stage Pipelines

A pipeline controller can mark one task type as done without closing the whole pipeline:


monitorApi.pipelineIsDoneCreatingTasks(
    pipelineTaskId = pipelineTaskId,
    taskTypeHid = "map-task",
)

This means:

When all connected tasks of that type are finished, the task controller sends PipelineTaskTypeFinished.

The event contains:

This is intended for map/reduce or multi-stage pipelines. For example:

  1. On PipelineStarted, create all map-task tasks.
  2. Call pipelineIsDoneCreatingTasks(pipelineTaskId, "map-task").
  3. On PipelineTaskTypeFinished("map-task"), read map results and create reduce-task tasks.
  4. Call pipelineIsDoneCreatingTasks(pipelineTaskId, "reduce-task").
  5. On PipelineTaskTypeFinished("reduce-task"), decide whether to create another stage or close the whole pipeline with pipelineIsDoneCreatingTasks(pipelineTaskId).

Type-specific completion does not finish the pipeline. The whole pipeline finishes only after the controller calls pipelineIsDoneCreatingTasks(pipelineTaskId), the pipeline reaches the ready-to-finish state, and the controller calls finishPipeline.
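
The map/reduce steps above can be sketched as a small controller. Everything here is a simplified stand-in, assumed for illustration: StageApi, Started, TypeFinished, finishedOutputs, and RecordingApi are hypothetical and much narrower than the real monitor API and events. Only the order of calls follows the steps listed above.

```kotlin
// Hypothetical, simplified stand-ins for the real API and events.
interface StageApi {
    fun createTask(pipelineTaskId: String, taskTypeHid: String, input: String)
    fun pipelineIsDoneCreatingTasks(pipelineTaskId: String, taskTypeHid: String? = null)
    fun finishedOutputs(pipelineTaskId: String, taskTypeHid: String): List<String>
}

sealed interface StageEvent { val pipelineTaskId: String }
data class Started(override val pipelineTaskId: String, val chunks: List<String>) : StageEvent
data class TypeFinished(override val pipelineTaskId: String, val taskTypeHid: String) : StageEvent

class MapReduceController(private val api: StageApi) {
    fun onEvent(event: StageEvent) {
        when (event) {
            is Started -> {
                // Stage 1: create one map-task per chunk, then close that task type.
                event.chunks.forEach { api.createTask(event.pipelineTaskId, "map-task", it) }
                api.pipelineIsDoneCreatingTasks(event.pipelineTaskId, "map-task")
            }
            is TypeFinished -> when (event.taskTypeHid) {
                "map-task" -> {
                    // Stage 2: feed the combined map outputs into a single reduce-task.
                    val merged = api.finishedOutputs(event.pipelineTaskId, "map-task").joinToString(",")
                    api.createTask(event.pipelineTaskId, "reduce-task", merged)
                    api.pipelineIsDoneCreatingTasks(event.pipelineTaskId, "reduce-task")
                }
                // No further stage in this sketch: close the whole pipeline.
                "reduce-task" -> api.pipelineIsDoneCreatingTasks(event.pipelineTaskId)
                else -> Unit
            }
        }
    }
}

// Recording fake that captures the order of calls made by the controller.
class RecordingApi : StageApi {
    val calls = mutableListOf<String>()
    override fun createTask(pipelineTaskId: String, taskTypeHid: String, input: String) {
        calls += "create:$taskTypeHid:$input"
    }
    override fun pipelineIsDoneCreatingTasks(pipelineTaskId: String, taskTypeHid: String?) {
        calls += "done:${taskTypeHid ?: "*"}"
    }
    override fun finishedOutputs(pipelineTaskId: String, taskTypeHid: String) = listOf("x", "y")
}
```

Feeding the controller a Started event followed by TypeFinished for "map-task" and then "reduce-task" records the creations and the three boundary calls in stage order. The whole-pipeline call comes last, so the pipeline cannot become ready to finish while a later stage might still be created.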

Event Delivery

Important details:

Creation Boundaries

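The task controller enforces two creation boundaries:

- Whole pipeline: set by pipelineIsDoneCreatingTasks(pipelineTaskId). After it, new connected tasks for the pipeline are rejected.
- Single task type: set by pipelineIsDoneCreatingTasks(pipelineTaskId, taskTypeHid). It marks that task type as done creating tasks.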

These boundaries are permanent for that pipeline execution. Set a boundary only after the controller has created all tasks that the boundary covers.
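
The two boundaries, the whole-pipeline call and the type-specific call, can be pictured as a small guard that only ever closes. This is a hypothetical in-memory model: the CreationBoundaries class and its methods are illustrative names, it assumes the type-specific boundary rejects new tasks of that type in the same way the whole-pipeline boundary does, and it does not say how the real task controller reports a rejection.

```kotlin
// Hypothetical guard; how the real task controller signals a rejection
// (exception, error result, ...) is not specified in this guide.
class CreationBoundaries {
    private var wholePipelineClosed = false
    private val closedTypes = mutableSetOf<String>()

    // Models pipelineIsDoneCreatingTasks(pipelineTaskId, taskTypeHid).
    // There is deliberately no way to reopen a boundary.
    fun close(taskTypeHid: String? = null) {
        if (taskTypeHid == null) {
            wholePipelineClosed = true
        } else {
            closedTypes += taskTypeHid
        }
    }

    // Returns true when a new connected task of this type would be accepted.
    fun accepts(taskTypeHid: String): Boolean =
        !wholePipelineClosed && taskTypeHid !in closedTypes
}
```

In this model, closing "map-task" leaves "reduce-task" open, while closing the whole pipeline rejects every type.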

Finishing Rules

The pipeline task is not finished by connected task completion alone.

The whole pipeline is finished only when:

  1. The controller has marked the whole pipeline done creating tasks.
  2. All connected normal tasks are finished.
  3. The controller calls finishPipeline.
  4. The task controller stores the final TaskResult and marks the pipeline task finished.

If finishPipeline is called too early, before the pipeline is ready to finish, the task controller ignores it.
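
The finishing rules can be modelled as a tiny state machine. This is a sketch under stated assumptions: PipelineState and its members are hypothetical, and the Boolean returned by finishPipeline exists only so the sketch can show whether a call took effect. The guide says only that an early call is ignored.

```kotlin
// Hypothetical state model of one pipeline as seen by the task controller.
// The map records, per connected task id, whether that task has finished.
class PipelineState(private val connectedTaskFinished: MutableMap<String, Boolean>) {
    var doneCreatingTasks = false
        private set
    var finished = false
        private set

    fun markDoneCreatingTasks() { doneCreatingTasks = true }
    fun markTaskFinished(taskId: String) { connectedTaskFinished[taskId] = true }

    // Rules 1 and 2: creation is closed and every connected task is finished.
    fun isReadyToFinish(): Boolean =
        doneCreatingTasks && connectedTaskFinished.values.all { it }

    // Rules 3 and 4: an early call is ignored; a valid call finishes the pipeline.
    fun finishPipeline(): Boolean {
        if (!isReadyToFinish()) return false
        finished = true
        return true
    }
}
```

Calling finishPipeline before markDoneCreatingTasks, or while any connected task is still unfinished, leaves finished at false.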

Minimal Controller Shape


class ExamplePipelineController(
    override val monitorApi: PipelineControllerMonitorApi,
) : PipelineController {
    override fun onEvent(event: PipelineControllerEvent) {
        when (event) {
            is PipelineStarted -> {
                monitorApi.createTask(
                    pipelineTaskId = event.taskId,
                    request = TaskRequest(
                        taskTypeHid = "worker-task",
                        ownerContext = event.ownerContext,
                        startImmediately = true,
                        input = event.pipelineInput,
                    ),
                )

                monitorApi.pipelineIsDoneCreatingTasks(event.taskId)
            }

            is PipelineReadyToFinish -> {
                val results = monitorApi.getPipelineTasksResults(
                    pipelineTaskId = event.taskId,
                    taskTypeHids = listOf("worker-task"),
                )

                monitorApi.finishPipeline(
                    taskId = event.taskId,
                    taskTypeHid = event.taskTypeHid,
                    // buildPipelineResult is a controller-specific helper, not shown here.
                    result = buildPipelineResult(results),
                )
            }

            // This single-stage controller does not react to per-task or per-type completion.
            is PipelineTaskFinished -> Unit
            is PipelineTaskTypeFinished -> Unit
        }
    }
}

This minimal shape creates connected normal tasks in response to PipelineStarted, closes task creation, waits for PipelineReadyToFinish, reads task results, and finishes the pipeline.
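
The buildPipelineResult helper used in the controller is left to the implementer. One possible aggregation policy is sketched below. The TaskResult shape is an assumption: its field names follow the taskResult description (output, errors, progress, finishedOk), but the field types, the List<TaskResult> input, and the policy itself are hypothetical.

```kotlin
// Hypothetical result shape; field names follow the guide, types are assumed.
data class TaskResult(
    val output: String?,
    val errors: List<String>,
    val progress: Int,
    val finishedOk: Boolean,
)

// One possible policy: the pipeline succeeds only if every connected task
// succeeded, all task errors are carried over, and non-null outputs are
// joined line by line.
fun buildPipelineResult(results: List<TaskResult>): TaskResult =
    TaskResult(
        output = results.mapNotNull { it.output }.joinToString("\n"),
        errors = results.flatMap { it.errors },
        progress = 100,
        finishedOk = results.all { it.finishedOk },
    )
```

Note that all returns true for an empty list, so a pipeline with no connected task results is reported as successful under this policy. A controller that treats "no results" as a failure needs an explicit check.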