State and Context Management#
LangGraph-native state management with context persistence, built on Pydantic for automatic serialization, validation, and type safety.
Core State System#
The state system provides a clean separation between persistent context data and execution-scoped fields that reset automatically between conversation turns, built on LangGraph’s MessagesState foundation.
AgentState#
- class osprey.state.AgentState(*args, **kwargs)[source]#
Bases: MessagesState
LangGraph-native conversational agent state with comprehensive execution tracking.
This TypedDict class extends LangGraph’s MessagesState to provide a complete state structure for conversational AI agents. The state design implements selective persistence where only capability context data accumulates across conversation turns, while all execution-related fields reset automatically between graph invocations for optimal performance and state clarity.
State Architecture:
The state follows a flat structure with logical field prefixes for organization:
Persistent Fields: Only capability_context_data persists across conversations
Execution-Scoped Fields: All other fields reset automatically each invocation
No Custom Reducers: Leverages LangGraph’s native message handling patterns
Type Safety: Comprehensive TypedDict definitions with proper type annotations
Field Organization by Prefix:
task_* : Task extraction and input processing state
planning_* : Classification and orchestration management
execution_* : Step execution results and tracking
control_* : Control flow and retry logic state
ui_* : UI-specific results and command generation
runtime_* : Runtime metadata and checkpoint information
Context Data Structure:
The capability_context_data field uses a three-level dictionary optimized for LangGraph serialization and efficient context retrieval:
{ context_type: { context_key: { field: value, ... } } }
State Lifecycle:
Fresh State Creation: StateManager.create_fresh_state() initializes defaults
Context Preservation: Only capability_context_data carries forward
Execution Reset: All execution fields reset to defaults each turn
Message Handling: LangGraph manages message history automatically
Note
TypedDict classes cannot define default values in the class definition. Always use StateManager.create_fresh_state() to create properly initialized state instances. Only capability_context_data will be preserved from previous state if available.
Warning
Direct manipulation of state fields may interfere with LangGraph’s automatic checkpointing and serialization. Use StateManager utilities for all state operations to ensure compatibility and proper behavior.
Examples
Creating fresh state through StateManager:
>>> from osprey.state import StateManager
>>> state = StateManager.create_fresh_state(
...     user_input="Find beam current PV addresses",
...     current_state=previous_state
... )
>>> state['task_current_task'] is None
True
Accessing persistent context data:
>>> context_data = state['capability_context_data']
>>> pv_addresses = context_data.get('PV_ADDRESSES', {})
>>> len(pv_addresses)
3
See also
osprey.state.StateManager: State creation and management utilities
osprey.context.ContextManager: Context data access and storage
merge_capability_context_data(): Custom reducer for context persistence
osprey.base.planning.ExecutionPlan: Execution planning structures
Key State Fields
Persistent Field:
- capability_context_data: Dict[str, Dict[str, Dict[str, Any]]]
Only field that persists across conversation turns. Uses the three-level dictionary structure {context_type: {context_key: {field: value}}} and the custom reducer merge_capability_context_data().
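For orientation, the persistent field likely pairs its three-level dictionary type with the reducer via typing.Annotated; a minimal sketch, assuming merge_capability_context_data is importable from osprey.state (the exact declaration in the osprey source may differ):

# Hedged sketch of how the persistent field is wired to its reducer.
from typing import Annotated, Any
from langgraph.graph import MessagesState
from osprey.state import merge_capability_context_data

class AgentState(MessagesState):
    # Only this field accumulates across turns; the reducer deep-merges
    # {context_type: {context_key: {field: value}}} dictionaries.
    capability_context_data: Annotated[
        dict[str, dict[str, dict[str, Any]]],
        merge_capability_context_data,
    ]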
Execution-Scoped Fields (reset each invocation):
- task_current_task: str | None#
Current task being processed.
- planning_execution_plan: ExecutionPlan | None#
Current execution plan from orchestrator.
- execution_step_results: Dict[str, Any]#
Results from completed execution steps.
- control_has_error: bool#
Error state for manual retry handling.
- capability_context_data: Annotated[dict[str, dict[str, dict[str, Any]]], merge_capability_context_data]#
- agent_control: dict[str, Any]#
- status_updates: list[dict[str, Any]]#
- progress_events: list[dict[str, Any]]#
- task_current_task: str | None#
- task_depends_on_chat_history: bool#
- task_depends_on_user_memory: bool#
- task_custom_message: str | None#
- planning_active_capabilities: list[str]#
- planning_execution_plan: ExecutionPlan | None#
- planning_current_step_index: int#
- execution_step_results: dict[str, Any]#
- execution_last_result: ExecutionResult | None#
- execution_pending_approvals: dict[str, ApprovalRequest]#
- execution_start_time: float | None#
- execution_total_time: float | None#
- approval_approved: bool | None#
- approved_payload: dict[str, Any] | None#
- control_reclassification_reason: str | None#
- control_reclassification_count: int#
- control_plans_created_count: int#
- control_current_step_retry_count: int#
- control_retry_count: int#
- control_has_error: bool#
- control_error_info: dict[str, Any] | None#
- control_last_error: dict[str, Any] | None#
- control_max_retries: int#
- control_is_killed: bool#
- control_kill_reason: str | None#
- control_is_awaiting_validation: bool#
- control_validation_context: dict[str, Any] | None#
- control_validation_timestamp: float | None#
- ui_captured_notebooks: list[dict[str, Any]]#
- ui_captured_figures: list[dict[str, Any]]#
- ui_launchable_commands: list[dict[str, Any]]#
- ui_agent_context: dict[str, Any] | None#
- runtime_checkpoint_metadata: dict[str, Any] | None#
- runtime_info: dict[str, Any] | None#
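As a rough, hypothetical illustration of how these execution-scoped fields are used, a graph node returns only the keys it changes and lets LangGraph merge them into AgentState (the node body below is illustrative, not framework code):

# Hypothetical LangGraph node: return a partial update containing only
# the fields that changed during this step.
def example_node(state):
    results = dict(state.get("execution_step_results", {}))
    results["step_1"] = {"step_index": 0, "task_objective": "Find PV addresses"}
    return {
        "execution_step_results": results,    # execution_* tracking
        "planning_current_step_index": 1,     # planning_* bookkeeping
        "control_has_error": False,           # control_* flags
    }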
StateManager#
- class osprey.state.StateManager[source]#
Bases: object
LangGraph-native state management utilities for conversational AI agents.
This class provides a comprehensive suite of static utilities for managing conversational agent state using LangGraph’s native patterns. The StateManager serves as the primary interface for state creation, manipulation, and context storage operations throughout the Osprey Agent Framework.
Core Functionality:
The StateManager implements a simplified state management approach that leverages LangGraph’s native patterns for optimal performance and compatibility:
Fresh State Creation: Initialize new conversation states with proper defaults
Context Persistence: Maintain capability context data across conversation turns
State Updates: Generate LangGraph-compatible state update dictionaries
Context Storage: One-liner context storage for capability results
Execution Tracking: Utilities for accessing execution plans and current steps
Design Philosophy:
The StateManager follows a “batteries included” approach where common state operations are provided as simple, one-liner utilities. This eliminates the need for capabilities and infrastructure components to implement complex state logic.
State Lifecycle Management:
Initialization: create_fresh_state() creates new conversation states
Execution: Utilities provide access to current execution context
Context Storage: store_context() handles capability result persistence
State Updates: Methods generate proper LangGraph state update dictionaries
Note
All StateManager methods are static utilities that do not require instantiation. The class serves as a namespace for state management operations.
Warning
StateManager utilities are optimized for LangGraph’s native patterns. Using these utilities ensures compatibility with checkpointing and serialization. Direct state manipulation may cause issues.
Examples
Creating fresh state for new conversation:
>>> state = StateManager.create_fresh_state(
...     user_input="Find beam current data",
...     current_state=previous_state
... )
Storing capability context results:
>>> updates = StateManager.store_context(
...     state, "PV_ADDRESSES", "step1", pv_data
... )
>>> return updates  # Compatible with LangGraph
Accessing current execution step:
>>> step = StateManager.get_current_step(state)
>>> task_objective = step.get('task_objective')
See also
osprey.state.AgentState: Main state structure managed by this class
osprey.context.ContextManager: Context data management utilities
get_agent_control_defaults(): Configuration defaults for state creation
Key Methods
- static create_fresh_state(user_input, current_state=None)[source]#
Create fresh agent state for a new conversation turn with selective persistence.
This method creates a complete fresh state for LangGraph execution while preserving only the capability context data from the previous state. All execution-scoped fields are reset to their default values, ensuring clean state for each conversation turn while maintaining accumulated context.
The state creation process handles the complete lifecycle initialization:
Message Initialization: Creates properly formatted user message
Context Preservation: Extracts and preserves capability context data
Default Population: Initializes all execution fields with safe defaults
Configuration Loading: Applies agent control defaults from configuration
This approach ensures optimal performance by avoiding state bloat while maintaining the persistent context needed for multi-turn conversations.
- Parameters:
user_input (str) – The user’s message content for this conversation turn
current_state (Optional[AgentState]) – Previous agent state to preserve context from (optional)
- Returns:
Complete fresh state ready for LangGraph graph invocation
- Return type:
AgentState
Note
Only capability_context_data persists across conversation turns. All other fields including execution results, control state, and UI data are reset to defaults for optimal performance and state clarity.
Warning
The current_state parameter should be the complete previous AgentState. Partial state dictionaries may cause context preservation to fail silently.
Examples
Creating state for new conversation:
>>> state = StateManager.create_fresh_state("Find beam current PVs") >>> state['task_current_task'] is None True >>> len(state['messages']) 1
Preserving context from previous conversation:
>>> new_state = StateManager.create_fresh_state(
...     "Show me the latest data",
...     current_state=previous_state
... )
>>> # Context preserved but execution state reset
>>> len(new_state['capability_context_data'])
3  # Preserved from previous
>>> new_state['execution_step_results']
{}  # Reset to empty
See also
AgentState: State structure created by this method
get_agent_control_defaults(): Configuration defaults applied
osprey.state.MessageUtils: Message creation utilities
- static store_context(state, context_type, context_key, context_object)[source]#
Store capability context data and return LangGraph-compatible state updates.
This is the primary utility function that capabilities should use for storing context data in the agent state. The method provides a one-liner interface that handles all the complexity of context management, serialization, and state update generation for seamless integration with LangGraph’s state system.
The function performs the complete context storage workflow:
Context Manager Creation: Initializes ContextManager from current state
Context Serialization: Converts CapabilityContext to dictionary format
State Integration: Merges context data into existing state structure
Update Generation: Returns LangGraph-compatible state update dictionary
This approach ensures that capability results are properly persisted across conversation turns while maintaining optimal serialization performance.
- Parameters:
state (AgentState) – Current agent state containing existing context data
context_type (str) – Context type identifier (e.g., “PV_ADDRESSES”, “ANALYSIS_RESULTS”)
context_key (str) – Unique key for this context instance within the type
context_object (CapabilityContext) – CapabilityContext object containing data to store
- Returns:
State update dictionary for LangGraph automatic merging
- Return type:
StateUpdate
Note
This is the recommended pattern for all capability context storage. The returned dictionary can be directly returned from capability execute methods and will be automatically merged by LangGraph.
Warning
The context_key should be unique within the context_type to avoid overwriting existing context data. Consider using step context keys or timestamp-based keys for uniqueness.
Examples
Basic capability context storage:
>>> @staticmethod
... async def execute(state: AgentState, **kwargs):
...     # Capability logic here
...     result = PVAddresses(pvs=found_pvs, description="Found PVs")
...
...     # Store and return state updates (one-liner)
...     step = StateManager.get_current_step(state)
...     return StateManager.store_context(
...         state, "PV_ADDRESSES", step.get('context_key'), result
...     )
Context storage with custom key:
>>> analysis_result = DataAnalysis(
...     results=processed_data,
...     timestamp=datetime.now()
... )
>>> updates = StateManager.store_context(
...     state, "ANALYSIS", f"beam_analysis_{timestamp}", analysis_result
... )
>>> return updates
See also
osprey.context.ContextManager: Underlying context management
osprey.context.CapabilityContext: Base class for context objects
get_current_step(): Utility for getting current execution step
- static get_current_step(state)[source]#
Get current execution step from state.
This is the unified utility for accessing the current step being executed. Replaces the old pattern of importing _extract_current_step from decorators.
- Parameters:
state (AgentState) – Current conversation state
- Returns:
Current step dictionary with capability, task_objective, etc.
- Return type:
dict[str, Any]
- Raises:
RuntimeError – If execution plan is missing or step index is invalid
Example
# In any capability or infrastructure component:
from osprey.state import StateManager

step = StateManager.get_current_step(state)
task_objective = step.get('task_objective')
capability = step.get('capability')
- static get_execution_plan(state)[source]#
Get current execution plan from state with type validation.
Performs type validation to ensure the returned ExecutionPlan is properly formed (TypedDict with ‘get’ method). This eliminates the need for downstream callers to perform hasattr checks.
- Parameters:
state (AgentState) – Current conversation state
- Returns:
ExecutionPlan if available and valid, None otherwise
- Return type:
ExecutionPlan | None
- static register_figure(state, capability, figure_path, display_name=None, metadata=None, current_figures=None)[source]#
Register a figure in the centralized UI registry.
This is the single point of entry for all capabilities to register figures for UI display. Provides a capability-agnostic interface that works for Python, R, Julia, or any other figure-generating capability.
- Parameters:
state (AgentState) – Current agent state
capability (str) – Capability identifier (e.g., “python_executor”, “r_executor”)
figure_path (str) – Path to the figure file (absolute or relative)
display_name (str | None) – Optional human-readable figure name
metadata (dict[str, Any] | None) – Optional capability-specific metadata dictionary
current_figures (list[dict[str, Any]] | None) – Optional list of current figures to accumulate (otherwise get from state)
- Returns:
State update dictionary with ui_captured_figures update
- Return type:
dict[str, Any]
Examples
Basic figure registration:
>>> figure_update = StateManager.register_figure(
...     state, "python_executor", "/path/to/plot.png"
... )
>>> return {**other_updates, **figure_update}
Rich figure registration:
>>> figure_update = StateManager.register_figure(
...     state,
...     capability="python_executor",
...     figure_path="figures/analysis.png",
...     display_name="Performance Analysis",
...     metadata={
...         "execution_folder": "/path/to/execution",
...         "notebook_link": "http://jupyter/notebook.ipynb",
...         "figure_type": "matplotlib_png"
...     }
... )
- static register_notebook(state, capability, notebook_path, notebook_link, display_name=None, metadata=None)[source]#
Register a notebook in the centralized UI registry.
This method registers generated notebooks so the UI can surface them, following the same pattern as figure registration.
- Parameters:
state (AgentState) – Current agent state
capability (str) – Capability identifier (e.g., “python_executor”, “data_analysis”)
notebook_path (str) – Path to the notebook file
notebook_link (str) – Link to access the notebook
display_name (str | None) – Optional human-readable notebook name
metadata (dict[str, Any] | None) – Optional capability-specific metadata dictionary
- Returns:
State update dictionary with ui_captured_notebooks update
- Return type:
dict[str, Any]
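No example is given above for register_notebook; the following hedged sketch mirrors the figure-registration pattern (the paths, link, and display name are illustrative):

>>> notebook_update = StateManager.register_notebook(
...     state,
...     capability="python_executor",
...     notebook_path="/path/to/analysis.ipynb",
...     notebook_link="http://jupyter/notebooks/analysis.ipynb",
...     display_name="Beam Current Analysis"
... )
>>> return {**other_updates, **notebook_update}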
- static get_user_query(state)[source]#
Get the user’s query from the current conversation.
Extracts the most recent user message from the conversation history, which represents the original user query that started the current conversation turn.
- Parameters:
state (AgentState) – Current conversation state
- Returns:
The user’s query string, or None if no user messages exist
- Return type:
str | None
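A minimal usage sketch (the fallback handling is illustrative):

>>> query = StateManager.get_user_query(state)
>>> if query is None:
...     query = ""  # no user message in this conversation yet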
- static create_response_update(response)[source]#
Create a state update that adds an assistant response using native LangGraph pattern.
- Parameters:
response (str) – The assistant’s response
- Returns:
Update that adds the response message to the conversation
- Return type:
StateUpdate
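A hedged usage sketch, assuming the update is combined with other updates returned from a node:

>>> response_update = StateManager.create_response_update(
...     "Found 3 beam current PVs; see the attached summary."
... )
>>> return {**other_updates, **response_update}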
- static get_current_step_index(state)[source]#
Get current step index from state.
- Parameters:
state (AgentState) – Current conversation state
- Returns:
Current step index (defaults to 0 if not set)
- Return type:
int
- static register_command(state, capability, launch_uri, display_name=None, command_type=None, metadata=None, current_commands=None)[source]#
Register a launchable command in the centralized UI registry.
This method allows capabilities to register commands that users can execute through the UI. Commands are typically external applications, web interfaces, or desktop tools that can be launched via URIs.
- Parameters:
state (AgentState) – Current agent state
capability (str) – Name of the capability that generated this command
launch_uri (str) – URI that can be used to launch the command (e.g., http://, file://, custom://)
display_name (str | None) – Optional human-readable name for the command
command_type (str | None) – Optional type of command (e.g., ‘web_app’, ‘desktop_app’, ‘viewer’)
metadata (dict[str, Any] | None) – Optional capability-specific metadata dictionary
current_commands (list[dict[str, Any]] | None) – Optional list of current commands to accumulate (otherwise get from state)
- Returns:
State update dictionary with ui_launchable_commands update
- Return type:
dict[str, Any]
Examples
Basic command registration:
>>> command_update = StateManager.register_command(
...     state, "file_processor", "file:///path/to/results.html"
... )
>>> return {**other_updates, **command_update}
Rich command registration:
>>> command_update = StateManager.register_command(
...     state,
...     capability="data_visualizer",
...     launch_uri="http://localhost:8080/dashboard",
...     display_name="Interactive Dashboard",
...     command_type="web_app",
...     metadata={
...         "port": 8080,
...         "data_source": "analysis_results",
...         "chart_count": 3
...     }
... )
Context Management System#
The context management system provides sophisticated functionality over dictionary data while maintaining LangGraph-compatible storage using Pydantic’s built-in serialization capabilities.
ContextManager#
- class osprey.context.ContextManager(state)[source]#
Bases: object
Simplified LangGraph-native context manager using Pydantic serialization.
This class provides sophisticated functionality over dictionary data while storing everything in LangGraph-compatible dictionary format. It uses Pydantic’s built-in serialization capabilities to eliminate complex custom logic.
The data is stored as: {context_type: {context_key: {field: value}}}
Initialize ContextManager with agent state.
- Parameters:
state (AgentState) – Full AgentState containing capability_context_data
- Raises:
TypeError – If state is not an AgentState dictionary
ValueError – If state doesn’t contain capability_context_data key
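A minimal construction sketch based on the parameters above (the context type and key are illustrative):

>>> from osprey.context import ContextManager
>>> context_manager = ContextManager(state)  # state must contain capability_context_data
>>> pv_context = context_manager.get_context("PV_ADDRESSES", "step_1")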
Key Methods
Context Storage and Retrieval:
- set_context(context_type, key, value, skip_validation=False)[source]#
Store context using Pydantic’s built-in serialization.
- Parameters:
context_type (str) – Type of context (e.g., “PV_ADDRESSES”)
key (str) – Unique key for this context instance
value (CapabilityContext) – CapabilityContext object to store
skip_validation (bool) – Skip registry validation (useful for testing)
- get_context(context_type, key)[source]#
Retrieve using Pydantic’s .model_validate() for reconstruction.
- Parameters:
context_type (str) – Type of context to retrieve
key (str) – Key of the context instance
- Returns:
Reconstructed CapabilityContext object or None if not found
- Return type:
CapabilityContext | None
- get_all_of_type(context_type)[source]#
Get all contexts of a specific type as reconstructed objects.
- Parameters:
context_type (str) – Type of context to retrieve
- Returns:
Dictionary of key -> CapabilityContext objects
- Return type:
dict[str, CapabilityContext]
- get_all()[source]#
Get all context data in flattened format for reporting/summary purposes.
- Returns:
Dictionary with flattened keys in format “context_type.key” -> context object
- Return type:
dict[str, Any]
Context Extraction and Description:
- extract_from_step(step, state, constraints=None, constraint_mode='hard')[source]
Extract all contexts specified in step.inputs with optional type and cardinality constraints.
This method consolidates the common pattern of extracting context data from step inputs and validating against expected types and instance counts. It replaces repetitive validation logic across capabilities.
- Parameters:
step (dict[str, Any]) – Execution step with an inputs list like [{"PV_ADDRESSES": "key1"}, {"TIME_RANGE": "key2"}]
state (dict[str, Any]) – Current agent state (for error checking)
constraints (list[str | tuple[str, str]] | None) – Optional list of required context types. Each item can be:
- String: context type with no cardinality restriction (e.g., "PV_ADDRESSES")
- Tuple: (context_type, cardinality) where cardinality is "single" (must be exactly one instance, not a list) or "multiple" (must be multiple instances, i.e. a list)
Example: ["PV_ADDRESSES", ("TIME_RANGE", "single")]
constraint_mode (str) – "hard" (all constraints required) or "soft" (at least one constraint required)
- Returns:
Dict mapping each context_type to a single context object (if only one of that type) or to a list of context objects (if multiple of that type)
- Return type:
dict[str, Any]
- Raises:
ValueError – If contexts not found, constraints not met, or cardinality violated (capability converts to specific error)
Example:
# Simple extraction without constraints
contexts = context_manager.extract_from_step(step, state)
pv_context = contexts["PV_ADDRESSES"]

# With cardinality constraints (eliminates isinstance checks)
try:
    contexts = context_manager.extract_from_step(
        step, state,
        constraints=[
            ("PV_ADDRESSES", "single"),  # Must be exactly one
            ("TIME_RANGE", "single")     # Must be exactly one
        ],
        constraint_mode="hard"
    )
    pv_context = contexts["PV_ADDRESSES"]    # Guaranteed to be single object
    time_context = contexts["TIME_RANGE"]    # Guaranteed to be single object
except ValueError as e:
    raise ArchiverDependencyError(str(e))

# Mixed constraints (some with cardinality, some without)
try:
    contexts = context_manager.extract_from_step(
        step, state,
        constraints=[
            ("CHANNEL_ADDRESSES", "single"),  # Must be single
            "ARCHIVER_DATA"                   # Any cardinality ok
        ]
    )
except ValueError as e:
    raise DataValidationError(str(e))

# Soft constraints (at least one required, no cardinality restriction)
try:
    contexts = context_manager.extract_from_step(
        step, state,
        constraints=["PV_VALUES", "ARCHIVER_DATA"],
        constraint_mode="soft"
    )
except ValueError as e:
    raise DataValidationError(str(e))
- get_context_access_description(context_filter=None)[source]#
Create detailed description of available context data for use in prompts.
- Parameters:
context_filter (list[dict[str, str]] | None) – Optional list of context filter dictionaries
- Returns:
Formatted string description of available context data
- Return type:
str
- get_summaries(step=None)[source]#
Get summaries of contexts for human display/LLM consumption.
- Parameters:
step (dict[str, Any] | None) – Optional step dict. If provided, extract contexts from step.inputs. If None, get summaries for all available contexts.
- Returns:
List of context summary dicts. Each context class defines its own summary structure via the get_summary() method.
- Return type:
list[dict[str, Any]]
Example
[
    {"type": "PV Addresses", "total_pvs": 5, "pv_list": [...]},
    {"type": "Current Weather", "location": "San Francisco", "temp": 15.0, "conditions": "Sunny"},
    {"type": "Current Weather", "location": "New York", "temp": 20.0, "conditions": "Cloudy"}
]
Data Access and Persistence:
- get_raw_data()[source]#
Get the raw dictionary data for state updates.
- Returns:
The raw dictionary data for LangGraph state updates
- Return type:
dict[str, dict[str, dict[str, Any]]]
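One plausible use, sketched here rather than taken from the osprey source, is returning the raw dictionary as a LangGraph state update keyed by the AgentState field name:

>>> raw = context_manager.get_raw_data()
>>> return {"capability_context_data": raw}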
- save_context_to_file(folder_path, filename='context.json')[source]#
Save capability context data to a JSON file in the specified folder.
This method always saves the current context data to ensure it reflects the latest state. It uses the same serialization format as the state system.
- Parameters:
folder_path (Path) – Path to the folder where the context file should be saved
filename (str) – Name of the context file (default: “context.json”)
- Returns:
Path to the saved context file
- Raises:
OSError – If file cannot be written
TypeError – If context data cannot be serialized
ValueError – If filename is empty or contains invalid characters
- Return type:
Path
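A short usage sketch (the folder path is illustrative):

>>> from pathlib import Path
>>> context_file = context_manager.save_context_to_file(Path("/tmp/run_001"))
>>> context_file.name
'context.json'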
Attribute Access:
- __getattr__(context_type)[source]#
Enable dot notation access to context data with lazy namespace creation.
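The resulting dot-notation access looks roughly like the load_context usage later on this page (attribute and key names are illustrative):

>>> pv_namespace = context_manager.PV_ADDRESSES   # lazily created namespace
>>> pv_context = pv_namespace.step_1              # context stored under key "step_1"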
CapabilityContext#
- class osprey.context.CapabilityContext[source]#
Bases: BaseModel
Base class for all capability context objects. Uses Pydantic for automatic serialization/deserialization and type validation.
This class provides:
- Automatic JSON serialization via .model_dump()
- Automatic deserialization via .model_validate()
- Type validation on field assignment
- Consistent interface for all context types
Create a new model by parsing and validating input data from keyword arguments.
Raises pydantic ValidationError if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
Abstract Methods
- abstractmethod get_access_details(key)[source]#
Get detailed access information for this context data.
- Parameters:
key (str) – The context key this data is stored under
- Returns:
Dictionary with access details including summary, capabilities, etc.
- Return type:
dict
- get_summary()[source]#
Get a summary of this context data for human display/LLM consumption.
- Returns:
Dictionary with summary information about the context. The content and structure are completely up to the context class implementation.
- Return type:
dict
Note
This method is used by get_summaries() to create human-readable displays and LLM prompts. Include whatever information is most useful for your specific context type.
Properties
- property context_type: str#
Return the context type identifier
Class Constants
- CONTEXT_TYPE: ClassVar[str] = ''#
Context type identifier constant (must be overridden in subclasses).
- CONTEXT_CATEGORY: ClassVar[str] = ''#
Context category identifier constant (must be overridden in subclasses).
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': False, 'json_encoders': {datetime.datetime: <custom datetime encoder>}, 'populate_by_name': True, 'use_enum_values': True, 'validate_by_alias': True, 'validate_by_name': True}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
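Putting the constants and abstract methods together, a context subclass might look like the following sketch. The field names and summary structure are illustrative assumptions; only the get_summary shape mirrors the example shown earlier for get_summaries():

from typing import ClassVar

from osprey.context import CapabilityContext


class PVAddresses(CapabilityContext):
    # Required class constants identifying this context type (values assumed)
    CONTEXT_TYPE: ClassVar[str] = "PV_ADDRESSES"
    CONTEXT_CATEGORY: ClassVar[str] = "DATA"

    # Pydantic fields holding the payload
    pvs: list[str]
    description: str = ""

    def get_access_details(self, key: str) -> dict:
        # Access details consumed by prompts/UI; structure is illustrative
        return {
            "summary": f"{len(self.pvs)} PV addresses stored under '{key}'",
            "fields": ["pvs", "description"],
        }

    def get_summary(self) -> dict:
        # Matches the summary shape shown in the get_summaries() example
        return {"type": "PV Addresses", "total_pvs": len(self.pvs), "pv_list": self.pvs}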
Unified Data Structure#
Both state and context systems use a three-level dictionary structure optimized for LangGraph serialization and efficient data management:
{
context_type: {
context_key: {
field: value,
...
}
}
}
Structure Components:
context_type: Defined by the CapabilityContext class's CONTEXT_TYPE constant (e.g., "PV_ADDRESSES", "ANALYSIS_RESULTS")
context_key: Assigned by the orchestrator node during execution planning (e.g., "step_1", "beam_analysis_20240115")
field/value pairs: User-defined data fields specific to the CapabilityContext subclass implementation
This structure enables efficient context storage, retrieval, and merging while maintaining compatibility with LangGraph’s checkpointing system. The orchestrator coordinates context keys across execution steps, while capabilities define the context types and field structures.
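As a concrete, purely illustrative instance of this structure:

capability_context_data = {
    "PV_ADDRESSES": {                      # context_type from CONTEXT_TYPE
        "step_1": {                        # context_key assigned by the orchestrator
            "pvs": ["SR:C01:MAG:1", "SR:C02:MAG:1"],
            "description": "Beam current PVs",
        },
    },
    "ANALYSIS_RESULTS": {
        "beam_analysis_20240115": {
            "mean_current": 401.7,         # field/value pairs defined by the capability
            "units": "mA",
        },
    },
}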
Context Utilities#
Context Loading#
- osprey.context.load_context(context_file='context.json')[source]#
Load agent execution context from a JSON file in the current directory.
This function provides the same interface as the old load_context function but uses the new Pydantic-based ContextManager system. It maintains exact compatibility with existing access patterns.
- Parameters:
context_file (str) – Name of the context file (default: “context.json”)
- Returns:
ContextManager instance with dot notation access, or None if loading fails
- Return type:
ContextManager | None
- Usage:
>>> from osprey.context import load_context
>>> context = load_context()
>>> data = context.ARCHIVER_DATA.beam_current_historical_data
>>> pv_values = context.PV_ADDRESSES.step_1.pv_values
Namespace Access#
State Utilities#
Custom Reducer Functions#
- osprey.state.merge_capability_context_data(existing, new)[source]#
Merge capability context data dictionaries for LangGraph-native checkpointing.
This custom reducer function enables capability context data to accumulate across conversation turns while maintaining pure dictionary structure for optimal LangGraph serialization and checkpointing performance. The function implements deep merging logic that preserves existing context while integrating new context data.
The merge operation follows a three-level dictionary structure that organizes context data hierarchically by type, key, and field values. This structure provides efficient access patterns for context retrieval while maintaining serialization compatibility.
- Parameters:
existing (Optional[Dict[str, Dict[str, Dict[str, Any]]]]) – Existing capability context data dictionary from previous state
new (Dict[str, Dict[str, Dict[str, Any]]]) – New capability context data to merge into existing structure
- Returns:
Merged dictionary maintaining all existing data with new updates applied
- Return type:
Dict[str, Dict[str, Dict[str, Any]]]
Note
The function performs deep copying to prevent mutation of input dictionaries, ensuring state immutability and preventing side effects in LangGraph operations.
Warning
New context data will override existing context data for the same context_type and context_key combination. This is intentional for capability result updates but should be considered when designing context key strategies.
Examples
Basic context merging:
>>> existing = {"PV_ADDRESSES": {"step1": {"pvs": ["SR:C01:MAG:1"]}}} >>> new = {"PV_ADDRESSES": {"step2": {"pvs": ["SR:C02:MAG:1"]}}} >>> result = merge_capability_context_data(existing, new) >>> len(result["PV_ADDRESSES"]) 2
Context override behavior:
>>> existing = {"DATA": {"key1": {"value": "old"}}} >>> new = {"DATA": {"key1": {"value": "new", "extra": "data"}}} >>> result = merge_capability_context_data(existing, new) >>> result["DATA"]["key1"]["value"] 'new'
See also
AgentState: Main state class using this reducer
osprey.context.ContextManager: Context data management utilities
Event Creation Utilities#
- osprey.state.create_status_update(message, progress, complete=False, **metadata)[source]#
Create a status update event for LangGraph state integration.
This utility function creates properly formatted status update events that can be merged into agent state using LangGraph’s native state update mechanisms. The function generates timestamped status events with progress tracking and metadata support for comprehensive execution monitoring.
Status updates are used throughout the framework to provide real-time feedback on execution progress, capability status, and system operations. The events are automatically formatted for consumption by UI components and logging systems.
- Parameters:
message (str) – Descriptive message about the current status or operation
progress (float) – Progress value between 0.0 and 1.0 indicating completion percentage
complete (bool) – Whether this status update indicates completion of the operation
metadata (Any) – Additional metadata fields to include in the status event
- Returns:
Dictionary containing status_updates list for LangGraph state merging
- Return type:
Dict[str, Any]
Note
The returned dictionary contains a ‘status_updates’ key with a list containing the new status event. This format is compatible with LangGraph’s automatic list merging for state updates.
Examples
Basic status update:
>>> update = create_status_update("Processing data", 0.5)
>>> update['status_updates'][0]['message']
'Processing data'
>>> update['status_updates'][0]['progress']
0.5
Completion status with metadata:
>>> update = create_status_update(
...     "Analysis complete", 1.0, complete=True,
...     node="data_analysis", items_processed=150
... )
>>> update['status_updates'][0]['complete']
True
>>> update['status_updates'][0]['items_processed']
150
See also
create_progress_event(): Create progress tracking events
AgentState: Main state class containing status_updates field
- osprey.state.create_progress_event(current, total, operation, **metadata)[source]#
Create a progress tracking event for LangGraph state integration.
This utility function creates properly formatted progress events that track incremental progress through multi-step operations. The function automatically calculates progress percentages and provides timestamped events for detailed execution monitoring and user feedback.
Progress events are particularly useful for long-running operations, batch processing, and multi-step workflows where users need visibility into execution progress and current operation status.
- Parameters:
current (int) – Current step or item number being processed (1-based)
total (int) – Total number of steps or items to be processed
operation (str) – Descriptive name of the operation being performed
metadata (Any) – Additional metadata fields to include in the progress event
- Returns:
Dictionary containing progress_events list for LangGraph state merging
- Return type:
Dict[str, Any]
Note
Progress is automatically calculated as current/total, with safe handling for division by zero. The returned dictionary format is compatible with LangGraph’s automatic list merging for state updates.
Warning
The current parameter should be 1-based (first item is 1, not 0) for intuitive progress reporting. The progress calculation handles this appropriately.
Examples
Basic progress tracking:
>>> event = create_progress_event(3, 10, "Processing files")
>>> event['progress_events'][0]['current']
3
>>> event['progress_events'][0]['progress']
0.3
Progress with metadata:
>>> event = create_progress_event(
...     5, 20, "Analyzing data points",
...     file_name="data.csv", bytes_processed=1024
... )
>>> event['progress_events'][0]['operation']
'Analyzing data points'
>>> event['progress_events'][0]['file_name']
'data.csv'
Handling edge cases:
>>> event = create_progress_event(0, 0, "Empty operation")
>>> event['progress_events'][0]['progress']
0.0
See also
create_status_update(): Create status update events
AgentState: Main state class containing progress_events field
Execution Tracking#
- osprey.state.get_execution_steps_summary(state)[source]#
Generate ordered execution steps summary for prompts and UI display.
This utility function extracts and formats execution step information from the agent state to provide a clean summary of completed execution steps. The function is designed for use in capability prompts, error summaries, UI displays, and debugging contexts where execution history is needed.
The function processes the execution_step_results dictionary to create an ordered list of step descriptions based on execution order. Each step is formatted with a step number and task objective for clear presentation.
Processing Logic:
Results Extraction: Retrieves execution_step_results from state
Ordering: Sorts results by step_index to maintain execution order
Formatting: Creates numbered step descriptions with task objectives
Fallback Handling: Uses capability names when task objectives unavailable
- Parameters:
state (AgentState) – Current agent state containing execution_step_results
- Returns:
Ordered list of formatted step descriptions for display
- Return type:
List[str]
Note
The function handles missing or incomplete execution results gracefully, returning an empty list when no execution data is available. Step numbering starts from 1 for intuitive display.
Warning
The function relies on step_index values in execution results for ordering. If step_index is missing or incorrect, the order may not reflect actual execution sequence.
Examples
Basic execution summary:
>>> state = {
...     "execution_step_results": {
...         "step1": {"step_index": 0, "task_objective": "Find PV addresses"},
...         "step2": {"step_index": 1, "task_objective": "Retrieve values"}
...     }
... }
>>> steps = get_execution_steps_summary(state)
>>> steps[0]
'Step 1: Find PV addresses'
>>> steps[1]
'Step 2: Retrieve values'
Empty state handling:
>>> empty_state = {"execution_step_results": {}}
>>> get_execution_steps_summary(empty_state)
[]
Fallback to capability names:
>>> state_without_objectives = {
...     "execution_step_results": {
...         "step1": {"step_index": 0, "capability": "pv_finder"}
...     }
... }
>>> steps = get_execution_steps_summary(state_without_objectives)
>>> steps[0]
'Step 1: pv_finder'
See also
AgentState: State structure containing execution_step_results
osprey.base.decorators: Capability execution tracking
osprey.infrastructure: Infrastructure components using summaries
Type Aliases#
- osprey.state.StateUpdate: Dict[str, Any]#
Type alias for LangGraph state update dictionaries returned by capabilities and infrastructure nodes.
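For illustration only, a StateUpdate is a plain dictionary keyed by AgentState field names, for example:

# Hypothetical StateUpdate returned by a capability or node
update = {
    "execution_step_results": {
        "step_1": {"step_index": 0, "task_objective": "Find PV addresses"},
    },
    "control_has_error": False,
}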
See also
osprey.base.planning.ExecutionPlan: Execution planning structures used in state
osprey.registry.RegistryManager: Component registry that manages context classes
- Registry System
Registry system for component management
- Context Management System
Complete guide to context management patterns
- State Management Architecture
Complete guide to state management patterns