
Python API reference⚓︎

The sections below are generated with mkdocstrings. Keep docstrings descriptive to maintain high-quality docs.

Pipeline runtime⚓︎

BasePipeline ⚓︎

Base class for all pipelines with common configuration and setup.

enable_tracing property ⚓︎

enable_tracing

Get tracing enabled flag from config.

printer property ⚓︎

printer

Delegate to tracker.

reporter property ⚓︎

reporter

Delegate to tracker.

runtime_tracker property ⚓︎

runtime_tracker

Get the runtime tracker (created in init).

state property ⚓︎

state

Get pipeline state if available.

trace_sensitive property ⚓︎

trace_sensitive

Get trace sensitive data flag from config.

__init__ ⚓︎

__init__(config)

Initialize the pipeline using a single configuration input.

Args:
    config: Configuration specification. One of:
        - str/Path: Load YAML/JSON file
        - dict with 'config_path': Load file, then deep-merge dict on top (dict wins)
        - dict without 'config_path': Use as-is
        - BaseConfig: Use as-is
    strict: Whether to strictly validate configuration (default: True).

Examples:

# Load from file
BasePipeline("pipelines/configs/data_science.yaml")

# Dict without config_path
BasePipeline({"provider": "openai", "data": {"path": "data.csv"}})

# Dict that patches a file (use 'config_path')
BasePipeline({
    "config_path": "pipelines/configs/data_science.yaml",
    "data": {"path": "data/banana_quality.csv"},
    "user_prompt": "Custom prompt..."
})

# BaseConfig object
BasePipeline(BaseConfig(provider="openai", data={"path": "data.csv"}))

__setattr__ ⚓︎

__setattr__(name, value)

Automatically set up the context when one is assigned, enabling transparent integration.

begin_iteration ⚓︎

begin_iteration(title=None, border_style='white')

Begin a new iteration with its associated group.

Combines context.begin_iteration() + start_group() into a single call. Automatically manages the group_id internally.

Args:
    title: Optional custom title (default: "Iteration {index}")
    border_style: Border style for the group (default: "white")

Returns: The iteration record

end_group ⚓︎

end_group(group_id, *, is_done=True, title=None)

Delegate to tracker.

end_iteration ⚓︎

end_iteration(is_done=True)

End the current iteration and its associated group.

Combines context.mark_iteration_complete() + end_group() into a single call. Automatically derives group_id from current iteration.

Args: is_done: Whether the iteration completed successfully (default: True)
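
A minimal sketch of the explicit begin/end pairing inside a subclass's run() method (the fixed iteration budget and the work placeholder are illustrative assumptions, not part of the documented API):

async def run(self, query=None):
    for i in range(3):  # assumed fixed iteration budget for illustration
        self.begin_iteration(border_style="cyan")
        try:
            ...  # iteration work goes here
        finally:
            self.end_iteration(is_done=True)
    return "done"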

iterate ⚓︎

iterate(start_index=1, title=None, border_style='white')

Smart iteration management - auto-creates and advances iterations.

Single-command iteration handling that automatically:

- Creates first iteration on first call
- Ends previous iteration and starts next on subsequent calls
- Supports custom starting index

Args:
    start_index: Starting iteration number (default: 1)
    title: Optional custom title (default: "Iteration {index}")
    border_style: Border style for the group (default: "white")

Returns: The iteration record

Example:

while self.iteration < self.max_iterations:
    self.iterate()  # Single command!
    # ... do work ...

run async ⚓︎

run(query=None)

Execute the pipeline - must be implemented by subclasses.

Each pipeline implements its own complete execution logic. Use the utility methods and context managers provided by BasePipeline.

Args: query: Optional query input (can be None for pipelines without input)

Returns: Final result (pipeline-specific)

Raises: NotImplementedError: If subclass doesn't implement this method
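
A minimal subclass sketch using the iterate() helper (the import path, class name, and loop body are illustrative assumptions):

from contextagent.pipeline import BasePipeline  # assumed import path

class DataSciencePipeline(BasePipeline):  # hypothetical subclass
    async def run(self, query=None):
        result = None
        while self.iteration < self.max_iterations:  # attributes as used in the iterate() example above
            self.iterate()
            result = ...  # pipeline-specific work for this iteration
        return result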

run_context ⚓︎

run_context(additional_logging=None, start_timer=True, enable_reporter=True, outputs_dir=None, enable_printer=True, workflow_name=None, trace_metadata=None)

Context manager for run lifecycle handling.

Manages trace context initialization, printer lifecycle, and cleanup. Provides fine-grained control over pipeline components.

Args:
    additional_logging: Optional callable for pipeline-specific logging
    start_timer: Whether to start the constraint checking timer (default: True)
    enable_reporter: Whether to create/start the RunReporter (default: True)
    outputs_dir: Override outputs directory (default: None, uses config value)
    enable_printer: Whether to start the live status Printer (default: True)
    workflow_name: Override workflow name for this run (default: None, uses self.workflow_name)
    trace_metadata: Additional metadata to merge into trace context (default: None)

Yields: Trace context for the workflow
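
A hedged usage sketch inside run(); the workflow name, metadata, and printer message are illustrative assumptions, and if run_context is implemented as an async context manager in your version, use async with instead:

async def run(self, query=None):
    with self.run_context(
        workflow_name="data-science-demo",             # assumed override
        trace_metadata={"dataset": "banana_quality"},  # assumed metadata
    ) as trace:
        self.update_printer("status: starting")  # arguments are illustrative
        ...  # iterations and agent calls run inside the managed lifecycle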

run_sync ⚓︎

run_sync(*args, **kwargs)

Synchronous wrapper for the async run method.
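
A hedged example for synchronous callers (the subclass name and query text are illustrative):

pipeline = DataSciencePipeline("pipelines/configs/data_science.yaml")  # hypothetical subclass
result = pipeline.run_sync("Explore the dataset and train a baseline model")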

span_context ⚓︎

span_context(span_factory, **kwargs)

Create a span context - delegates to RuntimeTracker.

start_group ⚓︎

start_group(group_id, *, title=None, border_style=None, iteration=None)

Delegate to tracker.

start_printer ⚓︎

start_printer()

Delegate to tracker.

stop_printer ⚓︎

stop_printer()

Delegate to tracker.

trace_context ⚓︎

trace_context(name, metadata=None)

Create a trace context - delegates to RuntimeTracker.

update_printer ⚓︎

update_printer(*args, **kwargs)

Update printer status if printer is active.

Delegates to RuntimeTracker.update_printer(). See RuntimeTracker.update_printer() for full documentation.

Context core⚓︎

Context engine for managing conversation state and agent I/O.

Context ⚓︎

Central coordinator for conversation state and iteration management.

Source code in contextagent/context/context.py
class Context:
    """Central coordinator for conversation state and iteration management."""

    def __init__(
        self,
        components: Union[ConversationState, List[str]]
    ) -> None:
        """Initialize context engine with conversation state.

        Args:
            components: Either a ConversationState object (for backward compatibility)
                       or a list of component names to automatically initialize:
                       - "profiles": loads all profiles via load_all_profiles()
                       - "states": creates conversation state via create_conversation_state()

        Examples:
            # Automatic initialization
            context = Context(["profiles", "states"])

            # Manual initialization (backward compatible)
            state = create_conversation_state(profiles)
            context = Context(state)
        """
        self.profiles: Optional[Dict[str, Profile]] = None
        self.context_modules: Dict[str, BaseModel] = {}

        if isinstance(components, ConversationState):
            # Backward compatible: direct state initialization
            self._state = components
        elif isinstance(components, list):
            # Automatic initialization from component list
            if "profiles" in components:
                self.profiles = load_all_profiles()

            if "states" in components:
                if self.profiles is None:
                    raise ValueError("'states' requires 'profiles' to be initialized first. Include 'profiles' in the component list.")
                self._state = create_conversation_state(self.profiles)
            elif not hasattr(self, '_state'):
                # If no state requested, create empty state
                self._state = ConversationState()
        else:
            raise TypeError(f"components must be ConversationState or list, got {type(components)}")

    @property
    def state(self) -> ConversationState:
        return self._state

    def register_context_module(self, name: str, module: BaseModel) -> None:
        self.context_modules[name] = module

    def get_context_module(self, name: str) -> BaseModel:
        if name not in self.context_modules:
            raise ValueError(f"Context module {name} not found")
        return self.context_modules[name]

    def begin_iteration(self) -> BaseIterationRecord:
        """Start a new iteration and return its record.

        Automatically starts the conversation state timer on first iteration.

        Returns:
            The iteration record
        """
        # Lazy timer start: start on first iteration if not already started
        if self._state.started_at is None:
            self._state.start_timer()

        iteration = self._state.begin_iteration()
        return iteration

    def mark_iteration_complete(self) -> None:
        """Mark the current iteration as complete."""
        self._state.mark_iteration_complete()
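
register_context_module and get_context_module store arbitrary Pydantic models on the context. A hedged usage sketch (DataStore is a hypothetical module defined here only for illustration):

from pydantic import BaseModel

class DataStore(BaseModel):  # hypothetical context module
    current_dataset_path: str = ""

context = Context(["profiles", "states"])
context.register_context_module("data_store", DataStore(current_dataset_path="data/banana_quality.csv"))
store = context.get_context_module("data_store")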

__init__ ⚓︎

__init__(components)

Initialize context engine with conversation state.

Args:
    components: Either a ConversationState object (for backward compatibility) or a list of
        component names to automatically initialize:
        - "profiles": loads all profiles via load_all_profiles()
        - "states": creates conversation state via create_conversation_state()

Examples:

# Automatic initialization
context = Context(["profiles", "states"])

# Manual initialization (backward compatible)
state = create_conversation_state(profiles)
context = Context(state)
Source code in contextagent/context/context.py
def __init__(
    self,
    components: Union[ConversationState, List[str]]
) -> None:
    """Initialize context engine with conversation state.

    Args:
        components: Either a ConversationState object (for backward compatibility)
                   or a list of component names to automatically initialize:
                   - "profiles": loads all profiles via load_all_profiles()
                   - "states": creates conversation state via create_conversation_state()

    Examples:
        # Automatic initialization
        context = Context(["profiles", "states"])

        # Manual initialization (backward compatible)
        state = create_conversation_state(profiles)
        context = Context(state)
    """
    self.profiles: Optional[Dict[str, Profile]] = None
    self.context_modules: Dict[str, BaseModel] = {}

    if isinstance(components, ConversationState):
        # Backward compatible: direct state initialization
        self._state = components
    elif isinstance(components, list):
        # Automatic initialization from component list
        if "profiles" in components:
            self.profiles = load_all_profiles()

        if "states" in components:
            if self.profiles is None:
                raise ValueError("'states' requires 'profiles' to be initialized first. Include 'profiles' in the component list.")
            self._state = create_conversation_state(self.profiles)
        elif not hasattr(self, '_state'):
            # If no state requested, create empty state
            self._state = ConversationState()
    else:
        raise TypeError(f"components must be ConversationState or list, got {type(components)}")

begin_iteration ⚓︎

begin_iteration()

Start a new iteration and return its record.

Automatically starts the conversation state timer on first iteration.

Returns: The iteration record

Source code in contextagent/context/context.py
def begin_iteration(self) -> BaseIterationRecord:
    """Start a new iteration and return its record.

    Automatically starts the conversation state timer on first iteration.

    Returns:
        The iteration record
    """
    # Lazy timer start: start on first iteration if not already started
    if self._state.started_at is None:
        self._state.start_timer()

    iteration = self._state.begin_iteration()
    return iteration

mark_iteration_complete ⚓︎

mark_iteration_complete()

Mark the current iteration as complete.

Source code in contextagent/context/context.py
def mark_iteration_complete(self) -> None:
    """Mark the current iteration as complete."""
    self._state.mark_iteration_complete()

create_conversation_state ⚓︎

create_conversation_state(profiles)
Source code in contextagent/context/conversation.py
def create_conversation_state(profiles: Dict[str, Profile]) -> "ConversationState":
    models: List[Type[BaseModel]] = []
    seen: Set[str] = set()

    for profile in profiles.values():
        model = getattr(profile, "output_schema", None)
        if model is not None and isinstance(model, type) and issubclass(model, BaseModel):
            key = f"{model.__module__}.{model.__qualname__}"
            if key not in seen:
                seen.add(key)
                models.append(model)

    if not models:
        models = [ToolAgentOutput]

    iterator = iter(models)
    union_type: Type[BaseModel] = next(iterator)
    for model in iterator:
        union_type = union_type | model  # type: ignore[operator]

    field_definitions = {
        "payloads": (List[union_type], Field(default_factory=list)),
    }

    iteration_model: Type[BaseIterationRecord] = create_model(
        "IterationRecord",
        __base__=BaseIterationRecord,
        __module__=BaseIterationRecord.__module__,
        **field_definitions,
    )
    iteration_model._output_union = union_type  # type: ignore[attr-defined]

    state = ConversationState()
    object.__setattr__(state, "_iteration_model", iteration_model)
    return state
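
A hedged usage sketch (the import path for load_all_profiles is an assumption; it is the same loader referenced by Context above):

profiles = load_all_profiles()                # assumed to be importable alongside Context
state = create_conversation_state(profiles)   # payload union built from profile output schemas
context = Context(state)                      # backward-compatible manual wiring
iteration = context.begin_iteration()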

Agent primitives⚓︎

ContextAgent ⚓︎

Bases: Agent[TContext]

Augmented Agent class with context-aware capabilities.

ContextAgent extends the base Agent class with:

- Automatic context injection into instructions
- Profile-based configuration (tools, instructions, output schema)
- Automatic iteration tracking and state management
- Runtime template rendering with state placeholders

Usage:

agent = ContextAgent(
    context=context,
    profile="observe",
    llm="gpt-4"
)

All Agent parameters can be passed via **agent_kwargs to override profile defaults:

agent = ContextAgent(
    context=context,
    profile="observe",
    llm="gpt-4",
    tools=[custom_tool],   # Overrides profile tools
    model="gpt-4-turbo"    # Overrides llm parameter
)

__call__ async ⚓︎

__call__(payload=None, *, tracker=None, span_name=None, span_type=None, output_model=None, printer_key=None, printer_title=None, printer_border_style=None, record_payload=None, sync=False, **span_kwargs)

Make ContextAgent callable directly.

This allows usage like: result = await agent(input_data)

When a tracker is provided (or available from context), the call uses the agent_step function for full tracking/tracing. Otherwise, it uses ContextRunner.

Note: When calling directly without a tracker, input validation is relaxed to allow string inputs even if the agent has a defined input_model.

Args:
    payload: Input data for the agent
    tracker: Optional RuntimeTracker for execution with tracking. If not provided, will attempt
        to get from context via get_current_tracker().

Returns: Parsed output if in pipeline context, otherwise RunResult
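
A hedged call sketch (the payload string and printer options are illustrative assumptions):

agent = ContextAgent(context=context, profile="observe", llm="gpt-4")
result = await agent(
    "Summarise the latest iteration",  # string payloads are accepted when no tracker is involved
    printer_title="Observation",       # illustrative printer options
    printer_border_style="green",
)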

__init__ ⚓︎

__init__(context, *, profile, llm, **agent_kwargs)

Initialize ContextAgent with context and profile identifier.

Args:
    context: Context object containing profiles and state
    profile: Profile identifier for lookup in context.profiles
    llm: LLM model name (e.g., "gpt-4", "claude-3-5-sonnet")
    **agent_kwargs: Additional Agent parameters that override profile defaults
        (name, tools, instructions, output_type, model, etc.)

_serialize_payload staticmethod ⚓︎

_serialize_payload(payload)

Normalize supported payload types into a string for LLM consumption.

build_contextual_instructions ⚓︎

build_contextual_instructions(payload=None)

Build instructions with automatic context injection from pipeline state.

This method compiles instructions that include:

- Runtime template rendering with placeholders filled from state (if profile has runtime_template)
- Original query from pipeline.context.state.query
- Previous iteration history from pipeline.context.state.iteration_history()
- Current input payload

Args: payload: Current input data for the agent

Returns: Formatted instructions string with full context

Note: This method requires self._context to be set.
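
A hedged sketch; the compiled string is what the agent ultimately receives as instructions (the payload text is illustrative):

instructions = agent.build_contextual_instructions("Profile the dataset and flag anomalies")
print(instructions[:500])  # inspect the injected query, history, and payload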

get_context_with_wrapper ⚓︎

get_context_with_wrapper(field_name)

Get a context wrapper for a field name.

parse_output async ⚓︎

parse_output(run_result)

Apply legacy string parser only when no structured output is configured.

register_context_wrapper ⚓︎

register_context_wrapper(field_name, wrapper=identity_wrapper)

Register a context wrapper for a context field.

Tools⚓︎

Tools for agent workflows.

analyze_data async ⚓︎

analyze_data(ctx, file_path=None, target_column=None)

Performs comprehensive exploratory data analysis on a dataset.

This tool automatically uses the current dataset from the pipeline context. A file_path can optionally be provided to analyze a different dataset.

Args:
    ctx: Pipeline context wrapper for accessing the data store
    file_path: Optional path to dataset file. If not provided, uses current dataset.
    target_column: Optional target column for correlation analysis

Returns:
    Dictionary containing:
    - distributions: Distribution statistics for each column
    - correlations: Correlation matrix for numerical columns
    - outliers: Outlier detection results using IQR method
    - patterns: Identified patterns and insights
    - recommendations: Data quality and preprocessing recommendations
    Or error message string if analysis fails

crawl_website async ⚓︎

crawl_website(starting_url)

Crawls the pages of a website starting with the starting_url and then descending into the pages linked from there. Prioritizes links found in headers/navigation, then body links, then subsequent pages.

Args: starting_url: Starting URL to scrape

Returns:
    List of ScrapeResult objects which have the following fields:
    - url: The URL of the web page
    - title: The title of the web page
    - description: The description of the web page
    - text: The text content of the web page

create_visualization async ⚓︎

create_visualization(ctx, plot_type, file_path=None, columns=None, target_column=None, output_path=None)

Creates data visualizations from a dataset.

This tool automatically uses the current dataset from the pipeline context. A file_path can optionally be provided to visualize a different dataset.

Args:
    ctx: Pipeline context wrapper for accessing the data store
    plot_type: Type of visualization to create. Options:
        - "distribution": Histogram/distribution plots for numerical columns
        - "correlation": Correlation heatmap
        - "scatter": Scatter plot (requires 2 columns)
        - "box": Box plot for outlier detection
        - "bar": Bar chart for categorical data
        - "pairplot": Pairwise relationships plot
    file_path: Optional path to dataset file. If not provided, uses current dataset.
    columns: List of columns to visualize (optional, uses all if not specified)
    target_column: Target column for colored scatter/pair plots
    output_path: Path to save the visualization (PNG format)

Returns:
    Dictionary containing:
    - plot_type: Type of plot created
    - columns_plotted: Columns included in the plot
    - output_path: Path where plot was saved
    - plot_base64: Base64-encoded PNG image (if no output_path)
    - insights: Visual insights extracted from the plot
    Or error message string if visualization fails

evaluate_model async ⚓︎

evaluate_model(ctx, target_column, file_path=None, model_type='random_forest', test_size=0.2, random_state=42)

Evaluates machine learning model performance with comprehensive metrics.

This tool automatically uses the current dataset from the pipeline context. A file_path can optionally be provided to evaluate on a different dataset.

Args:
    ctx: Pipeline context wrapper for accessing the data store
    target_column: Name of the target column to predict
    file_path: Optional path to dataset file. If not provided, uses current dataset.
    model_type: Type of model to evaluate (random_forest, decision_tree, etc.)
    test_size: Proportion of data to use for testing (default: 0.2)
    random_state: Random seed for reproducibility (default: 42)

Returns:
    Dictionary containing:
    - problem_type: "classification" or "regression"
    - metrics: Performance metrics
    - confusion_matrix: Confusion matrix (for classification)
    - classification_report: Detailed classification report
    - cross_validation: Cross-validation results
    - error_analysis: Error distribution analysis
    Or error message string if evaluation fails

load_dataset async ⚓︎

load_dataset(ctx, file_path)

Loads a dataset and provides comprehensive inspection information.

This tool caches the loaded DataFrame in the pipeline data store so other tools can reuse it without reloading from disk.

Args:
    ctx: Pipeline context wrapper for accessing the data store
    file_path: Path to the dataset file (CSV, JSON, Excel, etc.)

Returns:
    Dictionary containing:
    - shape: Tuple of (rows, columns)
    - columns: List of column names
    - dtypes: Dictionary of column data types
    - missing_values: Dictionary of missing value counts per column
    - sample_data: First 5 rows as dictionary
    - summary_stats: Statistical summary for numerical columns
    - memory_usage: Memory usage information
    Or error message string if loading fails
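
A hedged wiring sketch: the data tools are typically attached to a ContextAgent, which then reuses the DataFrame cached by load_dataset (the profile name and prompt are illustrative assumptions):

data_agent = ContextAgent(
    context=context,
    profile="observe",  # illustrative profile
    llm="gpt-4",
    tools=[load_dataset, analyze_data, create_visualization],
)
summary = await data_agent("Load data/banana_quality.csv and describe its main quality issues")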

preprocess_data async ⚓︎

preprocess_data(ctx, operations, file_path=None, target_column=None, output_path=None)

Performs data preprocessing operations on a dataset.

This tool automatically uses the current dataset from the pipeline context. A file_path can optionally be provided to preprocess a different dataset.

Args:
    ctx: Pipeline context wrapper for accessing the data store
    operations: List of preprocessing operations to perform. Options:
        - "handle_missing": Handle missing values (mean/median/mode imputation)
        - "remove_duplicates": Remove duplicate rows
        - "encode_categorical": Encode categorical variables
        - "scale_standard": Standardize numerical features (z-score)
        - "scale_minmax": Min-max scaling for numerical features
        - "remove_outliers": Remove outliers using IQR method
        - "feature_engineering": Create interaction features
    file_path: Optional path to dataset file. If not provided, uses current dataset.
    target_column: Optional target column to preserve
    output_path: Optional path to save preprocessed dataset

Returns:
    Dictionary containing:
    - operations_applied: List of operations performed
    - original_shape: Original dataset shape
    - preprocessed_shape: Preprocessed dataset shape
    - changes_summary: Summary of changes made
    - output_path: Path where preprocessed data was saved (if output_path provided)
    Or error message string if preprocessing fails

train_model async ⚓︎

train_model(ctx, target_column, file_path=None, model_type='auto', test_size=0.2, random_state=42)

Trains machine learning models on a dataset.

This tool automatically uses the current dataset from the pipeline context. A file_path can optionally be provided to train on a different dataset.

Args:
    ctx: Pipeline context wrapper for accessing the data store
    target_column: Name of the target column to predict
    file_path: Optional path to dataset file. If not provided, uses current dataset.
    model_type: Type of model to train. Options:
        - "auto": Automatically detect and use best model
        - "random_forest": Random Forest
        - "logistic_regression": Logistic Regression (classification)
        - "linear_regression": Linear Regression (regression)
        - "decision_tree": Decision Tree
    test_size: Proportion of data to use for testing (default: 0.2)
    random_state: Random seed for reproducibility (default: 42)

Returns:
    Dictionary containing:
    - model_type: Type of model trained
    - problem_type: "classification" or "regression"
    - train_score: Training score
    - test_score: Testing score
    - cross_val_scores: Cross-validation scores (mean and std)
    - feature_importance: Feature importance scores (if available)
    - predictions_sample: Sample of predictions vs actual values
    Or error message string if training fails

web_search ⚓︎

web_search(query)

Perform a web search for a given query and get back the URLs along with their titles, descriptions and text contents.

Args: query: The search query

Returns:
    List of ScrapeResult objects which have the following fields:
    - url: The URL of the search result
    - title: The title of the search result
    - description: The description of the search result
    - text: The full text content of the search result
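
A hedged sketch of a research-style agent that combines the web tools (the profile name and prompt are illustrative assumptions):

research_agent = ContextAgent(
    context=context,
    profile="observe",  # illustrative profile
    llm="gpt-4",
    tools=[web_search, crawl_website],
)
findings = await research_agent("Survey current best practices for automated data quality checks")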

Note

Want to document additional modules? Extend this page with more module blocks or split into dedicated subpages.