Execution Control#

Infrastructure components that control the flow of execution including routing decisions, error handling, and conditional logic.

Router Node#

class osprey.infrastructure.router_node.RouterNode[source]#

Bases: BaseInfrastructureNode

Central routing decision node for the Osprey Agent Framework.

This node serves as the single decision-making authority that determines what should happen next based on the current agent state. It does no business logic - only routing decisions and metadata management.

The actual routing is handled by the router_conditional_edge function.

name: str = 'router'#
description: str = 'Central routing decision authority'#
async execute()[source]#

Router node execution - updates routing metadata only.

This node serves as the entry point and routing hub, but does no routing logic itself. The actual routing decision is made by the conditional edge function. This keeps the logic DRY and avoids duplication.

Returns:

Dictionary of state updates for routing metadata

Return type:

Dict[str, Any]

__repr__()#

Return a string representation of the infrastructure node for debugging.

Provides a concise string representation that includes both the Python class name and the infrastructure node’s registered name. This is useful for debugging, logging, and development workflows where infrastructure nodes need to be identified clearly.

Returns:

String representation including class name and node name

Return type:

str

Example

>>> node = TaskExtractionNode()
>>> repr(node)
'<TaskExtractionNode: task_extraction>'

Note

The format follows the pattern ‘<ClassName: node_name>’ for consistency across all framework components.

static classify_error(exc, context)#

Classify errors for infrastructure-specific error handling and recovery.

This method provides default error classification for all infrastructure nodes with a conservative approach that treats most errors as critical. Infrastructure nodes handle system-critical functions like orchestration and routing, so failures typically require immediate attention rather than automatic retry attempts.

The default implementation prioritizes system stability by failing fast with clear error messages. Subclasses should override this method only when specific infrastructure components can benefit from retry logic (e.g., LLM-based orchestrators that may encounter temporary API issues).

Parameters:
  • exc (Exception) – The exception that occurred during infrastructure operation

  • context (dict) – Error context including node info, execution state, and timing

Returns:

Error classification with severity and recovery strategy

Return type:

ErrorClassification

Note

The context dictionary includes:

  • infrastructure_node: node name for identification

  • execution_time: time spent before failure

  • current_state: agent state at time of error

Example:

@staticmethod
def classify_error(exc: Exception, context: dict) -> ErrorClassification:
    # Retry network timeouts for LLM-based infrastructure
    if isinstance(exc, (ConnectionError, TimeoutError)):
        return ErrorClassification(
            severity=ErrorSeverity.RETRIABLE,
            user_message="Network timeout, retrying...",
            metadata={"technical_details": str(exc)}
        )
    return ErrorClassification(
        severity=ErrorSeverity.CRITICAL,
        user_message=f"Infrastructure error: {exc}",
        metadata={"technical_details": str(exc)}
    )

Note

Infrastructure nodes should generally fail fast, so the default implementation treats most errors as critical. Override this method for infrastructure that can benefit from retries (e.g., LLM-based nodes).

get_current_step()#

Get current execution step from state.

Returns:

Current step dictionary with capability, task_objective, etc.

Return type:

PlannedStep

Raises:

RuntimeError – If execution plan is missing or step index is invalid

Example

```python async def execute(self) -> dict[str, Any]:

step = self.get_current_step() task_objective = step.get(‘task_objective’)

```

get_current_step_index()#

Get current step index from state.

Returns:

Current step index (defaults to 0 if not set)

Return type:

int

Example

```python async def execute(self) -> dict[str, Any]:

current_index = self.get_current_step_index()

```

get_current_task()#

Get current task from state.

Returns:

Current task string, or None if not set

Return type:

str | None

Example

```python async def execute(self) -> dict[str, Any]:

current_task = self.get_current_task() if not current_task:

raise ValueError(“No current task available”)

```

get_execution_plan()#

Get current execution plan from state with type validation.

Returns:

ExecutionPlan if available and valid, None otherwise

Example

```python async def execute(self) -> dict[str, Any]:

execution_plan = self.get_execution_plan() if not execution_plan:

# Route to orchestrator

```

get_logger()#

Get unified logger with automatic streaming support.

Creates a logger that: - Uses this infrastructure node’s name automatically - Has access to state for streaming via self._state - Streams high-level messages automatically when in LangGraph context - Logs to CLI with Rich formatting

Returns:

ComponentLogger instance with streaming capability

static get_retry_policy()#

Get conservative retry policy configuration for infrastructure operations.

This method provides retry configuration optimized for infrastructure nodes that handle system-critical functions. The default policy uses conservative settings with minimal retry attempts and fast failure detection to maintain system stability.

Infrastructure nodes should generally fail fast rather than retry extensively, since failures often indicate system-level issues that require immediate attention. Override this method only for specific infrastructure components that can benefit from retry logic.

Returns:

Dictionary containing conservative retry configuration parameters

Return type:

Dict[str, Any]

Note

Infrastructure default policy: 2 attempts, 0.2s delay, minimal backoff. This prioritizes fast failure detection over retry persistence.

Example:

@staticmethod
def get_retry_policy() -> Dict[str, Any]:
    return {
        "max_attempts": 3,  # More retries for LLM-based infrastructure
        "delay_seconds": 1.0,  # Longer delay for external service calls
        "backoff_factor": 2.0  # Exponential backoff
    }

Note

The router uses this configuration to determine retry behavior. Infrastructure default: 2 attempts, 0.2s delay, minimal backoff.

get_user_query()#

Get the user’s query from the current conversation.

Returns:

The user’s query string, or None if no user messages exist

Return type:

str | None

Example

```python async def execute(self) -> dict[str, Any]:

original_query = self.get_user_query()

```

async langgraph_node(**kwargs)#

LangGraph-native node function with manual error handling.

This function is called by LangGraph during execution. Infrastructure nodes now use get_stream_writer() and get_config() directly for pure LangGraph integration.

Parameters:
  • state (AgentState) – Current agent state

  • kwargs – Additional parameters from LangGraph

Returns:

State updates dictionary

Return type:

Dict[str, Any]

osprey.infrastructure.router_node.router_conditional_edge(state)[source]#

LangGraph conditional edge function for dynamic routing.

This is the main export of this module - a pure conditional edge function that determines which node should execute next based on agent state.

Follows LangGraph native patterns where conditional edge functions take only the state parameter and handle logging internally.

Manual retry handling: - Checks for errors and retry count first - Routes retriable errors back to same capability if retries available - Routes to error node when retries exhausted - Routes critical/replanning errors immediately

Parameters:

state (AgentState) – Current agent state containing all execution context

Returns:

Name of next node to execute or “END” to terminate

Return type:

str

Error Node#

Core Models#

Execution control uses models defined in the core framework:

See also

ErrorClassification

Error classification system

ErrorSeverity

Error severity levels

BaseInfrastructureNode

Base class for infrastructure components

Registration#

RouterNode is automatically registered as:

NodeRegistration(
    name="router",
    module_path="osprey.infrastructure.router_node",
    function_name="RouterNode",
    description="Central routing decision authority"
)

ErrorNode is automatically registered as:

NodeRegistration(
    name="error",
    module_path="osprey.infrastructure.error_node",
    function_name="ErrorNode",
    description="Error response generation"
)

See also

Prompt System

Prompt customization system

Error Handling

Implementation details and usage patterns