Pipeline steps reference
The 18 pipeline step kinds — backend, composition, notification, SQL-container, and suspending — that compose multiple operations into a single MCP tool call, with data flow and client interaction.
Adapted from
apps/gateway/docs/pipelines.md. Every example uses the real flat step shape. Validate any full config withcargo run -q -p mcpg --bin mcpg-config-check -- <file>, and start from the backends reference for the pipeline backend itself.
A pipeline binding executes an ordered sequence of steps as a single MCP
tool call. Steps share data through a pipeline context, and four step kinds can
suspend execution to interact with the client. The gateway's
PipelineStepConfig enum defines 18 step kinds.
Step shape
Every step is a flat object: a kind: discriminator, a unique id:, and
that kind's own fields as siblings — not a nested per-kind sub-block. Backend
steps flatten the same config as their standalone backend (an http step
carries url: / method: directly).
backend:
kind: pipeline
pipeline_timeout_ms: 30000 # total timeout for all steps
steps:
- id: step1 # unique within this pipeline
kind: http # step kind discriminator
url: "https://api.example.com/users"
method: post
- id: step2
kind: transform
expression: 'completed_steps.step1.output.data'
Validation rules. Step IDs must be unique within a pipeline. CEL
expressions in transform, cel_gate, and input_transform must not
reference forward (undefined) step IDs. Step IDs are the keys in
completed_steps.
Pipeline context
Every step reads three top-level context variables:
| Variable | Type | Description |
|---|---|---|
original_args | JSON object | Tool arguments from the original tools/call |
request_context | JSON object | Request ID, session ID, identity, transport |
completed_steps | JSON object | Completed step results keyed by step ID |
Each completed step result:
{
"step_id": {
"output": { "...": "..." },
"is_error": false,
"duration_ms": 42
}
}
The 18 step kinds
They group into backend steps (7), composition steps (3), notification steps (2), SQL container steps (2), and suspending steps (4).
Backend steps (7)
Backend steps execute the same adapters as standalone backends — the backend
config flattens onto the step. Each supports an optional input_transform CEL
expression to reshape input before execution. See the
backends reference for full field docs.
HTTP
- kind: http
id: fetch_user
url: "https://api.example.com/users"
method: post
timeout_ms: 5000
expected_status_codes: [200]
require_json_response: true
input_transform: 'original_args' # optional CEL expression
Command
- kind: command
id: run_script
command: "/usr/bin/process"
args: ["--json"]
timeout_ms: 10000
require_json_stdout: true
NATS
- kind: nats
id: backend_query
url: "nats://broker:4222" # all nats steps/bindings must agree
subject: "service.query"
timeout_ms: 5000
Kafka
- kind: kafka
id: kafka_request
bootstrap_servers: "broker:9092" # all kafka steps/bindings must agree
group_id: "mcpg"
request_topic: "requests"
response_topic: "responses"
timeout_ms: 10000
gRPC
- kind: grpc
id: grpc_call
url: "https://grpc.example.com"
service: "example.UserService"
method: "GetUser"
timeout_ms: 5000
GraphQL
- kind: graphql
id: graphql_query
url: "https://api.example.com/graphql"
operation: "query { users { id name } }"
timeout_ms: 5000
Mock
Flattens the mock backend config (response, delay_ms, error,
error_message, passthrough).
- kind: mock
id: fake_data
response: { items: [1, 2, 3] }
delay_ms: 100
Composition steps (3)
Transform
Produces a new value from a CEL expression over the pipeline context. The
result becomes the step's output.
- kind: transform
id: extract_name
expression: 'completed_steps.fetch_user.output.user.name'
Common patterns: extract nested fields
(completed_steps.step1.output.data.field), concatenate, or build map/list
literals.
CEL gate (cel_gate)
Evaluates a CEL expression as a boolean guard. If false, the pipeline aborts
with error_message (a default is used when omitted); if true, it continues.
- kind: cel_gate
id: check_permission
expression: 'completed_steps.fetch_user.output.role == "admin"'
error_message: "User does not have admin role"
Plugin transform (plugin_transform)
Reshapes the context by invoking a named transform plugin instead of an
inline CEL expression — a generic bridge that works with any registered
transform plugin. The first user is dev.mcpg.transform.jsonata, which
evaluates a JSONata expression (stronger than CEL at building nested objects,
projecting, and aggregating arrays).
- kind: plugin_transform
id: reshape
plugin: dev.mcpg.transform.jsonata
config:
expression: '{ "orderIds": steps.fetch.output.orders.id, "total": $sum(steps.fetch.output.orders.amount) }'
The plugin receives the full context (steps.<id>.output, arguments,
context.*, tool_name) and its output becomes the step's output. The
plugin must be loaded in plugins[]; a plugin error aborts the pipeline.
Notification steps (2)
Non-suspending steps that emit a server-to-client notification on the session's SSE channel and then continue immediately.
Log
Emits a notifications/message.
- kind: log
id: announce
level: info
data: "Charging order ${original_args.order_id}"
Progress
Emits a notifications/progress. Silently skipped when the inbound request
carried no progressToken.
- kind: progress
id: halfway
progress: 0.5
message: "Validation complete"
SQL container steps (2)
Steps that operate against a named sql backend's connection pool. See the
SQL backend for the backend itself.
SQL transaction (sql_tx)
Groups one or more SQL statements under a single transaction on a referenced
SQL backend's pool — all-or-nothing. Reference the backend by name via
backend:.
- kind: sql_tx
id: charge_flow
backend: orders_db # existing SQL backend name
steps:
- id: deduct_inventory
sql: "UPDATE inventory SET qty = qty - 1 WHERE id = :id"
params: [id]
- id: record_order
sql: "INSERT INTO orders (user_id, item_id) VALUES (:u, :i)"
params: [u, i]
row_mode: affected_rows
A failure at any nested step rolls the whole group back. Nested results surface
downstream as completed_steps.<tx_step_id>.output.steps.<nested_id>.
SQL await (sql_await)
Fire-and-wait against a referenced SQL backend whose profile declares an
await block — runs the trigger, polls the check query, and evaluates the CEL
predicate until it matches or times out. Same machinery as the standalone SQL
await binding, exposed for pipeline composition.
- kind: sql_await
id: wait_provisioned
backend: provisioning_db # existing SQL backend with an await block
Suspending steps (4)
Suspending steps interrupt execution to interact with the client. Pipeline
state is persisted to the pipeline store, and a server-initiated JSON-RPC
request is delivered to the client via the delivery bus. Execution resumes from
the next step when the client responds; the client's response becomes the
step's output.
Before suspending, the gateway checks the client's initialize capabilities.
If the client does not support the requested interaction, the step returns an
error instead of suspending.
Elicitation
Prompts the client for input (elicitation/create).
- kind: elicitation
id: ask_user
message: "Please confirm the operation:"
requested_schema: # optional JSON Schema for the response
type: object
properties:
confirmed: { type: boolean }
Sampling
Requests the client to perform LLM sampling (sampling/createMessage).
- kind: sampling
id: ai_analysis
messages:
- role: user
content: "Analyze this data: ${completed_steps.fetch.output}"
max_tokens: 500
Roots list (roots_list)
Asks the client to enumerate its filesystem / URI roots (roots/list).
- kind: roots_list
id: discover_roots
timeout_ms: 30000
Gather
Emits several server-to-client input requests (any mix of elicitation, sampling, or roots) in one suspension and resumes once the client answers them together — distinct from listing the individual suspending steps in sequence, which suspends and resumes one at a time.
Note: inside gather.inputs[], the roots entry kind is roots (the
standalone step uses roots_list).
- kind: gather
id: collect_inputs
inputs: # two or more entries, each with a correlation_token
- kind: elicitation
correlation_token: confirm
message: "Confirm the operation?"
- kind: sampling
correlation_token: summary
messages:
- role: user
content: "Summarize: ${completed_steps.fetch.output}"
max_tokens: 200
Each answered input lands in this step's output under its correlation_token,
so downstream transform steps read
completed_steps.collect_inputs.output.confirm, etc.
Execution lifecycle
For each step the executor evaluates input_transform (if any), runs the step
adapter, stores the result in completed_steps, and aborts if the step errored.
The pipeline returns the final step's output when all steps complete. A
suspending step serializes state, sends the server request, and returns
202 Accepted; the client's response loads state and resumes from the next
step. Exceeding pipeline_timeout_ms returns a timeout error.
Distributed coordination
Suspending pipelines work across load-balanced gateway instances:
- Pipeline store persists execution state for suspension and resumption —
InMemoryPipelineStore(single-instance default),NatsKvPipelineStore, orRedisPipelineStore. Astate_versionfield implements compare-and-set fencing so only one instance can resume a given pipeline. - Delivery bus routes server-initiated messages (elicitation / sampling requests, deferred results) to the instance holding the client's SSE stream (auto-selection: NATS > Redis > in-process). If no subscriber is connected, messages are stored as pending deliveries and replayed when the client reconnects.
- Pipeline reaper periodically cleans up pipeline states that exceed their
pipeline_timeout_ms(default sweep: 30s).
Metrics
| Metric | Type | Labels |
|---|---|---|
mcpg_binding_executions_total | Counter | binding_name, binding_type=pipeline, outcome |
mcpg_binding_execution_duration_seconds | Histogram | binding_name, binding_type=pipeline, outcome |
mcpg_pipeline_reaper_cleaned_total | Counter | — |
mcpg_pipeline_reaper_last_sweep_count | Gauge | — |
See also
- Backends reference — all 27 backend kinds,
including the pipeline backend and the SQL backend referenced by
sql_tx/sql_await. - Configuration reference — the full
AppConfigkey tree.