Classification
Infrastructure node that handles task classification and capability selection by analyzing user tasks against available capabilities using parallel processing with semaphore-controlled concurrency.
ClassificationNode
- class osprey.infrastructure.classification_node.ClassificationNode
Bases: BaseInfrastructureNode
Convention-based classification node with sophisticated capability selection logic.
Analyzes user tasks and selects appropriate capabilities using parallel LLM-based classification with few-shot examples. Handles both initial classification and reclassification scenarios.
Uses LangGraph’s sophisticated state merging with built-in error handling and retry policies optimized for LLM-based classification operations.
- name: str = 'classifier'
- description: str = 'Task Classification and Capability Selection'
- static classify_error(exc, context)
Built-in error classification for classifier operations.
- Parameters:
exc (Exception) – Exception that occurred
context (dict[str, Any]) – Error context information
- Returns:
Classification with severity and retry guidance
- Return type:
- static get_retry_policy()
Custom retry policy for LLM-based classification operations.
Classification uses parallel LLM calls for capability selection and can be flaky due to:
- Multiple concurrent LLM requests
- Network timeouts to LLM services
- LLM provider rate limiting
- Classification model variability
Use more attempts with moderate delays for better reliability.
- Return type:
dict[str, Any]
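As a rough illustration of "more attempts with moderate delays", the sketch below builds a policy dictionary and derives the wait before each retry. The keys `max_attempts`, `delay_seconds`, and `backoff_factor`, their values, and the helper `backoff_delays` are assumptions made for this example; this page does not document the actual contents of the returned dictionary.

```python
from typing import Any


def example_retry_policy() -> dict[str, Any]:
    """Hypothetical policy; key names and values are assumptions."""
    return {
        "max_attempts": 4,      # a few more attempts than a single retry
        "delay_seconds": 1.0,   # moderate initial delay
        "backoff_factor": 1.5,  # grow the delay between attempts
    }


def backoff_delays(policy: dict[str, Any]) -> list[float]:
    """Return the wait before each retry (one fewer than max_attempts)."""
    retries = policy["max_attempts"] - 1
    return [
        policy["delay_seconds"] * policy["backoff_factor"] ** i
        for i in range(retries)
    ]


delays = backoff_delays(example_retry_policy())
```

With these assumed values, the first attempt runs immediately and up to three retries follow with growing delays.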
- async execute()
Main classification logic with bypass support and sophisticated capability selection.
Analyzes user tasks and selects appropriate capabilities using parallel LLM-based classification. Handles both initial classification and reclassification scenarios with state preservation.
Supports bypass mode where all available capabilities are activated, skipping LLM-based classification for performance optimization.
- Returns:
Dictionary of state updates for LangGraph
- Return type:
Dict[str, Any]
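The difference between bypass mode and normal classification can be sketched as follows. This is a simplified illustration, not the node's actual logic: `classify_with_llm` is a keyword-matching stand-in for an LLM call, and the `active_capabilities` state key and the `bypass` flag are hypothetical names.

```python
import asyncio
from typing import Any


async def classify_with_llm(task: str, capability: str) -> bool:
    """Stand-in for an LLM call; a keyword match, purely illustrative."""
    await asyncio.sleep(0)
    return capability in task


async def choose_capabilities(
    task: str, available: list[str], bypass: bool
) -> dict[str, Any]:
    """Return a state-update dict; the key name is an assumption."""
    if bypass:
        # Bypass mode: activate everything and skip classification.
        selected = list(available)
    else:
        # Normal mode: classify each capability concurrently.
        flags = await asyncio.gather(
            *(classify_with_llm(task, name) for name in available)
        )
        selected = [name for name, keep in zip(available, flags) if keep]
    return {"active_capabilities": selected}


available = ["weather", "search", "plot"]
bypassed = asyncio.run(choose_capabilities("plot the weather", available, True))
classified = asyncio.run(choose_capabilities("plot the weather", available, False))
```

Bypass trades selectivity for speed: no classification calls are made, but every capability is passed on to later stages.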
- __repr__()
Return a string representation of the infrastructure node for debugging.
Provides a concise string representation that includes both the Python class name and the infrastructure node’s registered name. This is useful for debugging, logging, and development workflows where infrastructure nodes need to be identified clearly.
- Returns:
String representation including class name and node name
- Return type:
str
Example
>>> node = TaskExtractionNode()
>>> repr(node)
'<TaskExtractionNode: task_extraction>'
Note
The format follows the pattern ‘<ClassName: node_name>’ for consistency across all framework components.
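The pattern described above can be reproduced in a few lines. `ExampleNode` is a hypothetical stand-in for the base class, and the `TaskExtractionNode` defined here is a local toy class that only borrows the name from the example; neither is the framework's implementation.

```python
class ExampleNode:
    """Hypothetical stand-in for an infrastructure node base class."""

    name: str = "example"

    def __repr__(self) -> str:
        # Class name first, then the registered node name.
        return f"<{type(self).__name__}: {self.name}>"


class TaskExtractionNode(ExampleNode):
    name = "task_extraction"


text = repr(TaskExtractionNode())
```

Using `type(self).__name__` means subclasses get the right class name without overriding `__repr__`.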
- get_current_step()
Get current execution step from state.
- Returns:
Current step dictionary with capability, task_objective, etc.
- Return type:
- Raises:
RuntimeError – If execution plan is missing or step index is invalid
Example
```python
async def execute(self) -> dict[str, Any]:
    step = self.get_current_step()
    task_objective = step.get("task_objective")
```
- get_current_step_index()
Get current step index from state.
- Returns:
Current step index (defaults to 0 if not set)
- Return type:
int
Example
```python
async def execute(self) -> dict[str, Any]:
    current_index = self.get_current_step_index()
```
- get_current_task()
Get current task from state.
- Returns:
Current task string, or None if not set
- Return type:
str | None
Example:
    async def execute(self) -> dict[str, Any]:
        current_task = self.get_current_task()
        if not current_task:
            raise ValueError("No current task available")
- get_execution_plan()
Get current execution plan from state with type validation.
- Returns:
ExecutionPlan if available and valid, None otherwise
Example
```python
async def execute(self) -> dict[str, Any]:
    execution_plan = self.get_execution_plan()
    if not execution_plan:
        # Route to orchestrator
        ...
```
- get_logger()
Get unified logger with automatic streaming support.
Creates a logger that:
- Uses this infrastructure node’s name automatically
- Has access to state for streaming via self._state
- Streams high-level messages automatically when in LangGraph context
- Logs to CLI with Rich formatting
- Returns:
ComponentLogger instance with streaming capability
- get_user_query()
Get the user’s query from the current conversation.
- Returns:
The user’s query string, or None if no user messages exist
- Return type:
str | None
Example
```python
async def execute(self) -> dict[str, Any]:
    original_query = self.get_user_query()
```
- async langgraph_node(**kwargs)
LangGraph-native node function with manual error handling.
This function is called by LangGraph during execution. Infrastructure nodes now use get_stream_writer() and get_config() directly for pure LangGraph integration.
- Parameters:
state (AgentState) – Current agent state
kwargs – Additional parameters from LangGraph
- Returns:
State updates dictionary
- Return type:
Dict[str, Any]
Supporting Functions
- async osprey.infrastructure.classification_node.select_capabilities(task, available_capabilities, state, logger, previous_failure=None)
Select capabilities needed for the task by using classification.
- Parameters:
task (str) – Task description for analysis
available_capabilities (List[BaseCapability]) – Available capabilities to choose from
state (AgentState) – Current agent state
logger – Logger instance
previous_failure (str | None) – Previous failure reason for reclassification context
- Returns:
List of capability names needed for the task
- Return type:
List[str]
- osprey.infrastructure.classification_node._detect_reclassification_scenario(state)
Detect if this classification is a reclassification due to a previous error.
Analyzes the current agent state to determine if this classification run is happening because a previous capability (like orchestrator) failed and requested reclassification.
- Parameters:
state (AgentState) – Current agent state containing error information
- Returns:
Reclassification reason string if this is a reclassification, None otherwise
- Return type:
Optional[str]
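The shape of such a check might look like the sketch below. It only illustrates the "reason string or None" contract described above: the state layout (`last_error`, `severity`, `capability`, `message`) and the function name `detect_reclassification` are hypothetical and are not taken from the framework's `AgentState`.

```python
from typing import Any, Optional


def detect_reclassification(state: dict[str, Any]) -> Optional[str]:
    """Return a reason string if a prior error requested reclassification.

    The state keys used here are hypothetical, chosen only for illustration.
    """
    error = state.get("last_error")
    if not error:
        return None  # first pass: nothing failed earlier
    if error.get("severity") != "reclassification":
        return None  # some other failure, not a reclassification request
    source = error.get("capability", "unknown capability")
    message = error.get("message", "no details")
    return f"{source} requested reclassification: {message}"


first_pass = detect_reclassification({})
retry_pass = detect_reclassification(
    {
        "last_error": {
            "severity": "reclassification",
            "capability": "orchestrator",
            "message": "no capability could handle the task",
        }
    }
)
```

The returned reason can then be passed on as `previous_failure` so that the next classification round has context about what went wrong.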
CapabilityClassifier
- class osprey.infrastructure.classification_node.CapabilityClassifier(task, state, logger, previous_failure=None)
Bases: object
Handles individual capability classification with proper resource management.
- async classify(capability, semaphore)
Classify a single capability with semaphore-controlled concurrency.
- Parameters:
capability (BaseCapability) – The capability to analyze
semaphore (Semaphore) – Semaphore for concurrency control
- Returns:
True if capability is required, False otherwise
- Return type:
bool
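Semaphore-controlled concurrency of this kind can be sketched with `asyncio.Semaphore` and `asyncio.gather`. `SketchClassifier` is a hypothetical stand-in, not the framework's `CapabilityClassifier`: it takes plain strings instead of `BaseCapability` objects, replaces the LLM request with a short sleep and a keyword match, and records peak concurrency only to make the limit visible.

```python
import asyncio


class SketchClassifier:
    """Hypothetical stand-in; not the framework's CapabilityClassifier."""

    def __init__(self, task: str) -> None:
        self.task = task
        self.in_flight = 0  # classifications currently running
        self.peak = 0       # highest concurrency observed

    async def classify(self, capability: str, semaphore: asyncio.Semaphore) -> bool:
        async with semaphore:  # at most `limit` classifications run at once
            self.in_flight += 1
            self.peak = max(self.peak, self.in_flight)
            try:
                await asyncio.sleep(0.01)  # placeholder for the LLM request
                return capability in self.task
            finally:
                self.in_flight -= 1


async def run(task: str, capabilities: list[str], limit: int) -> tuple[list[str], int]:
    semaphore = asyncio.Semaphore(limit)
    classifier = SketchClassifier(task)
    # gather preserves input order, so flags line up with capabilities.
    flags = await asyncio.gather(
        *(classifier.classify(c, semaphore) for c in capabilities)
    )
    selected = [c for c, keep in zip(capabilities, flags) if keep]
    return selected, classifier.peak


selected, peak = asyncio.run(
    run("plot the weather", ["weather", "search", "plot", "email"], limit=2)
)
```

All coroutines are scheduled at once, but the semaphore lets only `limit` of them proceed past the `async with` at any moment, which keeps the number of simultaneous provider requests bounded.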
Core Models
Classification uses models defined in the core framework:
See also
- CapabilityMatch
Classification results for capability selection
- TaskClassifierGuide
Classification guidance structure
- ClassifierExample
Few-shot examples for classification
- BaseInfrastructureNode
Base class for infrastructure components
Registration
Automatically registered as:
NodeRegistration(
    name="classifier",
    module_path="osprey.infrastructure.classification_node",
    function_name="ClassificationNode",
    description="Task classification and capability selection"
)
See also
- Prompt System
Prompt customization system
- Registry System
Component and capability management
- Classification and Routing
Implementation details and usage patterns