# Orchestrate pipelines
ContextAgent pipelines mirror PyTorch modules: inherit from `BasePipeline`, define `run`, and wire the steps together through the context core. This guide shows how to build a branching pipeline with retries and observability.
## 1. Scaffold the pipeline
```python
from pipelines.base import BasePipeline
from pydantic import BaseModel


class ResearchQuery(BaseModel):
    topic: str
    tone: str = "neutral"


class ResearchPipeline(BasePipeline[ResearchQuery]):
    async def run(self, query: ResearchQuery):
        with self.tracer.span("research.run", attributes=query.dict()):
            plan = await self.context.agents["planner"](query.topic)
            findings = await self.collect_sources(plan)
            draft = await self.context.agents["writer"](
                plan=plan,
                findings=findings,
                tone=query.tone,
            )
            return await self.review(draft)
```
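The `self.tracer.span` call wraps the whole run in a traced span. If you want a feel for what such a span context manager does, here is a minimal stdlib sketch (the `span` helper below is a hypothetical stand-in, not the ContextAgent tracer API):

```python
from contextlib import contextmanager
import time

@contextmanager
def span(name, attributes=None):
    # Hypothetical stand-in for self.tracer.span: records a name,
    # attributes, and wall-clock duration around a block of work.
    record = {"name": name, "attributes": attributes or {}}
    start = time.perf_counter()
    try:
        yield record
    finally:
        record["duration_s"] = time.perf_counter() - start

with span("research.run", attributes={"topic": "solar"}) as s:
    time.sleep(0.001)  # stands in for the pipeline body

print(s["name"], s["attributes"]["topic"], s["duration_s"] > 0)
```

A real tracer would export the span to your telemetry backend instead of keeping a local dict, but the enter/exit shape is the same.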
## 2. Fan out work
```python
async def collect_sources(self, plan):
    # Launch one researcher call per planned task and await them together.
    tasks = [
        self.context.agents["researcher"](task)
        for task in plan.tasks
    ]
    return await self.concurrent(tasks, label="collect_sources")
```
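Conceptually, `self.concurrent` awaits all coroutines together and returns their results in submission order, much like `asyncio.gather` plus a tracing label. A self-contained sketch of that behavior (the `concurrent` and `research` functions here are illustrative stand-ins, not the real API):

```python
import asyncio

async def concurrent(tasks, label=""):
    # Conceptual equivalent of self.concurrent: await all coroutines
    # together; results come back in submission order.
    return await asyncio.gather(*tasks)

async def research(task):
    await asyncio.sleep(0)  # stands in for a researcher agent call
    return f"findings: {task}"

async def main():
    plan_tasks = ["prior art", "market size"]
    return await concurrent(
        [research(t) for t in plan_tasks], label="collect_sources"
    )

results = asyncio.run(main())
print(results)
```

Because results preserve submission order, downstream steps can zip them back against `plan.tasks` without bookkeeping.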
## 3. Handle retries
```python
async def review(self, draft):
    for attempt in self.retry(max_attempts=3, name="review"):
        with attempt:
            critique = await self.context.agents["reviewer"](draft=draft)
            if critique.score >= 0.8:
                return critique.final_report
            attempt.need_retry(reason="Quality score too low")
```
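Under the hood, an attempt loop like this is just a bounded iteration with backoff between tries. A plain-Python sketch of that loop (the `retry` generator below is an assumption about the mechanics, not the ContextAgent implementation, which presumably also records spans per attempt):

```python
import time

def retry(max_attempts=3, base_delay=0.01):
    # Yield attempt numbers 1..max_attempts, sleeping with exponential
    # backoff between tries; breaking out of the loop stops early.
    for attempt in range(1, max_attempts + 1):
        yield attempt
        if attempt < max_attempts:
            time.sleep(base_delay * 2 ** (attempt - 1))

scores = iter([0.5, 0.7, 0.9])  # simulated reviewer quality scores
result = None
for attempt in retry(max_attempts=3):
    if next(scores) >= 0.8:
        result = f"accepted on attempt {attempt}"
        break
print(result)
```

If no attempt succeeds, the loop simply ends; the real framework likely raises a retry-exhausted error instead, so handle that case in production code.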
## 4. Emit artifacts
```python
artifact = self.artifacts.save_markdown("report.md", critique.final_report)
self.artifacts.link("latest-report", artifact)
return artifact
```
## 5. Route branches
```python
if critique.requires_followup:
    return await self.context.agents["researcher"](
        critique.followup_prompt,
    )
return critique.final_report
```
## 6. Configure tracing + telemetry
`pipelines/configs/research.yaml`:

```yaml
telemetry:
  exporter: otlp
  endpoint: https://telemetry.contextagent.ai
  sampling_rate: 0.1

retries:
  default:
    max_attempts: 3
    backoff: exponential
```
## 7. Run the pipeline
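With the pieces above in place, a driver script constructs a query and awaits `run`. The snippet below uses minimal local stand-ins so it executes on its own; in a real project you would import `ResearchQuery` and `ResearchPipeline` from your package, and the pipeline's constructor arguments depend on how your context core is configured:

```python
import asyncio
from dataclasses import dataclass

# Stand-ins so the snippet is self-contained; replace with your
# actual imports (e.g. from pipelines.research import ...).
@dataclass
class ResearchQuery:
    topic: str
    tone: str = "neutral"

class ResearchPipeline:
    async def run(self, query: ResearchQuery):
        # The real pipeline plans, researches, writes, and reviews here.
        return f"report on {query.topic} ({query.tone})"

async def main():
    pipeline = ResearchPipeline()
    return await pipeline.run(ResearchQuery(topic="quantum batteries"))

report = asyncio.run(main())
print(report)
```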
## Observability checklist
- ✅ Spans show each agent and tool invocation.
- ✅ Artifacts are tracked per run and accessible via the pipeline manager UI.
- ✅ Retries are labeled with their cause and resolution.
!!! tip
    Use the `pipelines.manager` module to schedule recurring jobs or connect to your orchestration platform (Airflow, Temporal, Dagster).
Next: learn how to deploy pipelines to production.