Execution Control#
Infrastructure components that control the flow of execution including routing decisions, error handling, and conditional logic.
Router Node#
- class osprey.infrastructure.router_node.RouterNode[source]#
Bases:
BaseInfrastructureNodeCentral routing decision node for the Osprey Agent Framework.
This node serves as the single decision-making authority that determines what should happen next based on the current agent state. It does no business logic - only routing decisions and metadata management.
The actual routing is handled by the router_conditional_edge function.
- name: str = 'router'#
- description: str = 'Central routing decision authority'#
- async execute()[source]#
Router node execution - updates routing metadata only.
This node serves as the entry point and routing hub, but does no routing logic itself. The actual routing decision is made by the conditional edge function. This keeps the logic DRY and avoids duplication.
- Returns:
Dictionary of state updates for routing metadata
- Return type:
Dict[str, Any]
- __repr__()#
Return a string representation of the infrastructure node for debugging.
Provides a concise string representation that includes both the Python class name and the infrastructure node’s registered name. This is useful for debugging, logging, and development workflows where infrastructure nodes need to be identified clearly.
- Returns:
String representation including class name and node name
- Return type:
str
Example
>>> node = TaskExtractionNode() >>> repr(node) '<TaskExtractionNode: task_extraction>'
Note
The format follows the pattern ‘<ClassName: node_name>’ for consistency across all framework components.
- static classify_error(exc, context)#
Classify errors for infrastructure-specific error handling and recovery.
This method provides default error classification for all infrastructure nodes with a conservative approach that treats most errors as critical. Infrastructure nodes handle system-critical functions like orchestration and routing, so failures typically require immediate attention rather than automatic retry attempts.
The default implementation prioritizes system stability by failing fast with clear error messages. Subclasses should override this method only when specific infrastructure components can benefit from retry logic (e.g., LLM-based orchestrators that may encounter temporary API issues).
- Parameters:
exc (Exception) – The exception that occurred during infrastructure operation
context (dict) – Error context including node info, execution state, and timing
- Returns:
Error classification with severity and recovery strategy
- Return type:
Note
The context dictionary includes:
infrastructure_node: node name for identificationexecution_time: time spent before failurecurrent_state: agent state at time of error
Example:
@staticmethod def classify_error(exc: Exception, context: dict) -> ErrorClassification: # Retry network timeouts for LLM-based infrastructure if isinstance(exc, (ConnectionError, TimeoutError)): return ErrorClassification( severity=ErrorSeverity.RETRIABLE, user_message="Network timeout, retrying...", metadata={"technical_details": str(exc)} ) return ErrorClassification( severity=ErrorSeverity.CRITICAL, user_message=f"Infrastructure error: {exc}", metadata={"technical_details": str(exc)} )
Note
Infrastructure nodes should generally fail fast, so the default implementation treats most errors as critical. Override this method for infrastructure that can benefit from retries (e.g., LLM-based nodes).
- get_current_step()#
Get current execution step from state.
- Returns:
Current step dictionary with capability, task_objective, etc.
- Return type:
- Raises:
RuntimeError – If execution plan is missing or step index is invalid
Example
```python async def execute(self) -> dict[str, Any]:
step = self.get_current_step() task_objective = step.get(‘task_objective’)
- get_current_step_index()#
Get current step index from state.
- Returns:
Current step index (defaults to 0 if not set)
- Return type:
int
Example
```python async def execute(self) -> dict[str, Any]:
current_index = self.get_current_step_index()
- get_current_task()#
Get current task from state.
- Returns:
Current task string, or None if not set
- Return type:
str | None
Example
```python async def execute(self) -> dict[str, Any]:
current_task = self.get_current_task() if not current_task:
raise ValueError(“No current task available”)
- get_execution_plan()#
Get current execution plan from state with type validation.
- Returns:
ExecutionPlan if available and valid, None otherwise
Example
```python async def execute(self) -> dict[str, Any]:
execution_plan = self.get_execution_plan() if not execution_plan:
# Route to orchestrator
- get_logger()#
Get unified logger with automatic streaming support.
Creates a logger that: - Uses this infrastructure node’s name automatically - Has access to state for streaming via self._state - Streams high-level messages automatically when in LangGraph context - Logs to CLI with Rich formatting
- Returns:
ComponentLogger instance with streaming capability
- static get_retry_policy()#
Get conservative retry policy configuration for infrastructure operations.
This method provides retry configuration optimized for infrastructure nodes that handle system-critical functions. The default policy uses conservative settings with minimal retry attempts and fast failure detection to maintain system stability.
Infrastructure nodes should generally fail fast rather than retry extensively, since failures often indicate system-level issues that require immediate attention. Override this method only for specific infrastructure components that can benefit from retry logic.
- Returns:
Dictionary containing conservative retry configuration parameters
- Return type:
Dict[str, Any]
Note
Infrastructure default policy: 2 attempts, 0.2s delay, minimal backoff. This prioritizes fast failure detection over retry persistence.
Example:
@staticmethod def get_retry_policy() -> Dict[str, Any]: return { "max_attempts": 3, # More retries for LLM-based infrastructure "delay_seconds": 1.0, # Longer delay for external service calls "backoff_factor": 2.0 # Exponential backoff }
Note
The router uses this configuration to determine retry behavior. Infrastructure default: 2 attempts, 0.2s delay, minimal backoff.
- get_user_query()#
Get the user’s query from the current conversation.
- Returns:
The user’s query string, or None if no user messages exist
- Return type:
str | None
Example
```python async def execute(self) -> dict[str, Any]:
original_query = self.get_user_query()
- async langgraph_node(**kwargs)#
LangGraph-native node function with manual error handling.
This function is called by LangGraph during execution. Infrastructure nodes now use get_stream_writer() and get_config() directly for pure LangGraph integration.
- Parameters:
state (AgentState) – Current agent state
kwargs – Additional parameters from LangGraph
- Returns:
State updates dictionary
- Return type:
Dict[str, Any]
- osprey.infrastructure.router_node.router_conditional_edge(state)[source]#
LangGraph conditional edge function for dynamic routing.
This is the main export of this module - a pure conditional edge function that determines which node should execute next based on agent state.
Follows LangGraph native patterns where conditional edge functions take only the state parameter and handle logging internally.
Manual retry handling: - Checks for errors and retry count first - Routes retriable errors back to same capability if retries available - Routes to error node when retries exhausted - Routes critical/replanning errors immediately
- Parameters:
state (AgentState) – Current agent state containing all execution context
- Returns:
Name of next node to execute or “END” to terminate
- Return type:
str
Error Node#
Core Models#
Execution control uses models defined in the core framework:
See also
ErrorClassificationError classification system
ErrorSeverityError severity levels
BaseInfrastructureNodeBase class for infrastructure components
Registration#
RouterNode is automatically registered as:
NodeRegistration(
name="router",
module_path="osprey.infrastructure.router_node",
function_name="RouterNode",
description="Central routing decision authority"
)
ErrorNode is automatically registered as:
NodeRegistration(
name="error",
module_path="osprey.infrastructure.error_node",
function_name="ErrorNode",
description="Error response generation"
)
See also
- Prompt System
Prompt customization system
- Error Handling
Implementation details and usage patterns