
Pipeline steps reference

The 18 pipeline step kinds — backend, composition, notification, SQL-container, and suspending — that compose multiple operations into a single MCP tool call, with data flow and client interaction.

Adapted from apps/gateway/docs/pipelines.md. Every example uses the real flat step shape. Validate any full config with cargo run -q -p mcpg --bin mcpg-config-check -- <file>, and see the backends reference for the fields of the pipeline backend itself.

A pipeline binding executes an ordered sequence of steps as a single MCP tool call. Steps share data through a pipeline context, and four step kinds can suspend execution to interact with the client. The gateway's PipelineStepConfig enum defines 18 step kinds.

Step shape

Every step is a flat object: a kind: discriminator, a unique id:, and that kind's own fields as siblings — not a nested per-kind sub-block. Backend steps flatten the same config as their standalone backend (an http step carries url: / method: directly).

yaml
backend:
  kind: pipeline
  pipeline_timeout_ms: 30000           # total timeout for all steps
  steps:
    - id: step1                        # unique within this pipeline
      kind: http                       # step kind discriminator
      url: "https://api.example.com/users"
      method: post
    - id: step2
      kind: transform
      expression: 'completed_steps.step1.output.data'

Validation rules. Step IDs must be unique within a pipeline. CEL expressions in transform, cel_gate, and input_transform must not reference forward (undefined) step IDs. Step IDs are the keys in completed_steps.
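As a rough illustration of these rules, the Python sketch below checks a step list for duplicate IDs and for completed_steps references to steps that have not run yet. It assumes a regex scan for completed_steps.<id> is an acceptable stand-in for real CEL reference analysis (the gateway's own check is not shown on this page). validate_steps and the dict-shaped steps are hypothetical.

```python
import re

# Matches references such as completed_steps.step1.output.data and captures "step1".
STEP_REF = re.compile(r"completed_steps\.([A-Za-z_][A-Za-z0-9_]*)")

# Per-kind CEL field; input_transform is checked separately on every step.
CEL_FIELDS = {"transform": "expression", "cel_gate": "expression"}


def validate_steps(steps):
    """Return a list of validation errors (empty when the pipeline is valid)."""
    errors = []
    seen = set()  # IDs of steps that appear earlier in the list
    for step in steps:
        step_id = step["id"]
        if step_id in seen:
            errors.append(f"duplicate step id: {step_id}")
        expressions = []
        field = CEL_FIELDS.get(step["kind"])
        if field and field in step:
            expressions.append(step[field])
        if "input_transform" in step:
            expressions.append(step["input_transform"])
        for expr in expressions:
            for ref in STEP_REF.findall(expr):
                if ref not in seen:
                    errors.append(f"{step_id}: forward or undefined reference to {ref}")
        seen.add(step_id)
    return errors
```

In this sketch a step that references its own ID is also rejected, because its ID is not yet among the earlier steps.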

Pipeline context

Every step reads three top-level context variables:

| Variable | Type | Description |
|---|---|---|
| original_args | JSON object | Tool arguments from the original tools/call |
| request_context | JSON object | Request ID, session ID, identity, transport |
| completed_steps | JSON object | Completed step results keyed by step ID |

Each completed step result:

json
{
  "step_id": {
    "output": { "...": "..." },
    "is_error": false,
    "duration_ms": 42
  }
}
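A minimal Python sketch of how the context might be assembled and how each result is recorded in the shape above. The helper names new_context and record_result are hypothetical; only the three variable names and the output / is_error / duration_ms fields come from this page.

```python
import time


def new_context(original_args, request_context):
    """Build the three top-level variables every step can read."""
    return {
        "original_args": original_args,
        "request_context": request_context,
        "completed_steps": {},
    }


def record_result(ctx, step_id, output, is_error, started):
    """Store one step's result under its ID; started is a time.monotonic() value."""
    ctx["completed_steps"][step_id] = {
        "output": output,
        "is_error": is_error,
        # Elapsed time since started, truncated to whole milliseconds.
        "duration_ms": int((time.monotonic() - started) * 1000),
    }
```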

The 18 step kinds

They group into backend steps (7), composition steps (3), notification steps (2), SQL container steps (2), and suspending steps (4).


Backend steps (7)

Backend steps execute the same adapters as standalone backends — the backend config flattens onto the step. Each supports an optional input_transform CEL expression to reshape input before execution. See the backends reference for full field docs.

HTTP

yaml
- kind: http
  id: fetch_user
  url: "https://api.example.com/users"
  method: post
  timeout_ms: 5000
  expected_status_codes: [200]
  require_json_response: true
  input_transform: 'original_args'     # optional CEL expression

Command

yaml
- kind: command
  id: run_script
  command: "/usr/bin/process"
  args: ["--json"]
  timeout_ms: 10000
  require_json_stdout: true

NATS

yaml
- kind: nats
  id: backend_query
  url: "nats://broker:4222"            # all nats steps/bindings must agree
  subject: "service.query"
  timeout_ms: 5000

Kafka

yaml
- kind: kafka
  id: kafka_request
  bootstrap_servers: "broker:9092"     # all kafka steps/bindings must agree
  group_id: "mcpg"
  request_topic: "requests"
  response_topic: "responses"
  timeout_ms: 10000

gRPC

yaml
- kind: grpc
  id: grpc_call
  url: "https://grpc.example.com"
  service: "example.UserService"
  method: "GetUser"
  timeout_ms: 5000

GraphQL

yaml
- kind: graphql
  id: graphql_query
  url: "https://api.example.com/graphql"
  operation: "query { users { id name } }"
  timeout_ms: 5000

Mock

Flattens the mock backend config (response, delay_ms, error, error_message, passthrough).

yaml
- kind: mock
  id: fake_data
  response: { items: [1, 2, 3] }
  delay_ms: 100

Composition steps (3)

Transform

Produces a new value from a CEL expression over the pipeline context. The result becomes the step's output.

yaml
- kind: transform
  id: extract_name
  expression: 'completed_steps.fetch_user.output.user.name'

Common patterns: extract nested fields (completed_steps.step1.output.data.field), concatenate strings or lists, or build map/list literals.
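The Python below computes what those three patterns would produce over a sample context. It is a plain-Python rendering for illustration, not CEL. select is a hypothetical helper that walks a dotted path the way CEL field selection walks nested maps.

```python
def select(ctx, path):
    """Resolve a dotted path through nested dicts."""
    value = ctx
    for field in path.split("."):
        value = value[field]  # a missing field raises KeyError, much like a CEL no-such-key error
    return value


def extract_name(ctx):
    # Nested field extraction.
    name = select(ctx, "completed_steps.fetch_user.output.user.name")
    # String concatenation, like "Hello, " + name in CEL.
    greeting = "Hello, " + name
    # Map and list literals.
    return {"name": name, "greeting": greeting, "tags": [name, "user"]}
```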

CEL gate (cel_gate)

Evaluates a CEL expression as a boolean guard. If false, the pipeline aborts with error_message (a default is used when omitted); if true, it continues.

yaml
- kind: cel_gate
  id: check_permission
  expression: 'completed_steps.fetch_user.output.role == "admin"'
  error_message: "User does not have admin role"
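The gate's control flow, sketched in Python with a callable standing in for the CEL expression. PipelineAborted, run_cel_gate, and DEFAULT_GATE_MESSAGE are hypothetical. The gateway's actual default message text is not given on this page, and rejecting a non-boolean result is an assumption of the sketch.

```python
class PipelineAborted(Exception):
    """Raised when a gate fails; carries the user-facing error message."""


# Hypothetical fallback text; the gateway's real default is not documented here.
DEFAULT_GATE_MESSAGE = "pipeline gate check failed"


def run_cel_gate(ctx, predicate, error_message=None):
    """Return None (continue) when the predicate is true; otherwise abort."""
    result = predicate(ctx)
    if not isinstance(result, bool):
        # Assumption: a gate must evaluate to a boolean.
        raise TypeError("gate expression must evaluate to a bool")
    if not result:
        raise PipelineAborted(error_message or DEFAULT_GATE_MESSAGE)
```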

Plugin transform (plugin_transform)

Reshapes the context by invoking a named transform plugin instead of an inline CEL expression. It is a generic bridge that works with any registered transform plugin. The first such plugin is dev.mcpg.transform.jsonata, which evaluates a JSONata expression (stronger than CEL at building nested objects, projecting, and aggregating arrays).

yaml
- kind: plugin_transform
  id: reshape
  plugin: dev.mcpg.transform.jsonata
  config:
    expression: '{ "orderIds": steps.fetch.output.orders.id, "total": $sum(steps.fetch.output.orders.amount) }'

The plugin receives the full context (steps.<id>.output, arguments, context.*, tool_name) and its output becomes the step's output. The plugin must be loaded in plugins[]; a plugin error aborts the pipeline.
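To make the JSONata expression concrete, the Python function below computes the same object from the plugin's input shape. The reshape function and the sample data are hypothetical. One JSONata nuance it does not model: a path over a single-element array yields a bare value rather than a one-item list.

```python
def reshape(plugin_input):
    """Python equivalent of the JSONata example, for an input with several orders."""
    orders = plugin_input["steps"]["fetch"]["output"]["orders"]
    return {
        # JSONata maps a path over an array: orders.id yields every order's id.
        "orderIds": [order["id"] for order in orders],
        # $sum adds up the projected amounts.
        "total": sum(order["amount"] for order in orders),
    }
```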


Notification steps (2)

Non-suspending steps that emit a server-to-client notification on the session's SSE channel and then continue immediately.

Log

Emits a notifications/message.

yaml
- kind: log
  id: announce
  level: info
  data: "Charging order ${original_args.order_id}"
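A sketch of the message a log step might emit, assuming ${...} placeholders are resolved as dotted paths into the pipeline context and rendered with str(). The helpers interpolate and log_notification are hypothetical; the method name and the level / data parameters follow the MCP notifications/message format. As a JSON-RPC notification it carries no id.

```python
import re

# Captures the dotted path inside ${...}.
PLACEHOLDER = re.compile(r"\$\{([^}]+)\}")


def interpolate(template, ctx):
    """Replace each ${dotted.path} with the value found in the context."""
    def lookup(match):
        value = ctx
        for field in match.group(1).split("."):
            value = value[field]
        return str(value)
    return PLACEHOLDER.sub(lookup, template)


def log_notification(step, ctx):
    """Build the notifications/message payload for a log step."""
    return {
        "jsonrpc": "2.0",
        "method": "notifications/message",
        "params": {"level": step["level"], "data": interpolate(step["data"], ctx)},
    }
```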

Progress

Emits a notifications/progress. Silently skipped when the inbound request carried no progressToken.

yaml
- kind: progress
  id: halfway
  progress: 0.5
  message: "Validation complete"
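A sketch of the skip rule. In MCP the client supplies progressToken in the request's _meta; progress_notification and its request_meta argument are hypothetical names, and the message field is forwarded only when the step sets it.

```python
def progress_notification(step, request_meta):
    """Return a notifications/progress payload, or None to skip silently."""
    token = request_meta.get("progressToken")
    if token is None:
        return None  # the inbound request carried no progressToken, so nothing is sent
    params = {"progressToken": token, "progress": step["progress"]}
    if "message" in step:
        params["message"] = step["message"]
    return {"jsonrpc": "2.0", "method": "notifications/progress", "params": params}
```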

SQL container steps (2)

Steps that operate against a named sql backend's connection pool. See the SQL backend reference for the backend's own configuration.

SQL transaction (sql_tx)

Groups one or more SQL statements under a single transaction on a referenced SQL backend's pool — all-or-nothing. Reference the backend by name via backend:.

yaml
- kind: sql_tx
  id: charge_flow
  backend: orders_db                   # existing SQL backend name
  steps:
    - id: deduct_inventory
      sql: "UPDATE inventory SET qty = qty - 1 WHERE id = :id"
      params: [id]
    - id: record_order
      sql: "INSERT INTO orders (user_id, item_id) VALUES (:u, :i)"
      params: [u, i]
      row_mode: affected_rows

A failure at any nested step rolls the whole group back. Nested results surface downstream as completed_steps.<tx_step_id>.output.steps.<nested_id>.
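The all-or-nothing behavior can be illustrated with Python's built-in sqlite3 module standing in for the referenced backend's pool. This is a sketch under assumptions: each name in params is looked up in the call arguments, every nested result is reported as affected_rows regardless of row_mode, and run_sql_tx is a hypothetical helper.

```python
import sqlite3


def run_sql_tx(conn, nested_steps, args):
    """Run nested statements in one transaction; roll all of them back on failure."""
    results = {}
    try:
        with conn:  # commits if the block succeeds, rolls back if it raises
            for step in nested_steps:
                # Assumption: each name in params is bound from the call arguments.
                bound = {name: args[name] for name in step.get("params", [])}
                cur = conn.execute(step["sql"], bound)
                results[step["id"]] = {"affected_rows": cur.rowcount}
    except sqlite3.Error as exc:
        return {"is_error": True, "output": {"error": str(exc)}}
    # Mirrors completed_steps.<tx_step_id>.output.steps.<nested_id>.
    return {"is_error": False, "output": {"steps": results}}
```

In the second run of the usage below, the UPDATE succeeds but the INSERT violates NOT NULL, so the decrement is rolled back along with it.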

SQL await (sql_await)

Fire-and-wait against a referenced SQL backend whose profile declares an await block — runs the trigger, polls the check query, and evaluates the CEL predicate until it matches or times out. Same machinery as the standalone SQL await binding, exposed for pipeline composition.

yaml
- kind: sql_await
  id: wait_provisioned
  backend: provisioning_db             # existing SQL backend with an await block
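A polling sketch with sqlite3 standing in for the backend. In the gateway the trigger, check query, predicate, and timing come from the backend profile's await block; here they are plain parameters with invented defaults (timeout_s, interval_s), and a Python callable stands in for the CEL predicate. sql_await as a Python function is hypothetical.

```python
import sqlite3
import time


def sql_await(conn, trigger_sql, check_sql, predicate, timeout_s=5.0, interval_s=0.05):
    """Run the trigger once, then poll the check query until the predicate holds."""
    with conn:  # commit the trigger before polling
        conn.execute(trigger_sql)
    deadline = time.monotonic() + timeout_s
    while True:
        row = conn.execute(check_sql).fetchone()
        if row is not None and predicate(row):
            return {"is_error": False, "output": {"row": list(row)}}
        if time.monotonic() >= deadline:
            return {"is_error": True, "output": {"error": "sql_await timed out"}}
        time.sleep(interval_s)
```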

Suspending steps (4)

Suspending steps interrupt execution to interact with the client. Pipeline state is persisted to the pipeline store, and a server-initiated JSON-RPC request is delivered to the client via the delivery bus. Execution resumes from the next step when the client responds; the client's response becomes the step's output.

Before suspending, the gateway checks the client's initialize capabilities. If the client does not support the requested interaction, the step returns an error instead of suspending.
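A sketch of that pre-suspension check against the capabilities object the client sent in initialize. The mapping from step kind to the MCP capability keys elicitation, sampling, and roots is an assumption, as is checking every entry of a gather step. check_before_suspend is hypothetical.

```python
# Assumed client capability needed by each suspending step kind.
REQUIRED_CAPABILITY = {
    "elicitation": "elicitation",
    "sampling": "sampling",
    "roots_list": "roots",
}


def check_before_suspend(step, client_capabilities):
    """Return an error string if the client cannot serve this step, else None."""
    kinds = [step["kind"]]
    if step["kind"] == "gather":
        # Inside gather the roots entry kind is "roots", not "roots_list".
        kinds = ["roots_list" if i["kind"] == "roots" else i["kind"] for i in step["inputs"]]
    for kind in kinds:
        capability = REQUIRED_CAPABILITY[kind]
        if capability not in client_capabilities:
            return f"client does not support {capability}; step {step['id']} cannot suspend"
    return None
```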

Elicitation

Prompts the client for input (elicitation/create).

yaml
- kind: elicitation
  id: ask_user
  message: "Please confirm the operation:"
  requested_schema:                    # optional JSON Schema for the response
    type: object
    properties:
      confirmed: { type: boolean }
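For orientation, a sketch of the server-to-client request this step might produce. The field names follow the MCP elicitation/create request, which spells the schema requestedSchema. The id counter and elicitation_request are hypothetical. A typical client reply looks like {"action": "accept", "content": {"confirmed": true}}, and per the description above that reply becomes the step's output.

```python
import itertools

# Hypothetical source of JSON-RPC ids for server-initiated requests.
_request_ids = itertools.count(1)


def elicitation_request(step):
    """Build an elicitation/create request from an elicitation step's config."""
    params = {"message": step["message"]}
    if "requested_schema" in step:
        params["requestedSchema"] = step["requested_schema"]  # MCP uses camelCase
    return {
        "jsonrpc": "2.0",
        "id": next(_request_ids),
        "method": "elicitation/create",
        "params": params,
    }
```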

Sampling

Requests the client to perform LLM sampling (sampling/createMessage).

yaml
- kind: sampling
  id: ai_analysis
  messages:
    - role: user
      content: "Analyze this data: ${completed_steps.fetch.output}"
  max_tokens: 500
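A sketch mapping the step's YAML fields onto an MCP sampling/createMessage request, assuming the ${...} placeholders have already been substituted. MCP wraps each message's content as a typed object and spells the limit maxTokens. sampling_request and its request_id argument are hypothetical.

```python
def sampling_request(step, request_id):
    """Translate a sampling step's config into a sampling/createMessage request."""
    messages = [
        # MCP message content is a typed object, not a bare string.
        {"role": m["role"], "content": {"type": "text", "text": m["content"]}}
        for m in step["messages"]
    ]
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "sampling/createMessage",
        "params": {"messages": messages, "maxTokens": step["max_tokens"]},
    }
```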

Roots list (roots_list)

Asks the client to enumerate its filesystem / URI roots (roots/list).

yaml
- kind: roots_list
  id: discover_roots
  timeout_ms: 30000

Gather

Emits several server-to-client input requests (any mix of elicitation, sampling, or roots) in a single suspension and resumes once the client has answered all of them. This differs from listing the individual suspending steps in sequence, which suspends and resumes once per step.

Note: inside gather.inputs[], the roots entry kind is roots (the standalone step uses roots_list).

yaml
- kind: gather
  id: collect_inputs
  inputs:                              # two or more entries, each with a correlation_token
    - kind: elicitation
      correlation_token: confirm
      message: "Confirm the operation?"
    - kind: sampling
      correlation_token: summary
      messages:
        - role: user
          content: "Summarize: ${completed_steps.fetch.output}"
      max_tokens: 200

Each answered input lands in this step's output under its correlation_token, so downstream transform steps read completed_steps.collect_inputs.output.confirm, etc.
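A sketch of the bookkeeping a gather step implies: validate the inputs, then stay suspended until every correlation token has an answer. validate_gather and collect are hypothetical, and requiring distinct tokens is an assumption drawn from the fact that answers are keyed by token.

```python
def validate_gather(inputs):
    """Require two or more inputs with distinct correlation tokens; return the tokens."""
    if len(inputs) < 2:
        raise ValueError("gather needs two or more inputs")
    tokens = [entry["correlation_token"] for entry in inputs]
    if len(set(tokens)) != len(tokens):
        raise ValueError("correlation tokens must be unique")
    return tokens


def collect(tokens, answers):
    """Return the step output once every token is answered, else None (stay suspended)."""
    if any(token not in answers for token in tokens):
        return None
    return {token: answers[token] for token in tokens}
```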


Execution lifecycle

For each step the executor evaluates input_transform (if any), runs the step adapter, stores the result in completed_steps, and aborts if the step errored. The pipeline returns the final step's output when all steps complete. A suspending step serializes state, sends the server request, and returns 202 Accepted; the client's response loads state and resumes from the next step. Exceeding pipeline_timeout_ms returns a timeout error.
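The lifecycle above, as a Python sketch. Adapters are plain callables returning (output, is_error), a suspending adapter raises Suspend, a dict stands in for the pipeline store, and the absolute deadline is carried in the stored state so pipeline_timeout_ms stays a total budget across suspension. All names are hypothetical, feeding original_args to steps without an input_transform is an assumption, and duration_ms is omitted for brevity.

```python
import time


class Suspend(Exception):
    """Raised by a suspending adapter once its server request has been sent."""


def run_pipeline(steps, adapters, ctx, deadline, store, start=0):
    """Run steps[start:] in order; return the last result, an error, or a 202 marker."""
    result = None
    for index in range(start, len(steps)):
        if time.time() > deadline:
            return {"output": "pipeline timed out", "is_error": True}
        step = steps[index]
        transform = step.get("input_transform")
        # Assumption: a step without input_transform receives original_args.
        step_input = transform(ctx) if transform else ctx["original_args"]
        try:
            output, is_error = adapters[step["kind"]](step, step_input)
        except Suspend:
            # Persist what a later resume needs, then answer 202 Accepted.
            store["state"] = {"ctx": ctx, "step_id": step["id"],
                              "next": index + 1, "deadline": deadline}
            return {"status": 202}
        result = {"output": output, "is_error": is_error}
        ctx["completed_steps"][step["id"]] = result
        if is_error:
            return result  # abort on the first failing step
    return result


def resume(steps, adapters, store, client_response):
    """Load saved state, record the client's reply as the step output, continue."""
    state = store.pop("state")
    ctx = state["ctx"]
    ctx["completed_steps"][state["step_id"]] = {"output": client_response, "is_error": False}
    return run_pipeline(steps, adapters, ctx, state["deadline"], store, start=state["next"])
```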

Distributed coordination

Suspending pipelines work across load-balanced gateway instances:

  • Pipeline store persists execution state for suspension and resumption — InMemoryPipelineStore (single-instance default), NatsKvPipelineStore, or RedisPipelineStore. A state_version field implements compare-and-set fencing so only one instance can resume a given pipeline.
  • Delivery bus routes server-initiated messages (elicitation / sampling requests, deferred results) to the instance holding the client's SSE stream (auto-selection: NATS > Redis > in-process). If no subscriber is connected, messages are stored as pending deliveries and replayed when the client reconnects.
  • Pipeline reaper periodically cleans up pipeline states that exceed their pipeline_timeout_ms (default sweep interval: 30 s).
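A toy, single-process model of the fencing and reaping ideas. ToyPipelineStore and its methods are hypothetical and are not the gateway's InMemoryPipelineStore. The version counter plays the role of state_version: an instance may claim a suspended pipeline only if the version it read is still current, so a second instance racing on the same resume loses. The reap method assumes each state records an absolute deadline.

```python
import threading
import time


class ToyPipelineStore:
    """Single-process stand-in for a pipeline store with version fencing."""

    def __init__(self):
        self._lock = threading.Lock()
        self._states = {}  # pipeline_id -> (state_version, state)

    def put(self, pipeline_id, state):
        with self._lock:
            self._states[pipeline_id] = (1, state)

    def get(self, pipeline_id):
        with self._lock:
            return self._states.get(pipeline_id)

    def compare_and_set(self, pipeline_id, expected_version, new_state):
        """Write only if the version is unchanged since it was read."""
        with self._lock:
            current = self._states.get(pipeline_id)
            if current is None or current[0] != expected_version:
                return False  # another instance already claimed (or reaped) it
            self._states[pipeline_id] = (expected_version + 1, new_state)
            return True

    def reap(self, now=None):
        """Drop states whose deadline has passed; return how many were removed."""
        now = time.time() if now is None else now
        with self._lock:
            expired = [pid for pid, (_, state) in self._states.items()
                       if state["deadline"] < now]
            for pid in expired:
                del self._states[pid]
            return len(expired)
```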

Metrics

| Metric | Type | Labels |
|---|---|---|
| mcpg_binding_executions_total | Counter | binding_name, binding_type=pipeline, outcome |
| mcpg_binding_execution_duration_seconds | Histogram | binding_name, binding_type=pipeline, outcome |
| mcpg_pipeline_reaper_cleaned_total | Counter | (none) |
| mcpg_pipeline_reaper_last_sweep_count | Gauge | (none) |

See also