Classification#

Infrastructure node that classifies tasks and selects capabilities. It checks each user task against the available capabilities in parallel, with a semaphore limiting how many classifications run concurrently.

ClassificationNode#

class osprey.infrastructure.classification_node.ClassificationNode[source]#

Bases: BaseInfrastructureNode

Convention-based classification node with sophisticated capability selection logic.

Analyzes user tasks and selects appropriate capabilities using parallel LLM-based classification with few-shot examples. Handles both initial classification and reclassification scenarios.

Relies on LangGraph’s state merging and provides built-in error classification and a retry policy tuned for LLM-based classification operations.

name: str = 'classifier'#
description: str = 'Task Classification and Capability Selection'#

static classify_error(exc, context)[source]#

Built-in error classification for classifier operations.

Parameters:
  • exc (Exception) – Exception that occurred

  • context (dict[str, Any]) – Error context information

Returns:

Classification with severity and retry guidance

Return type:

ErrorClassification
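As a rough illustration of what an error classifier of this shape could look like, the sketch below maps exception types to a severity and a user-facing message. The `Severity` enum, the `SketchErrorClassification` dataclass, the `node` context key, and the mapping itself are all assumptions made for this example; they are not the framework's `ErrorClassification` type or its actual rules.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any


class Severity(Enum):
    """Hypothetical severity levels, for illustration only."""
    RETRIABLE = "retriable"
    CRITICAL = "critical"


@dataclass
class SketchErrorClassification:
    """Hypothetical stand-in for the framework's ErrorClassification."""
    severity: Severity
    user_message: str


def classify_error_sketch(exc: Exception, context: dict[str, Any]) -> SketchErrorClassification:
    node = context.get("node", "classifier")  # hypothetical context key
    # Transient network problems are usually worth retrying.
    if isinstance(exc, (TimeoutError, ConnectionError)):
        return SketchErrorClassification(Severity.RETRIABLE, f"{node}: transient failure: {exc}")
    # Everything else is treated as non-recoverable in this sketch.
    return SketchErrorClassification(Severity.CRITICAL, f"{node}: classification failed: {exc}")
```

Keeping the mapping in a static method means the retry machinery can decide how to react without needing an instance of the node.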

static get_retry_policy()[source]#

Custom retry policy for LLM-based classification operations.

Classification uses parallel LLM calls for capability selection and can be flaky due to:
  • Multiple concurrent LLM requests

  • Network timeouts to LLM services

  • LLM provider rate limiting

  • Classification model variability

Use more attempts with moderate delays for better reliability.

Return type:

dict[str, Any]
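To make "more attempts with moderate delays" concrete, here is a minimal sketch of how a policy dictionary might drive exponential backoff. The key names (`max_attempts`, `delay_seconds`, `backoff_factor`) and the values are assumptions chosen for illustration; check the source for the keys and values this method actually returns.

```python
from typing import Any


def example_retry_policy() -> dict[str, Any]:
    # Hypothetical keys and values, not read from the framework.
    return {"max_attempts": 4, "delay_seconds": 1.0, "backoff_factor": 2.0}


def backoff_delays(policy: dict[str, Any]) -> list[float]:
    """Delay before each retry; the first attempt runs immediately."""
    retries = policy["max_attempts"] - 1
    return [policy["delay_seconds"] * policy["backoff_factor"] ** i for i in range(retries)]


print(backoff_delays(example_retry_policy()))  # [1.0, 2.0, 4.0]
```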

async execute()[source]#

Main classification logic with bypass support and sophisticated capability selection.

Analyzes user tasks and selects appropriate capabilities using parallel LLM-based classification. Handles both initial classification and reclassification scenarios with state preservation.

Supports bypass mode where all available capabilities are activated, skipping LLM-based classification for performance optimization.

Returns:

Dictionary of state updates for LangGraph

Return type:

Dict[str, Any]
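The difference between bypass mode and normal classification can be sketched as follows. This is only an illustration of the control flow described above: `execute_sketch`, `fake_classify`, and the `active_capabilities` key are hypothetical names, and the classifier calls run sequentially here for brevity rather than in parallel.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def execute_sketch(
    task: str,
    available: list[str],
    bypass: bool,
    classify: Callable[[str, str], Awaitable[bool]],
) -> dict[str, Any]:
    if bypass:
        # Bypass mode: activate every capability and make no LLM calls.
        active = list(available)
    else:
        # Normal mode: keep only the capabilities the classifier accepts.
        active = [name for name in available if await classify(task, name)]
    # Partial state update; the key name is an assumption.
    return {"active_capabilities": active}


async def fake_classify(task: str, name: str) -> bool:
    return name in task  # toy rule standing in for an LLM call


caps = ["plot", "email"]
bypassed = asyncio.run(execute_sketch("plot data", caps, True, fake_classify))
classified = asyncio.run(execute_sketch("plot data", caps, False, fake_classify))
print(bypassed)    # {'active_capabilities': ['plot', 'email']}
print(classified)  # {'active_capabilities': ['plot']}
```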

__repr__()#

Return a string representation of the infrastructure node for debugging.

Provides a concise string representation that includes both the Python class name and the infrastructure node’s registered name. This is useful for debugging, logging, and development workflows where infrastructure nodes need to be identified clearly.

Returns:

String representation including class name and node name

Return type:

str

Example

>>> node = TaskExtractionNode()
>>> repr(node)
'<TaskExtractionNode: task_extraction>'

Note

The format follows the pattern ‘<ClassName: node_name>’ for consistency across all framework components.
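A `__repr__` that produces this pattern can be written in a few lines. The classes below are hypothetical stand-ins used only to show the format; they are not the framework's base class.

```python
class SketchNode:
    """Hypothetical stand-in for an infrastructure node base class."""
    name = "sketch"

    def __repr__(self) -> str:
        # type(self).__name__ picks up the subclass name automatically.
        return f"<{type(self).__name__}: {self.name}>"


class SketchClassifier(SketchNode):
    name = "classifier"


print(repr(SketchClassifier()))  # <SketchClassifier: classifier>
```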

get_current_step()#

Get current execution step from state.

Returns:

Current step dictionary with capability, task_objective, etc.

Return type:

PlannedStep

Raises:

RuntimeError – If execution plan is missing or step index is invalid

Example

```python
async def execute(self) -> dict[str, Any]:
    step = self.get_current_step()
    task_objective = step.get('task_objective')
```

get_current_step_index()#

Get current step index from state.

Returns:

Current step index (defaults to 0 if not set)

Return type:

int

Example

```python
async def execute(self) -> dict[str, Any]:
    current_index = self.get_current_step_index()
```

get_current_task()#

Get current task from state.

Returns:

Current task string, or None if not set

Return type:

str | None

Example:

async def execute(self) -> dict[str, Any]:
    current_task = self.get_current_task()
    if not current_task:
        raise ValueError("No current task available")

get_execution_plan()#

Get current execution plan from state with type validation.

Returns:

ExecutionPlan if available and valid, None otherwise

Example

```python
async def execute(self) -> dict[str, Any]:
    execution_plan = self.get_execution_plan()
    if not execution_plan:
        # Route to orchestrator
        ...
```

get_logger()#

Get unified logger with automatic streaming support.

Creates a logger that:
  • Uses this infrastructure node’s name automatically

  • Has access to state for streaming via self._state

  • Streams high-level messages automatically when in LangGraph context

  • Logs to CLI with Rich formatting

Returns:

ComponentLogger instance with streaming capability

get_user_query()#

Get the user’s query from the current conversation.

Returns:

The user’s query string, or None if no user messages exist

Return type:

str | None

Example

```python
async def execute(self) -> dict[str, Any]:
    original_query = self.get_user_query()
```

async langgraph_node(**kwargs)#

LangGraph-native node function with manual error handling.

This function is called by LangGraph during execution. Infrastructure nodes now use get_stream_writer() and get_config() directly for pure LangGraph integration.

Parameters:
  • state (AgentState) – Current agent state

  • kwargs – Additional parameters from LangGraph

Returns:

State updates dictionary

Return type:

Dict[str, Any]

Supporting Functions#

async osprey.infrastructure.classification_node.select_capabilities(task, available_capabilities, state, logger, previous_failure=None)[source]#

Select capabilities needed for the task by using classification.

Parameters:
  • task (str) – Task description for analysis

  • available_capabilities (List[BaseCapability]) – Available capabilities to choose from

  • state (AgentState) – Current agent state

  • logger – Logger instance

  • previous_failure (str | None) – Previous failure reason for reclassification context

Returns:

List of capability names needed for the task

Return type:

List[str]

osprey.infrastructure.classification_node._detect_reclassification_scenario(state)[source]#

Detect if this classification is a reclassification due to a previous error.

Analyzes the current agent state to determine if this classification run is happening because a previous capability (like orchestrator) failed and requested reclassification.

Parameters:

state (AgentState) – Current agent state containing error information

Returns:

Reclassification reason string if this is a reclassification, None otherwise

Return type:

Optional[str]
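The detection logic can be pictured as a small lookup on the state. In the sketch below the state is a plain dictionary, and the `last_error`, `needs_reclassification`, and `reason` keys are hypothetical; the real `AgentState` layout may differ.

```python
from typing import Any, Optional


def detect_reclassification_sketch(state: dict[str, Any]) -> Optional[str]:
    # All keys below are hypothetical; they only illustrate the idea.
    error = state.get("last_error")
    if error and error.get("needs_reclassification"):
        return error.get("reason", "unspecified failure")
    # No recorded failure asked for reclassification.
    return None


failed_state = {"last_error": {"needs_reclassification": True, "reason": "no matching capability"}}
print(detect_reclassification_sketch(failed_state))  # no matching capability
print(detect_reclassification_sketch({}))            # None
```

Returning the reason as a string, rather than a boolean, lets the caller pass it on as context for the next classification attempt, in the way `previous_failure` is passed to `select_capabilities`.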

CapabilityClassifier#

class osprey.infrastructure.classification_node.CapabilityClassifier(task, state, logger, previous_failure=None)[source]#

Bases: object

Handles individual capability classification with proper resource management.

__init__(task, state, logger, previous_failure=None)[source]#

async classify(capability, semaphore)[source]#

Classify a single capability with semaphore-controlled concurrency.

Parameters:
  • capability (BaseCapability) – The capability to analyze

  • semaphore (Semaphore) – Semaphore for concurrency control

Returns:

True if capability is required, False otherwise

Return type:

bool
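Semaphore-controlled concurrency of this kind is commonly built from `asyncio.Semaphore` and `asyncio.gather`. The following is a minimal sketch under stated assumptions: `classify_one`, `select`, the substring relevance rule, and the limit of two concurrent calls are illustrative and do not reflect the framework's implementation.

```python
import asyncio


async def classify_one(name: str, task: str, semaphore: asyncio.Semaphore) -> bool:
    async with semaphore:  # blocks while the concurrency limit is reached
        await asyncio.sleep(0)  # placeholder for an LLM request
        return name in task  # toy relevance rule, for illustration only


async def select(task: str, names: list[str], max_concurrent: int = 2) -> list[str]:
    semaphore = asyncio.Semaphore(max_concurrent)
    # Start every classification at once; the semaphore throttles them.
    results = await asyncio.gather(*(classify_one(n, task, semaphore) for n in names))
    return [n for n, needed in zip(names, results) if needed]


selected = asyncio.run(select("plot the archiver data", ["plot", "archiver", "email"]))
print(selected)  # ['plot', 'archiver']
```

Sharing one semaphore across all calls caps the number of in-flight requests, which helps with provider rate limits, while `gather` preserves input order so results can be zipped back to capability names.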

Core Models#

Classification uses models defined in the core framework:

See also

CapabilityMatch

Classification results for capability selection

TaskClassifierGuide

Classification guidance structure

ClassifierExample

Few-shot examples for classification

BaseInfrastructureNode

Base class for infrastructure components

Registration#

Automatically registered as:

NodeRegistration(
    name="classifier",
    module_path="osprey.infrastructure.classification_node",
    function_name="ClassificationNode",
    description="Task classification and capability selection"
)

See also

Prompt System

Prompt customization system

Registry System

Component and capability management

Classification and Routing

Implementation details and usage patterns