Task Extraction#

Infrastructure node that converts chat conversation history into focused, actionable tasks.

TaskExtractionNode#

class osprey.infrastructure.task_extraction_node.TaskExtractionNode[source]#

Bases: BaseInfrastructureNode

Convention-based task extraction node with sophisticated task processing logic.

Extracts and processes user tasks with context analysis, dependency detection, and task refinement. Handles both initial task extraction and task updates from conversations.

Features:

  • Configuration-driven error classification and retry policies

  • LLM-based task extraction with fallback mechanisms

  • Context-aware task processing

  • Dependency analysis for chat history and user memory

  • Sophisticated error handling for LLM operations

name: str = 'task_extraction'#
description: str = 'Task Extraction and Processing'#
static classify_error(exc, context)[source]#

Built-in error classification for task extraction operations.

Parameters:
  • exc (Exception) – Exception that occurred during task extraction

  • context (dict) – Execution context with task extraction details

Returns:

Error classification for retry decisions

Return type:

ErrorClassification
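
The classification contract can be illustrated with a self-contained sketch. The `ErrorClassification` fields and the exception-to-severity mapping below are assumptions for illustration, not the framework's actual model:

```python
from dataclasses import dataclass

@dataclass
class ErrorClassification:
    # Hypothetical stand-in for the framework's ErrorClassification model;
    # the real osprey type may carry different fields.
    severity: str        # e.g. "retriable" or "fatal"
    user_message: str

def classify_error(exc: Exception, context: dict) -> ErrorClassification:
    """Illustrative policy: transient network errors retry, everything else fails."""
    if isinstance(exc, (TimeoutError, ConnectionError)):
        return ErrorClassification("retriable", "LLM service timeout; retrying")
    return ErrorClassification("fatal", f"Task extraction failed: {exc}")
```

The real node reads its classification rules from configuration rather than hard-coding them as above.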

static get_retry_policy()[source]#

Custom retry policy for LLM-based task extraction operations.

Task extraction uses LLM calls to parse user queries and can be flaky due to:

  • Network timeouts to LLM services

  • LLM provider rate limiting

  • Complex query parsing requirements

Use standard retry attempts with moderate delays since task extraction is the entry point and should be reliable but not overly aggressive.

Return type:

dict[str, Any]

async execute()[source]#

Main task extraction logic with bypass support and error handling.

Converts conversational exchanges into clear, actionable task descriptions. Analyzes native LangGraph messages and external data sources to extract the user’s actual intent and dependencies on previous conversation context.

Supports bypass mode where full chat history is passed directly as the task, skipping LLM-based extraction for performance optimization.

Returns:

Dictionary of state updates to apply

Return type:

Dict[str, Any]
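
The bypass decision described above can be sketched as follows. The state keys (`bypass_task_extraction`, `messages`, `current_task`) and the stubbed LLM call are assumptions for illustration, not the real implementation:

```python
import asyncio

def extract_task_with_llm(messages: list[dict]) -> str:
    # Stub standing in for the real LLM-based extraction step.
    return messages[-1]["content"] if messages else ""

async def execute(state: dict) -> dict:
    messages = state.get("messages", [])
    if state.get("bypass_task_extraction"):
        # Bypass mode: pass the full chat history through as the task,
        # skipping the LLM call entirely.
        task = "\n".join(m["content"] for m in messages)
    else:
        task = extract_task_with_llm(messages)
    return {"current_task": task}
```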

__repr__()#

Return a string representation of the infrastructure node for debugging.

Provides a concise string representation that includes both the Python class name and the infrastructure node’s registered name. This is useful for debugging, logging, and development workflows where infrastructure nodes need to be identified clearly.

Returns:

String representation including class name and node name

Return type:

str

Example

>>> node = TaskExtractionNode()
>>> repr(node)
'<TaskExtractionNode: task_extraction>'

Note

The format follows the pattern ‘<ClassName: node_name>’ for consistency across all framework components.

get_current_step()#

Get current execution step from state.

Returns:

Current step dictionary with capability, task_objective, etc.

Return type:

PlannedStep

Raises:

RuntimeError – If execution plan is missing or step index is invalid

Example

```python
async def execute(self) -> dict[str, Any]:
    step = self.get_current_step()
    task_objective = step.get('task_objective')
```

get_current_step_index()#

Get current step index from state.

Returns:

Current step index (defaults to 0 if not set)

Return type:

int

Example

```python
async def execute(self) -> dict[str, Any]:
    current_index = self.get_current_step_index()
```

get_current_task()#

Get current task from state.

Returns:

Current task string, or None if not set

Return type:

str | None

Example

```python
async def execute(self) -> dict[str, Any]:
    current_task = self.get_current_task()
    if not current_task:
        raise ValueError("No current task available")
```

get_execution_plan()#

Get current execution plan from state with type validation.

Returns:

ExecutionPlan if available and valid, None otherwise

Example

```python
async def execute(self) -> dict[str, Any]:
    execution_plan = self.get_execution_plan()
    if not execution_plan:
        # Route to orchestrator
        ...
```

get_logger()#

Get unified logger with automatic streaming support.

Creates a logger that:

  • Uses this infrastructure node’s name automatically

  • Has access to state for streaming via self._state

  • Streams high-level messages automatically when in LangGraph context

  • Logs to CLI with Rich formatting

Returns:

ComponentLogger instance with streaming capability
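
The name-prefixing behavior can be illustrated with a toy stand-in; the real `ComponentLogger` additionally streams to LangGraph and renders output with Rich:

```python
class ComponentLoggerSketch:
    """Hypothetical minimal logger scoped to a node name (illustration only)."""

    def __init__(self, node_name: str):
        self.node_name = node_name

    def info(self, message: str) -> str:
        # Prefix every message with the owning node's registered name.
        line = f"[{self.node_name}] {message}"
        print(line)
        return line
```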

get_user_query()#

Get the user’s query from the current conversation.

Returns:

The user’s query string, or None if no user messages exist

Return type:

str | None

Example

```python
async def execute(self) -> dict[str, Any]:
    original_query = self.get_user_query()
```

async langgraph_node(**kwargs)#

LangGraph-native node function with manual error handling.

This function is called by LangGraph during execution. Infrastructure nodes now use get_stream_writer() and get_config() directly for pure LangGraph integration.

Parameters:
  • state (AgentState) – Current agent state

  • kwargs – Additional parameters from LangGraph

Returns:

State updates dictionary

Return type:

Dict[str, Any]
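
The manual error-handling pattern can be sketched as below; the stubbed node body and the error state key are assumptions, not the framework's actual code:

```python
import asyncio

async def node_execute(state: dict) -> dict:
    # Stand-in for the node's main extraction logic.
    return {"current_task": "extracted task"}

async def langgraph_node(state: dict, **kwargs) -> dict:
    # Errors are caught manually inside the node rather than delegated to
    # middleware; the real node also pulls a stream writer and config from
    # LangGraph at this point.
    try:
        return await node_execute(state)
    except Exception as exc:
        return {"error_message": str(exc)}
```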

Supporting Functions#

Core Models#

Task extraction uses models defined in the core framework:

See also

ExtractedTask

Structured output model for extracted tasks

BaseInfrastructureNode

Base class for infrastructure components

Registration#

Automatically registered as:

NodeRegistration(
    name="task_extraction",
    module_path="osprey.infrastructure.task_extraction_node",
    function_name="TaskExtractionNode",
    description="Task extraction and processing"
)

See also

Prompt System

Prompt customization system

Registry System

Component registration system

Task Extraction

Implementation details and usage patterns