State and Context Management#

LangGraph-native state management with context persistence. Context data is managed through Pydantic, which provides automatic serialization, validation, and type safety.

Core State System#

The state system provides a clean separation between persistent context data and execution-scoped fields that reset automatically between conversation turns, built on LangGraph’s MessagesState foundation.

AgentState#

class osprey.state.AgentState(*args, **kwargs)[source]#

Bases: MessagesState

LangGraph-native conversational agent state with comprehensive execution tracking.

This TypedDict class extends LangGraph’s MessagesState to provide a complete state structure for conversational AI agents. The state design implements selective persistence where only capability context data accumulates across conversation turns, while all execution-related fields reset automatically between graph invocations for optimal performance and state clarity.

State Architecture:

The state follows a flat structure with logical field prefixes for organization:

  • Persistent Fields: Only capability_context_data persists across conversations

  • Execution-Scoped Fields: All other fields reset automatically each invocation

  • Minimal Custom Reducers: Messages use LangGraph’s native handling; only capability_context_data uses a custom reducer (merge_capability_context_data())

  • Type Safety: Comprehensive TypedDict definitions with proper type annotations

Field Organization by Prefix:

  • task_* : Task extraction and input processing state

  • planning_* : Classification and orchestration management

  • execution_* : Step execution results and tracking

  • control_* : Control flow and retry logic state

  • ui_* : UI-specific results and command generation

  • runtime_* : Runtime metadata and checkpoint information

Context Data Structure:

The capability_context_data field uses a three-level dictionary optimized for LangGraph serialization and efficient context retrieval:

{
    context_type: {
        context_key: {
            field: value,
            ...
        }
    }
}
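
As a minimal sketch of how this shape behaves, the snippet below builds the nested dictionary with plain Python and reads one entry back. The helper name `put_context` and the sample field values are illustrative assumptions, not part of the Osprey API.

```python
from typing import Any

ContextData = dict[str, dict[str, dict[str, Any]]]


def put_context(data: ContextData, context_type: str, context_key: str,
                fields: dict[str, Any]) -> ContextData:
    """Return a copy of `data` with one entry added (hypothetical helper)."""
    # Copy the two outer levels so the input dictionary is left unchanged.
    updated = {ctype: dict(entries) for ctype, entries in data.items()}
    updated.setdefault(context_type, {})[context_key] = dict(fields)
    return updated


context_data: ContextData = {}
context_data = put_context(
    context_data, "PV_ADDRESSES", "step1",
    {"pvs": ["SR:CURRENT"], "description": "Found PVs"},
)

# Level 1: context type, level 2: context key, level 3: field name
print(context_data["PV_ADDRESSES"]["step1"]["pvs"])
```

Every level is a plain dictionary of JSON-friendly values, which is what keeps the structure straightforward to checkpoint.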

State Lifecycle:

  1. Fresh State Creation: StateManager.create_fresh_state() initializes defaults

  2. Context Preservation: Only capability_context_data carries forward

  3. Execution Reset: All execution fields reset to defaults each turn

  4. Message Handling: LangGraph manages message history automatically
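
The lifecycle above can be sketched with a trimmed-down state. `MiniState` and `fresh_state_sketch` are hypothetical stand-ins for AgentState and StateManager.create_fresh_state(); they keep only a handful of fields and represent messages as plain dictionaries, whereas the real framework leaves message handling to LangGraph.

```python
from typing import Any, Optional, TypedDict


class MiniState(TypedDict):
    """Hypothetical, trimmed-down stand-in for AgentState."""
    messages: list[dict[str, str]]
    capability_context_data: dict[str, dict[str, dict[str, Any]]]
    task_current_task: Optional[str]
    execution_step_results: dict[str, Any]
    control_has_error: bool


def fresh_state_sketch(user_input: str,
                       current_state: Optional[MiniState] = None) -> MiniState:
    """Build a new turn's state, carrying over only the context data."""
    preserved: dict[str, dict[str, dict[str, Any]]] = {}
    if current_state is not None:
        preserved = current_state.get("capability_context_data", {})
    return MiniState(
        messages=[{"role": "user", "content": user_input}],
        capability_context_data=preserved,   # persists across turns
        task_current_task=None,              # execution-scoped: reset
        execution_step_results={},           # execution-scoped: reset
        control_has_error=False,             # execution-scoped: reset
    )


previous = fresh_state_sketch("Find beam current PV addresses")
previous["capability_context_data"] = {"PV_ADDRESSES": {"step1": {"pvs": ["SR:CURRENT"]}}}
previous["execution_step_results"] = {"step1": "done"}

current = fresh_state_sketch("Show me the latest data", current_state=previous)
print(current["capability_context_data"])  # carried over from the previous turn
print(current["execution_step_results"])   # reset to an empty dict
```

Because a TypedDict carries no defaults of its own, a factory function like this is the natural place to populate every field.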

Note

TypedDict classes cannot define default values in the class definition. Always use StateManager.create_fresh_state() to create properly initialized state instances. Only capability_context_data will be preserved from previous state if available.

Warning

Direct manipulation of state fields may interfere with LangGraph’s automatic checkpointing and serialization. Use StateManager utilities for all state operations to ensure compatibility and proper behavior.

Examples

Creating fresh state through StateManager:

>>> from osprey.state import StateManager
>>> state = StateManager.create_fresh_state(
...     user_input="Find beam current PV addresses",
...     current_state=previous_state
... )
>>> state['task_current_task'] is None
True

Accessing persistent context data:

>>> context_data = state['capability_context_data']
>>> pv_addresses = context_data.get('PV_ADDRESSES', {})
>>> len(pv_addresses)
3

See also

  • osprey.state.StateManager : State creation and management utilities

  • osprey.context.ContextManager : Context data access and storage

  • merge_capability_context_data() : Custom reducer for context persistence

  • osprey.base.planning.ExecutionPlan : Execution planning structures

Key State Fields

Persistent Field:

capability_context_data: Dict[str, Dict[str, Dict[str, Any]]]

Only field that persists across conversation turns. Uses three-level dictionary structure: {context_type: {context_key: {field: value}}}. Uses custom reducer merge_capability_context_data().
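
LangGraph binds a reducer to a state field through `typing.Annotated`; the reducer receives the existing value and the incoming update and returns the merged result. The sketch below shows one way a reducer for the three-level structure might merge updates. It is an assumption about how merge_capability_context_data() behaves, not its actual implementation, and the names `merge_context_sketch` and `SketchState` are hypothetical.

```python
from typing import Annotated, Any, Optional, TypedDict

ContextData = dict[str, dict[str, dict[str, Any]]]


def merge_context_sketch(existing: Optional[ContextData],
                         update: Optional[ContextData]) -> ContextData:
    """Merge `update` into `existing` per context type and key (assumed behavior)."""
    merged: ContextData = {
        ctype: dict(entries) for ctype, entries in (existing or {}).items()
    }
    for ctype, entries in (update or {}).items():
        # A repeated context_key replaces the earlier entry of the same type.
        merged.setdefault(ctype, {}).update(entries)
    return merged


class SketchState(TypedDict):
    # Same annotation pattern LangGraph uses to attach a reducer to a field.
    capability_context_data: Annotated[ContextData, merge_context_sketch]


old = {"PV_ADDRESSES": {"step1": {"pvs": ["A"]}}}
new = {"PV_ADDRESSES": {"step2": {"pvs": ["B"]}}, "TIME_RANGE": {"t1": {"hours": 1}}}
print(merge_context_sketch(old, new))
```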

Execution-Scoped Fields (reset each invocation):

task_current_task: str | None#

Current task being processed.

planning_execution_plan: ExecutionPlan | None#

Current execution plan from orchestrator.

execution_step_results: Dict[str, Any]#

Results from completed execution steps.

control_has_error: bool#

Error state for manual retry handling.

capability_context_data: Annotated[dict[str, dict[str, dict[str, Any]]], merge_capability_context_data]#
agent_control: dict[str, Any]#
status_updates: list[dict[str, Any]]#
progress_events: list[dict[str, Any]]#
task_current_task: str | None#
task_depends_on_chat_history: bool#
task_depends_on_user_memory: bool#
task_custom_message: str | None#
planning_active_capabilities: list[str]#
planning_execution_plan: ExecutionPlan | None#
planning_current_step_index: int#
execution_step_results: dict[str, Any]#
execution_last_result: ExecutionResult | None#
execution_pending_approvals: dict[str, ApprovalRequest]#
execution_start_time: float | None#
execution_total_time: float | None#
approval_approved: bool | None#
approved_payload: dict[str, Any] | None#
control_reclassification_reason: str | None#
control_reclassification_count: int#
control_plans_created_count: int#
control_current_step_retry_count: int#
control_retry_count: int#
control_has_error: bool#
control_error_info: dict[str, Any] | None#
control_last_error: dict[str, Any] | None#
control_max_retries: int#
control_is_killed: bool#
control_kill_reason: str | None#
control_is_awaiting_validation: bool#
control_validation_context: dict[str, Any] | None#
control_validation_timestamp: float | None#
ui_captured_notebooks: list[dict[str, Any]]#
ui_captured_figures: list[dict[str, Any]]#
ui_launchable_commands: list[dict[str, Any]]#
ui_agent_context: dict[str, Any] | None#
runtime_checkpoint_metadata: dict[str, Any] | None#
runtime_info: dict[str, Any] | None#

StateManager#

class osprey.state.StateManager[source]#

Bases: object

LangGraph-native state management utilities for conversational AI agents.

This class provides a comprehensive suite of static utilities for managing conversational agent state using LangGraph’s native patterns. The StateManager serves as the primary interface for state creation, manipulation, and context storage operations throughout the Osprey Agent Framework.

Core Functionality:

The StateManager implements a simplified state management approach that leverages LangGraph’s native patterns for optimal performance and compatibility:

  • Fresh State Creation: Initialize new conversation states with proper defaults

  • Context Persistence: Maintain capability context data across conversation turns

  • State Updates: Generate LangGraph-compatible state update dictionaries

  • Context Storage: One-liner context storage for capability results

  • Execution Tracking: Utilities for accessing execution plans and current steps

Design Philosophy:

The StateManager follows a “batteries included” approach where common state operations are provided as simple, one-liner utilities. This eliminates the need for capabilities and infrastructure components to implement complex state logic.

State Lifecycle Management:

  1. Initialization: create_fresh_state() creates new conversation states

  2. Execution: Utilities provide access to current execution context

  3. Context Storage: store_context() handles capability result persistence

  4. State Updates: Methods generate proper LangGraph state update dictionaries

Note

All StateManager methods are static utilities that do not require instantiation. The class serves as a namespace for state management operations.

Warning

StateManager utilities are optimized for LangGraph’s native patterns. Using these utilities ensures compatibility with checkpointing and serialization. Direct state manipulation may cause issues.

Examples

Creating fresh state for new conversation:

>>> state = StateManager.create_fresh_state(
...     user_input="Find beam current data",
...     current_state=previous_state
... )

Storing capability context results:

>>> updates = StateManager.store_context(
...     state, "PV_ADDRESSES", "step1", pv_data
... )
>>> return updates  # Compatible with LangGraph

Accessing current execution step:

>>> step = StateManager.get_current_step(state)
>>> task_objective = step.get('task_objective')

See also

  • osprey.state.AgentState : Main state structure managed by this class

  • osprey.context.ContextManager : Context data management utilities

  • get_agent_control_defaults() : Configuration defaults for state creation

Key Methods

static create_fresh_state(user_input, current_state=None)[source]#

Create fresh agent state for a new conversation turn with selective persistence.

This method creates a complete fresh state for LangGraph execution while preserving only the capability context data from the previous state. All execution-scoped fields are reset to their default values, ensuring clean state for each conversation turn while maintaining accumulated context.

The state creation process handles the complete lifecycle initialization:

  1. Message Initialization: Creates properly formatted user message

  2. Context Preservation: Extracts and preserves capability context data

  3. Default Population: Initializes all execution fields with safe defaults

  4. Configuration Loading: Applies agent control defaults from configuration

This approach ensures optimal performance by avoiding state bloat while maintaining the persistent context needed for multi-turn conversations.

Parameters:
  • user_input (str) – The user’s message content for this conversation turn

  • current_state (Optional[AgentState]) – Previous agent state to preserve context from (optional)

Returns:

Complete fresh state ready for LangGraph graph invocation

Return type:

AgentState

Note

Only capability_context_data persists across conversation turns. All other fields including execution results, control state, and UI data are reset to defaults for optimal performance and state clarity.

Warning

The current_state parameter should be the complete previous AgentState. Partial state dictionaries may cause context preservation to fail silently.

Examples

Creating state for new conversation:

>>> state = StateManager.create_fresh_state("Find beam current PVs")
>>> state['task_current_task'] is None
True
>>> len(state['messages'])
1

Preserving context from previous conversation:

>>> new_state = StateManager.create_fresh_state(
...     "Show me the latest data",
...     current_state=previous_state
... )
>>> # Context preserved but execution state reset
>>> len(new_state['capability_context_data'])
3  # Preserved from previous
>>> new_state['execution_step_results']
{}  # Reset to empty

See also

  • AgentState : State structure created by this method

  • get_agent_control_defaults() : Configuration defaults applied

  • osprey.state.MessageUtils : Message creation utilities

static store_context(state, context_type, context_key, context_object)[source]#

Store capability context data and return LangGraph-compatible state updates.

This is the primary utility function that capabilities should use for storing context data in the agent state. The method provides a one-liner interface that handles all the complexity of context management, serialization, and state update generation for seamless integration with LangGraph’s state system.

The function performs the complete context storage workflow:

  1. Context Manager Creation: Initializes ContextManager from current state

  2. Context Serialization: Converts CapabilityContext to dictionary format

  3. State Integration: Merges context data into existing state structure

  4. Update Generation: Returns LangGraph-compatible state update dictionary

This approach ensures that capability results are properly persisted across conversation turns while maintaining optimal serialization performance.
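
The four steps above can be sketched with the standard library alone. This version swaps Pydantic for `dataclasses.asdict` so that it runs without third-party packages; `PVAddressesSketch` and `store_context_sketch` are hypothetical names, and the real method delegates to ContextManager rather than manipulating dictionaries directly.

```python
from dataclasses import asdict, dataclass, field
from typing import Any


@dataclass
class PVAddressesSketch:
    """Hypothetical context object (the framework uses CapabilityContext)."""
    pvs: list[str] = field(default_factory=list)
    description: str = ""


def store_context_sketch(state: dict[str, Any], context_type: str,
                         context_key: str, context_object: Any) -> dict[str, Any]:
    """Serialize the object and return a state update, leaving `state` untouched."""
    existing = state.get("capability_context_data", {})
    # Copy the two outer levels so the caller's state is not mutated.
    merged = {ctype: dict(entries) for ctype, entries in existing.items()}
    merged.setdefault(context_type, {})[context_key] = asdict(context_object)
    return {"capability_context_data": merged}


state = {"capability_context_data": {}}
updates = store_context_sketch(
    state, "PV_ADDRESSES", "step1",
    PVAddressesSketch(pvs=["SR:CURRENT"], description="Found PVs"),
)
print(updates)

# Reconstruction is the reverse step: rebuild the object from its stored fields.
stored = updates["capability_context_data"]["PV_ADDRESSES"]["step1"]
restored = PVAddressesSketch(**stored)
```

Returning an update dictionary instead of mutating the state in place is what lets the graph runtime apply the change and checkpoint it.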

Parameters:
  • state (AgentState) – Current agent state containing existing context data

  • context_type (str) – Context type identifier (e.g., “PV_ADDRESSES”, “ANALYSIS_RESULTS”)

  • context_key (str) – Unique key for this context instance within the type

  • context_object (CapabilityContext) – CapabilityContext object containing data to store

Returns:

State update dictionary for LangGraph automatic merging

Return type:

StateUpdate

Note

This is the recommended pattern for all capability context storage. The returned dictionary can be directly returned from capability execute methods and will be automatically merged by LangGraph.

Warning

The context_key should be unique within the context_type to avoid overwriting existing context data. Consider using step context keys or timestamp-based keys for uniqueness.

Examples

Basic capability context storage:

>>> @staticmethod
... async def execute(state: AgentState, **kwargs):
...     # Capability logic here
...     result = PVAddresses(pvs=found_pvs, description="Found PVs")
...
...     # Store and return state updates (one-liner)
...     step = StateManager.get_current_step(state)
...     return StateManager.store_context(
...         state, "PV_ADDRESSES", step.get('context_key'), result
...     )

Context storage with custom key:

>>> from datetime import datetime
>>> timestamp = datetime.now()
>>> analysis_result = DataAnalysis(
...     results=processed_data,
...     timestamp=timestamp
... )
>>> updates = StateManager.store_context(
...     state, "ANALYSIS", f"beam_analysis_{timestamp.isoformat()}", analysis_result
... )
>>> return updates

See also

  • osprey.context.ContextManager : Underlying context management

  • osprey.context.CapabilityContext : Base class for context objects

  • get_current_step() : Utility for getting current execution step

static get_current_step(state)[source]#

Get current execution step from state.

This is the unified utility for accessing the current step being executed. Replaces the old pattern of importing _extract_current_step from decorators.

Parameters:

state (AgentState) – Current conversation state

Returns:

Current step dictionary with capability, task_objective, etc.

Return type:

PlannedStep

Raises:

RuntimeError – If execution plan is missing or step index is invalid

Example

# In any capability or infrastructure component:
from osprey.state import StateManager

step = StateManager.get_current_step(state)
task_objective = step.get('task_objective')
capability = step.get('capability')

static get_execution_plan(state)[source]#

Get current execution plan from state with type validation.

Performs type validation to ensure the returned ExecutionPlan is properly formed (TypedDict with ‘get’ method). This eliminates the need for downstream callers to perform hasattr checks.

Parameters:

state (AgentState) – Current conversation state

Returns:

ExecutionPlan if available and valid, None otherwise

Return type:

ExecutionPlan | None

static register_figure(state, capability, figure_path, display_name=None, metadata=None, current_figures=None)[source]#

Register a figure in the centralized UI registry.

This is the single point of entry for all capabilities to register figures for UI display. Provides a capability-agnostic interface that works for Python, R, Julia, or any other figure-generating capability.

Parameters:
  • state (AgentState) – Current agent state

  • capability (str) – Capability identifier (e.g., “python_executor”, “r_executor”)

  • figure_path (str) – Path to the figure file (absolute or relative)

  • display_name (str | None) – Optional human-readable figure name

  • metadata (dict[str, Any] | None) – Optional capability-specific metadata dictionary

  • current_figures (list[dict[str, Any]] | None) – Optional list of current figures to accumulate (otherwise get from state)

Returns:

State update dictionary with ui_captured_figures update

Return type:

dict[str, Any]

Examples

Basic figure registration:

>>> figure_update = StateManager.register_figure(
...     state, "python_executor", "/path/to/plot.png"
... )
>>> return {**other_updates, **figure_update}

Rich figure registration:

>>> figure_update = StateManager.register_figure(
...     state,
...     capability="python_executor",
...     figure_path="figures/analysis.png",
...     display_name="Performance Analysis",
...     metadata={
...         "execution_folder": "/path/to/execution",
...         "notebook_link": "http://jupyter/notebook.ipynb",
...         "figure_type": "matplotlib_png"
...     }
... )

static register_notebook(state, capability, notebook_path, notebook_link, display_name=None, metadata=None)[source]#

Register a notebook in the centralized UI registry.

This method lets capabilities register notebooks for UI display. It follows the same pattern as figure registration.

Parameters:
  • state (AgentState) – Current agent state

  • capability (str) – Capability identifier (e.g., “python_executor”, “data_analysis”)

  • notebook_path (str) – Path to the notebook file

  • notebook_link (str) – Link to access the notebook

  • display_name (str | None) – Optional human-readable notebook name

  • metadata (dict[str, Any] | None) – Optional capability-specific metadata dictionary

Returns:

State update dictionary with ui_captured_notebooks update

Return type:

dict[str, Any]

static get_current_task(state)[source]#

Get current task from state.

Return type:

str | None

static get_user_query(state)[source]#

Get the user’s query from the current conversation.

Extracts the most recent user message from the conversation history, which represents the original user query that started the current conversation turn.

Parameters:

state (AgentState) – Current conversation state

Returns:

The user’s query string, or None if no user messages exist

Return type:

str | None
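
As an illustration of what "most recent user message" means, the sketch below scans a message list from the end. It uses plain dictionaries with a `role` key as a stand-in for LangChain message objects, so both the message shape and the function name `get_user_query_sketch` are assumptions.

```python
from typing import Any, Optional


def get_user_query_sketch(state: dict[str, Any]) -> Optional[str]:
    """Return the content of the last user message, or None if there is none."""
    for message in reversed(state.get("messages", [])):
        if message.get("role") == "user":
            return message.get("content")
    return None


state = {
    "messages": [
        {"role": "user", "content": "Find beam current PVs"},
        {"role": "assistant", "content": "Found 3 PVs."},
        {"role": "user", "content": "Show me the latest data"},
        {"role": "assistant", "content": "Working on it."},
    ]
}
print(get_user_query_sketch(state))
```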

static get_messages(state)[source]#

Get messages from state.

Return type:

list[BaseMessage]

static create_response_update(response)[source]#

Create a state update that adds an assistant response using native LangGraph pattern.

Parameters:

response (str) – The assistant’s response

Returns:

Update that adds the response message to the conversation

Return type:

StateUpdate

static get_current_step_index(state)[source]#

Get current step index from state.

Parameters:

state (AgentState) – Current conversation state

Returns:

Current step index (defaults to 0 if not set)

Return type:

int
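
Combining the execution plan with the step index yields the current step. A minimal sketch of that lookup follows, assuming the plan stores its steps under a `steps` key. The source does not show the ExecutionPlan layout, so that key, the sample capability names, and the function name `current_step_sketch` are hypothetical. It raises RuntimeError in the two cases documented for get_current_step().

```python
from typing import Any


def current_step_sketch(state: dict[str, Any]) -> dict[str, Any]:
    """Look up the step at planning_current_step_index (index defaults to 0)."""
    plan = state.get("planning_execution_plan")
    if not plan:
        raise RuntimeError("No execution plan available")
    steps = plan.get("steps", [])  # assumed layout, not confirmed by the source
    index = state.get("planning_current_step_index", 0)
    if not 0 <= index < len(steps):
        raise RuntimeError(f"Step index {index} out of range for {len(steps)} steps")
    return steps[index]


state = {
    "planning_execution_plan": {
        "steps": [
            {"capability": "pv_address_finding", "task_objective": "Find PVs",
             "context_key": "step1"},
            {"capability": "archiver", "task_objective": "Fetch data",
             "context_key": "step2"},
        ]
    },
    "planning_current_step_index": 1,
}
print(current_step_sketch(state)["capability"])
```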

static register_command(state, capability, launch_uri, display_name=None, command_type=None, metadata=None, current_commands=None)[source]#

Register a launchable command in the centralized UI registry.

This method allows capabilities to register commands that users can execute through the UI. Commands are typically external applications, web interfaces, or desktop tools that can be launched via URIs.

Parameters:
  • state (AgentState) – Current agent state

  • capability (str) – Name of the capability that generated this command

  • launch_uri (str) – URI that can be used to launch the command (e.g., http://, file://, custom://)

  • display_name (str | None) – Optional human-readable name for the command

  • command_type (str | None) – Optional type of command (e.g., ‘web_app’, ‘desktop_app’, ‘viewer’)

  • metadata (dict[str, Any] | None) – Optional capability-specific metadata dictionary

  • current_commands (list[dict[str, Any]] | None) – Optional list of current commands to accumulate (otherwise get from state)

Returns:

State update dictionary with ui_launchable_commands update

Return type:

dict[str, Any]

Examples

Basic command registration:

>>> command_update = StateManager.register_command(
...     state, "file_processor", "file:///path/to/results.html"
... )
>>> return {**other_updates, **command_update}

Rich command registration:

>>> command_update = StateManager.register_command(
...     state,
...     capability="data_visualizer",
...     launch_uri="http://localhost:8080/dashboard",
...     display_name="Interactive Dashboard",
...     command_type="web_app",
...     metadata={
...         "port": 8080,
...         "data_source": "analysis_results",
...         "chart_count": 3
...     }
... )
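
The accumulation behavior described for current_commands can be illustrated with a small stand-in. The entry field names and the function `register_command_sketch` are assumptions made for illustration, not the framework's actual record format.

```python
from typing import Any, Optional


def register_command_sketch(
    state: dict[str, Any],
    capability: str,
    launch_uri: str,
    display_name: Optional[str] = None,
    command_type: Optional[str] = None,
    metadata: Optional[dict[str, Any]] = None,
    current_commands: Optional[list[dict[str, Any]]] = None,
) -> dict[str, Any]:
    """Append one command entry and return the ui_launchable_commands update."""
    # Prefer the caller's running list; otherwise start from what is in state.
    if current_commands is not None:
        base = current_commands
    else:
        base = state.get("ui_launchable_commands", [])
    entry: dict[str, Any] = {"capability": capability, "launch_uri": launch_uri}
    if display_name:
        entry["display_name"] = display_name
    if command_type:
        entry["command_type"] = command_type
    if metadata:
        entry["metadata"] = metadata
    # Build a new list so neither the state nor the caller's list is mutated.
    return {"ui_launchable_commands": [*base, entry]}


state = {"ui_launchable_commands": []}
first = register_command_sketch(state, "file_processor", "file:///path/to/results.html")
second = register_command_sketch(
    state, "data_visualizer", "http://localhost:8080/dashboard",
    display_name="Interactive Dashboard", command_type="web_app",
    current_commands=first["ui_launchable_commands"],
)
print(len(second["ui_launchable_commands"]))
```

Passing the running list through current_commands lets a capability register several commands within one step, since the state itself only changes once the update dictionary is returned.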

Context Management System#

The context management system layers typed storage and retrieval on top of plain dictionary data, keeping storage LangGraph-compatible through Pydantic’s built-in serialization.

ContextManager#

class osprey.context.ContextManager(state)[source]#

Bases: object

Simplified LangGraph-native context manager using Pydantic serialization.

This class provides object-level access to context data (storage, retrieval, extraction, and description) while storing everything in LangGraph-compatible dictionary format. It uses Pydantic’s built-in serialization capabilities to eliminate complex custom logic.

The data is stored as: {context_type: {context_key: {field: value}}}

Initialize ContextManager with agent state.

Parameters:

state (AgentState) – Full AgentState containing capability_context_data

Raises:
  • TypeError – If state is not an AgentState dictionary

  • ValueError – If state doesn’t contain capability_context_data key

Key Methods

Context Storage and Retrieval:

set_context(context_type, key, value, skip_validation=False)[source]#

Store context using Pydantic’s built-in serialization.

Parameters:
  • context_type (str) – Type of context (e.g., “PV_ADDRESSES”)

  • key (str) – Unique key for this context instance

  • value (CapabilityContext) – CapabilityContext object to store

  • skip_validation (bool) – Skip registry validation (useful for testing)

get_context(context_type, key)[source]#

Retrieve a stored context, using Pydantic’s .model_validate() to reconstruct the object.

Parameters:
  • context_type (str) – Type of context to retrieve

  • key (str) – Key of the context instance

Returns:

Reconstructed CapabilityContext object or None if not found

Return type:

CapabilityContext | None

get_all_of_type(context_type)[source]#

Get all contexts of a specific type as reconstructed objects.

Parameters:

context_type (str) – Type of context to retrieve

Returns:

Dictionary of key -> CapabilityContext objects

Return type:

dict[str, CapabilityContext]

get_all()[source]#

Get all context data in flattened format for reporting/summary purposes.

Returns:

Dictionary with flattened keys in format “context_type.key” -> context object

Return type:

dict[str, Any]
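
The flattened key format can be shown with a short sketch. It operates on the raw nested dictionary and returns the stored field dictionaries, whereas the real method returns context objects; `flatten_context_sketch` is a hypothetical name.

```python
from typing import Any


def flatten_context_sketch(
    context_data: dict[str, dict[str, dict[str, Any]]]
) -> dict[str, dict[str, Any]]:
    """Flatten {type: {key: fields}} into {"type.key": fields}."""
    return {
        f"{context_type}.{key}": fields
        for context_type, entries in context_data.items()
        for key, fields in entries.items()
    }


context_data = {
    "PV_ADDRESSES": {"step1": {"pvs": ["SR:CURRENT"]}},
    "TIME_RANGE": {"step2": {"hours": 24}},
}
print(sorted(flatten_context_sketch(context_data)))
```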

Context Extraction and Description:

extract_from_step(step, state, constraints=None, constraint_mode='hard')[source]

Extract all contexts specified in step.inputs with optional type and cardinality constraints.

This method consolidates the common pattern of extracting context data from step inputs and validating against expected types and instance counts. It replaces repetitive validation logic across capabilities.

Parameters:
  • step (dict[str, Any]) – Execution step with inputs list like [{"PV_ADDRESSES": "key1"}, {"TIME_RANGE": "key2"}]

  • state (dict[str, Any]) – Current agent state (for error checking)

  • constraints (list[str | tuple[str, str]] | None) –

    Optional list of required context types. Each item can be:

    • String: context type with no cardinality restriction (e.g., "PV_ADDRESSES")

    • Tuple: (context_type, cardinality), where cardinality is:

      • "single": Must be exactly one instance (not a list)

      • "multiple": Must be multiple instances (must be a list)

    Example: ["PV_ADDRESSES", ("TIME_RANGE", "single")]

  • constraint_mode (str) – "hard" (all constraints required) or "soft" (at least one constraint required)

Returns:

Dict mapping context_type to:

  • Single context object if only one of that type

  • List of context objects if multiple of that type

Return type:

dict

Raises:

ValueError – If contexts are not found, constraints are not met, or a cardinality requirement is violated. The calling capability is expected to convert this into its own specific error.

Example:

# Simple extraction without constraints
contexts = context_manager.extract_from_step(step, state)
pv_context = contexts["PV_ADDRESSES"]

# With cardinality constraints (eliminates isinstance checks)
try:
    contexts = context_manager.extract_from_step(
        step, state,
        constraints=[
            ("PV_ADDRESSES", "single"),  # Must be exactly one
            ("TIME_RANGE", "single")     # Must be exactly one
        ],
        constraint_mode="hard"
    )
    pv_context = contexts["PV_ADDRESSES"]  # Guaranteed to be single object
    time_context = contexts["TIME_RANGE"]  # Guaranteed to be single object
except ValueError as e:
    raise ArchiverDependencyError(str(e))

# Mixed constraints (some with cardinality, some without)
try:
    contexts = context_manager.extract_from_step(
        step, state,
        constraints=[
            ("CHANNEL_ADDRESSES", "single"),  # Must be single
            "ARCHIVER_DATA"                   # Any cardinality ok
        ]
    )
except ValueError as e:
    raise DataValidationError(str(e))

# Soft constraints (at least one required, no cardinality restriction)
try:
    contexts = context_manager.extract_from_step(
        step, state,
        constraints=["PV_VALUES", "ARCHIVER_DATA"],
        constraint_mode="soft"
    )
except ValueError as e:
    raise DataValidationError(str(e))
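
How the "hard" and "soft" modes and the cardinality rules might combine can be sketched independently of the framework. The function below is an assumption-based illustration, not the actual extract_from_step logic: check_constraints_sketch is a hypothetical name, and it validates an already extracted mapping of context_type to a single object or a list.

```python
from typing import Any, Union

Constraint = Union[str, tuple[str, str]]


def check_constraints_sketch(
    contexts: dict[str, Any],
    constraints: list[Constraint],
    constraint_mode: str = "hard",
) -> list[str]:
    """Return the satisfied context types, or raise ValueError."""
    if constraint_mode not in ("hard", "soft"):
        raise ValueError(f"Unknown constraint_mode: {constraint_mode!r}")

    satisfied: list[str] = []
    missing: list[str] = []
    for item in constraints:
        # A bare string means "any cardinality is acceptable".
        context_type, cardinality = (item, None) if isinstance(item, str) else item
        if context_type not in contexts:
            missing.append(context_type)
            continue
        value = contexts[context_type]
        if cardinality == "single" and isinstance(value, list):
            raise ValueError(f"{context_type}: expected one instance, got a list")
        if cardinality == "multiple" and not isinstance(value, list):
            raise ValueError(f"{context_type}: expected a list of instances")
        satisfied.append(context_type)

    # "hard": every listed type must be present.
    if constraint_mode == "hard" and missing:
        raise ValueError(f"Missing required contexts: {missing}")
    # "soft": at least one listed type must be present.
    if constraint_mode == "soft" and constraints and not satisfied:
        raise ValueError("None of the requested contexts were found")
    return satisfied


contexts = {
    "PV_ADDRESSES": {"pvs": ["SR:C01:MAG:1"]},
    "ARCHIVER_DATA": [{"n": 1}, {"n": 2}],
}
satisfied = check_constraints_sketch(
    contexts, [("PV_ADDRESSES", "single"), ("ARCHIVER_DATA", "multiple")]
)
soft = check_constraints_sketch(
    contexts, ["PV_VALUES", "ARCHIVER_DATA"], constraint_mode="soft"
)
```

Raising a plain ValueError keeps the check generic, which matches the documented pattern of letting each capability wrap it in its own error type.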
get_context_access_description(context_filter=None)[source]#

Create detailed description of available context data for use in prompts.

Parameters:

context_filter (list[dict[str, str]] | None) – Optional list of context filter dictionaries

Returns:

Formatted string description of available context data

Return type:

str

get_summaries(step=None)[source]#

Get summaries of contexts for human display/LLM consumption.

Parameters:

step (dict[str, Any] | None) – Optional step dict. If provided, extract contexts from step.inputs. If None, get summaries for all available contexts.

Returns:

List of context summary dicts. Each context class defines its own summary structure via the get_summary() method.

Return type:

list[dict[str, Any]]

Example:

[
    {"type": "PV Addresses", "total_pvs": 5, "pv_list": […]},
    {"type": "Current Weather", "location": "San Francisco", "temp": 15.0, "conditions": "Sunny"},
    {"type": "Current Weather", "location": "New York", "temp": 20.0, "conditions": "Cloudy"}
]

Data Access and Persistence:

get_raw_data()[source]#

Get the raw dictionary data for state updates.

Returns:

The raw dictionary data for LangGraph state updates

Return type:

dict[str, dict[str, dict[str, Any]]]

save_context_to_file(folder_path, filename='context.json')[source]#

Save capability context data to a JSON file in the specified folder.

This method always saves the current context data to ensure it reflects the latest state. It uses the same serialization format as the state system.

Parameters:
  • folder_path (Path) – Path to the folder where the context file should be saved

  • filename (str) – Name of the context file (default: “context.json”)

Returns:

Path to the saved context file

Raises:
  • OSError – If file cannot be written

  • TypeError – If context data cannot be serialized

  • ValueError – If filename is empty or contains invalid characters

Return type:

Path
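
The documented behaviour (write JSON into a folder, return the path, raise on a bad filename) can be approximated with the standard library. This is a sketch under assumptions: save_json_context_sketch is a hypothetical function, and treating path separators as the "invalid characters" is a guess, since the reference does not list them.

```python
import json
import tempfile
from pathlib import Path
from typing import Any


def save_json_context_sketch(
    data: dict[str, Any], folder_path: Path, filename: str = "context.json"
) -> Path:
    """Write data as JSON into folder_path and return the file path."""
    # Assumption: an empty name or a name containing a path separator is invalid.
    if not filename or "/" in filename or "\\" in filename:
        raise ValueError(f"Invalid filename: {filename!r}")
    folder_path.mkdir(parents=True, exist_ok=True)
    target = folder_path / filename
    # json.dumps raises TypeError for values it cannot serialize;
    # write_text raises OSError if the file cannot be written.
    target.write_text(json.dumps(data, indent=2), encoding="utf-8")
    return target


with tempfile.TemporaryDirectory() as tmp:
    saved = save_json_context_sketch(
        {"PV_ADDRESSES": {"step_1": {"pvs": ["SR:C01:MAG:1"]}}}, Path(tmp)
    )
    restored = json.loads(saved.read_text(encoding="utf-8"))
```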

__init__(state)[source]#

Initialize ContextManager with agent state.

Parameters:

state (AgentState) – Full AgentState containing capability_context_data

Raises:
  • TypeError – If state is not an AgentState dictionary

  • ValueError – If state doesn’t contain capability_context_data key

__getattr__(context_type)[source]#

Enable dot notation access to context data with lazy namespace creation.


CapabilityContext#

class osprey.context.CapabilityContext[source]#

Bases: BaseModel

Base class for all capability context objects. Uses Pydantic for automatic serialization/deserialization and type validation.

This class provides:

  • Automatic JSON serialization via .model_dump()

  • Automatic deserialization via .model_validate()

  • Type validation on field assignment

  • Consistent interface for all context types

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Abstract Methods

abstractmethod get_access_details(key)[source]#

Get detailed access information for this context data.

Parameters:

key (str) – The context key this data is stored under

Returns:

Dictionary with access details including summary, capabilities, etc.

Return type:

dict

get_summary()[source]#

Get a summary of this context data for human display/LLM consumption.

Returns:

Dictionary with summary information about the context. The content and structure are completely up to the context class implementation.

Return type:

dict

Note

This method is used by get_summaries() to create human-readable displays and LLM prompts. Include whatever information is most useful for your specific context type.

Properties

property context_type: str#

Return the context type identifier.

Class Constants

CONTEXT_TYPE: ClassVar[str]#

Context type identifier constant (must be overridden in subclasses).

CONTEXT_CATEGORY: ClassVar[str]#

Context category identifier constant (must be overridden in subclasses).

model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': False, 'json_encoders': {<class 'datetime.datetime'>: <function CapabilityContext.<lambda>>}, 'populate_by_name': True, 'use_enum_values': True, 'validate_by_alias': True, 'validate_by_name': True}#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

CONTEXT_TYPE: ClassVar[str] = ''#
CONTEXT_CATEGORY: ClassVar[str] = ''#

Unified Data Structure#

Both state and context systems use a three-level dictionary structure optimized for LangGraph serialization and efficient data management:

{
    context_type: {
        context_key: {
            field: value,
            ...
        }
    }
}

Structure Components:

  • context_type: Defined by the CapabilityContext class’s CONTEXT_TYPE constant (e.g., “PV_ADDRESSES”, “ANALYSIS_RESULTS”)

  • context_key: Assigned by the orchestrator node during execution planning (e.g., “step_1”, “beam_analysis_20240115”)

  • field/value pairs: User-defined data fields specific to the CapabilityContext subclass implementation

This structure enables efficient context storage, retrieval, and merging while maintaining compatibility with LangGraph’s checkpointing system. The orchestrator coordinates context keys across execution steps, while capabilities define the context types and field structures.
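
A concrete instance of the three-level layout may make the roles of the levels easier to see. The sketch below uses plain dictionaries only; lookup_sketch and the mean_current field are hypothetical and chosen purely for illustration.

```python
from typing import Any, Optional

ContextData = dict[str, dict[str, dict[str, Any]]]

# Level 1: context_type, level 2: context_key, level 3: field/value pairs.
context_data: ContextData = {
    "PV_ADDRESSES": {
        "step_1": {"pvs": ["SR:C01:MAG:1"]},
    },
    "ANALYSIS_RESULTS": {
        "beam_analysis_20240115": {"mean_current": 401.2},
    },
}


def lookup_sketch(
    data: ContextData, context_type: str, key: str
) -> Optional[dict[str, Any]]:
    """Return the field dictionary for (context_type, key), or None if absent."""
    return data.get(context_type, {}).get(key)


found = lookup_sketch(context_data, "PV_ADDRESSES", "step_1")
absent = lookup_sketch(context_data, "TIME_RANGE", "step_9")
```

Because every level is a plain dictionary, the whole structure can be serialized as JSON without custom encoders, as long as the field values themselves are JSON-compatible.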

Context Utilities#

Context Loading#

osprey.context.load_context(context_file='context.json')[source]#

Load agent execution context from a JSON file in the current directory.

This function provides the same interface as the old load_context function but uses the new Pydantic-based ContextManager system. It maintains exact compatibility with existing access patterns.

Parameters:

context_file (str) – Name of the context file (default: “context.json”)

Returns:

ContextManager instance with dot notation access, or None if loading fails

Return type:

ContextManager | None

Usage:
>>> from osprey.context import load_context
>>> context = load_context()
>>> data = context.ARCHIVER_DATA.beam_current_historical_data
>>> pv_values = context.PV_ADDRESSES.step_1.pv_values

Namespace Access#

class osprey.context.ContextNamespace(context_manager, context_type)[source]#

Bases: object

Namespace object that provides dot notation access to context objects within a specific context type.

__init__(context_manager, context_type)[source]#

__getattr__(key)[source]#

Get context object by key with lazy reconstruction.

__setattr__(key, value)[source]#

Set context object by key.
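
The dot notation pattern rests on Python's __getattr__ and __setattr__ hooks. The following sketch shows one way such a pair of classes could work. It is not the osprey implementation: ManagerSketch and NamespaceSketch are hypothetical names, and values are returned as raw dictionaries instead of being reconstructed into CapabilityContext objects.

```python
from typing import Any


class NamespaceSketch:
    """Dot access to the entries of one context type."""

    def __init__(self, store: dict[str, dict[str, Any]], context_type: str) -> None:
        # Bypass our own __setattr__ when storing internal attributes.
        object.__setattr__(self, "_store", store)
        object.__setattr__(self, "_context_type", context_type)

    def __getattr__(self, key: str) -> Any:
        # Only called when normal attribute lookup fails.
        if key.startswith("_"):
            raise AttributeError(key)
        try:
            return self._store[self._context_type][key]
        except KeyError:
            raise AttributeError(f"No context {self._context_type}.{key}") from None

    def __setattr__(self, key: str, value: Any) -> None:
        self._store.setdefault(self._context_type, {})[key] = value


class ManagerSketch:
    """Creates one namespace per context type, on first access."""

    def __init__(self, data: dict[str, dict[str, Any]]) -> None:
        self._data = data
        self._namespaces: dict[str, NamespaceSketch] = {}

    def __getattr__(self, context_type: str) -> NamespaceSketch:
        if context_type.startswith("_"):
            raise AttributeError(context_type)
        if context_type not in self._namespaces:
            self._namespaces[context_type] = NamespaceSketch(self._data, context_type)
        return self._namespaces[context_type]


manager = ManagerSketch({"PV_ADDRESSES": {"step_1": {"pvs": ["SR:C01:MAG:1"]}}})
pvs = manager.PV_ADDRESSES.step_1["pvs"]
manager.TIME_RANGE.step_2 = {"start": "2024-01-15"}
```

Caching the namespace objects means repeated access to the same context type returns the same object, and the underscore guard avoids recursion when Python probes for private attributes.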

State Utilities#

Custom Reducer Functions#

osprey.state.merge_capability_context_data(existing, new)[source]#

Merge capability context data dictionaries for LangGraph-native checkpointing.

This custom reducer function enables capability context data to accumulate across conversation turns while maintaining pure dictionary structure for optimal LangGraph serialization and checkpointing performance. The function implements deep merging logic that preserves existing context while integrating new context data.

The merge operation follows a three-level dictionary structure that organizes context data hierarchically by type, key, and field values. This structure provides efficient access patterns for context retrieval while maintaining serialization compatibility.

Parameters:
  • existing (Optional[Dict[str, Dict[str, Dict[str, Any]]]]) – Existing capability context data dictionary from previous state

  • new (Dict[str, Dict[str, Dict[str, Any]]]) – New capability context data to merge into existing structure

Returns:

Merged dictionary maintaining all existing data with new updates applied

Return type:

Dict[str, Dict[str, Dict[str, Any]]]

Note

The function performs deep copying to prevent mutation of input dictionaries, ensuring state immutability and preventing side effects in LangGraph operations.

Warning

New context data will override existing context data for the same context_type and context_key combination. This is intentional for capability result updates but should be considered when designing context key strategies.

Examples

Basic context merging:

>>> existing = {"PV_ADDRESSES": {"step1": {"pvs": ["SR:C01:MAG:1"]}}}
>>> new = {"PV_ADDRESSES": {"step2": {"pvs": ["SR:C02:MAG:1"]}}}
>>> result = merge_capability_context_data(existing, new)
>>> len(result["PV_ADDRESSES"])
2

Context override behavior:

>>> existing = {"DATA": {"key1": {"value": "old"}}}
>>> new = {"DATA": {"key1": {"value": "new", "extra": "data"}}}
>>> result = merge_capability_context_data(existing, new)
>>> result["DATA"]["key1"]["value"]
'new'
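
The merge semantics described above can be reproduced with a short stand-alone reducer. This is a sketch, not the source of merge_capability_context_data: merge_context_sketch is a hypothetical name, and it assumes that an entry with the same context_type and context_key is replaced as a whole rather than merged field by field.

```python
import copy
from typing import Any, Optional

ContextData = dict[str, dict[str, dict[str, Any]]]


def merge_context_sketch(existing: Optional[ContextData], new: ContextData) -> ContextData:
    """Return a merged copy; neither input dictionary is mutated."""
    merged: ContextData = copy.deepcopy(existing) if existing else {}
    for context_type, contexts in new.items():
        bucket = merged.setdefault(context_type, {})
        for key, fields in contexts.items():
            # Assumption: the new entry fully replaces any previous entry.
            bucket[key] = copy.deepcopy(fields)
    return merged


existing = {"PV_ADDRESSES": {"step1": {"pvs": ["SR:C01:MAG:1"]}}}
new = {"PV_ADDRESSES": {"step2": {"pvs": ["SR:C02:MAG:1"]}}}
merged = merge_context_sketch(existing, new)
```

Deep copying on both sides costs some time for large contexts, but it guarantees that later changes to the inputs cannot leak into the merged state.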

See also

AgentState : Main state class using this reducer

osprey.context.ContextManager : Context data management utilities

Event Creation Utilities#

osprey.state.create_status_update(message, progress, complete=False, **metadata)[source]#

Create a status update event for LangGraph state integration.

This utility function creates properly formatted status update events that can be merged into agent state using LangGraph’s native state update mechanisms. The function generates timestamped status events with progress tracking and metadata support for comprehensive execution monitoring.

Status updates are used throughout the framework to provide real-time feedback on execution progress, capability status, and system operations. The events are automatically formatted for consumption by UI components and logging systems.

Parameters:
  • message (str) – Descriptive message about the current status or operation

  • progress (float) – Progress value between 0.0 and 1.0 indicating the fraction of the operation completed

  • complete (bool) – Whether this status update indicates completion of the operation

  • metadata (Any) – Additional metadata fields to include in the status event

Returns:

Dictionary containing status_updates list for LangGraph state merging

Return type:

Dict[str, Any]

Note

The returned dictionary contains a ‘status_updates’ key with a list containing the new status event. This format is compatible with LangGraph’s automatic list merging for state updates.

Examples

Basic status update:

>>> update = create_status_update("Processing data", 0.5)
>>> update['status_updates'][0]['message']
'Processing data'
>>> update['status_updates'][0]['progress']
0.5

Completion status with metadata:

>>> update = create_status_update(
...     "Analysis complete", 1.0, complete=True,
...     node="data_analysis", items_processed=150
... )
>>> update['status_updates'][0]['complete']
True
>>> update['status_updates'][0]['items_processed']
150
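
The shape of the returned dictionary, and why a single-element list suits list merging, can be shown without the framework. The sketch below is an assumption-based illustration: status_update_sketch is a hypothetical function, the timestamp field name is a guess, and operator.add merely stands in for a list-concatenating reducer.

```python
import operator
import time
from typing import Any


def status_update_sketch(
    message: str, progress: float, complete: bool = False, **metadata: Any
) -> dict[str, Any]:
    """Build a state update holding one status event in a list."""
    event = {
        "message": message,
        "progress": progress,
        "complete": complete,
        "timestamp": time.time(),  # assumed field name
        **metadata,
    }
    return {"status_updates": [event]}


first = status_update_sketch("Loading data", 0.25, node="loader")
second = status_update_sketch("Done", 1.0, complete=True)

# Concatenating the lists preserves the order in which events were emitted.
history = operator.add(first["status_updates"], second["status_updates"])
```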

See also

create_progress_event() : Create progress tracking events

AgentState : Main state class containing status_updates field

osprey.state.create_progress_event(current, total, operation, **metadata)[source]#

Create a progress tracking event for LangGraph state integration.

This utility function creates properly formatted progress events that track incremental progress through multi-step operations. The function computes progress as the fraction current/total and provides timestamped events for detailed execution monitoring and user feedback.

Progress events are particularly useful for long-running operations, batch processing, and multi-step workflows where users need visibility into execution progress and current operation status.

Parameters:
  • current (int) – Current step or item number being processed (1-based)

  • total (int) – Total number of steps or items to be processed

  • operation (str) – Descriptive name of the operation being performed

  • metadata (Any) – Additional metadata fields to include in the progress event

Returns:

Dictionary containing progress_events list for LangGraph state merging

Return type:

Dict[str, Any]

Note

Progress is automatically calculated as current/total, with safe handling for division by zero. The returned dictionary format is compatible with LangGraph’s automatic list merging for state updates.

Warning

The current parameter should be 1-based (first item is 1, not 0), so that the final item, where current equals total, reports a progress of 1.0.

Examples

Basic progress tracking:

>>> event = create_progress_event(3, 10, "Processing files")
>>> event['progress_events'][0]['current']
3
>>> event['progress_events'][0]['progress']
0.3

Progress with metadata:

>>> event = create_progress_event(
...     5, 20, "Analyzing data points",
...     file_name="data.csv", bytes_processed=1024
... )
>>> event['progress_events'][0]['operation']
'Analyzing data points'
>>> event['progress_events'][0]['file_name']
'data.csv'

Handling edge cases:

>>> event = create_progress_event(0, 0, "Empty operation")
>>> event['progress_events'][0]['progress']
0.0
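
The progress calculation with its division-by-zero guard, together with 1-based counting in a loop, might look like the sketch below. progress_event_sketch is a hypothetical stand-in, not the library function, and the timestamp field name is an assumption.

```python
import time
from typing import Any


def progress_event_sketch(
    current: int, total: int, operation: str, **metadata: Any
) -> dict[str, Any]:
    """Build a state update holding one progress event in a list."""
    # Guard against division by zero for empty operations.
    progress = current / total if total > 0 else 0.0
    event = {
        "current": current,
        "total": total,
        "operation": operation,
        "progress": progress,
        "timestamp": time.time(),  # assumed field name
        **metadata,
    }
    return {"progress_events": [event]}


files = ["a.csv", "b.csv", "c.csv", "d.csv"]
events = []
# enumerate(..., start=1) yields the 1-based counter the function expects.
for index, name in enumerate(files, start=1):
    update = progress_event_sketch(index, len(files), "Processing files", file_name=name)
    events.extend(update["progress_events"])
```

With 1-based counting the last event in the loop reports full completion, which is usually what a progress bar should show.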

See also

create_status_update() : Create status update events

AgentState : Main state class containing progress_events field

Execution Tracking#

osprey.state.get_execution_steps_summary(state)[source]#

Generate ordered execution steps summary for prompts and UI display.

This utility function extracts and formats execution step information from the agent state to provide a clean summary of completed execution steps. The function is designed for use in capability prompts, error summaries, UI displays, and debugging contexts where execution history is needed.

The function processes the execution_step_results dictionary to create an ordered list of step descriptions based on execution order. Each step is formatted with a step number and task objective for clear presentation.

Processing Logic:

  1. Results Extraction: Retrieves execution_step_results from state

  2. Ordering: Sorts results by step_index to maintain execution order

  3. Formatting: Creates numbered step descriptions with task objectives

  4. Fallback Handling: Uses capability names when task objectives unavailable

Parameters:

state (AgentState) – Current agent state containing execution_step_results

Returns:

Ordered list of formatted step descriptions for display

Return type:

List[str]

Note

The function handles missing or incomplete execution results gracefully, returning an empty list when no execution data is available. Step numbering starts from 1 for intuitive display.

Warning

The function relies on step_index values in execution results for ordering. If step_index is missing or incorrect, the order may not reflect actual execution sequence.

Examples

Basic execution summary:

>>> state = {
...     "execution_step_results": {
...         "step1": {"step_index": 0, "task_objective": "Find PV addresses"},
...         "step2": {"step_index": 1, "task_objective": "Retrieve values"}
...     }
... }
>>> steps = get_execution_steps_summary(state)
>>> steps[0]
'Step 1: Find PV addresses'
>>> steps[1]
'Step 2: Retrieve values'

Empty state handling:

>>> empty_state = {"execution_step_results": {}}
>>> get_execution_steps_summary(empty_state)
[]

Fallback to capability names:

>>> state_without_objectives = {
...     "execution_step_results": {
...         "step1": {"step_index": 0, "capability": "pv_finder"}
...     }
... }
>>> steps = get_execution_steps_summary(state_without_objectives)
>>> steps[0]
'Step 1: pv_finder'
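
The four processing steps listed earlier (extract, order, format, fall back) can be sketched in a few lines. This is an illustration under assumptions rather than the actual function: steps_summary_sketch is a hypothetical name, the displayed number is taken from the position after sorting, and "unknown" is an invented placeholder for results that carry neither field.

```python
from typing import Any


def steps_summary_sketch(state: dict[str, Any]) -> list[str]:
    """Return 'Step N: description' strings ordered by step_index."""
    results = state.get("execution_step_results") or {}
    ordered = sorted(results.values(), key=lambda result: result.get("step_index", 0))
    summary = []
    for number, result in enumerate(ordered, start=1):
        # Prefer the task objective; fall back to the capability name.
        description = result.get("task_objective") or result.get("capability", "unknown")
        summary.append(f"Step {number}: {description}")
    return summary


state = {
    "execution_step_results": {
        "step2": {"step_index": 1, "task_objective": "Retrieve values"},
        "step1": {"step_index": 0, "capability": "pv_finder"},
    }
}
steps = steps_summary_sketch(state)
```

Sorting on step_index rather than on dictionary order keeps the summary stable even when results are stored out of sequence, as in the sample state above.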

See also

AgentState : State structure containing execution_step_results

osprey.base.decorators : Capability execution tracking

osprey.infrastructure : Infrastructure components using summaries

Type Aliases#

osprey.state.StateUpdate: Dict[str, Any]#

Type alias for LangGraph state update dictionaries returned by capabilities and infrastructure nodes.

See also

osprey.base.planning.ExecutionPlan

Execution planning structures used in state

osprey.registry.RegistryManager

Component registry that manages context classes

Registry System

Registry system for component management

Context Management System

Complete guide to context management patterns

State Management Architecture

Complete guide to state management patterns