Message Generation#
Message generation capabilities for responding to user queries and requesting clarification when needed.
Respond Capability#
- class osprey.infrastructure.respond_node.RespondCapability[source]#
Bases:
BaseCapability

Respond to user queries with an appropriate response strategy.
Generates comprehensive responses for both technical queries requiring execution context and conversational queries. Adapts response strategy based on available context and execution history.
Initialize the capability and validate required components.
Performs comprehensive validation of the capability class to ensure all required components are properly defined. This validation happens at initialization time to provide immediate feedback during development rather than waiting for runtime execution failures.
The validation process checks:
1. Required class attributes (name, description) are defined and non-None
2. The execute method is implemented as a static method
3. Optional attributes are properly initialized with defaults if missing
4. The 'requires' field format is valid (strings or (context_type, cardinality) tuples)
- Raises:
NotImplementedError – If name or description class attributes are missing
NotImplementedError – If execute static method is not implemented
ValueError – If requires field contains invalid format
Note
This initialization performs validation only. The actual LangGraph integration happens through the @capability_node decorator.
Warning
Subclasses should not override this method unless they need additional validation. Override _create_orchestrator_guide() or _create_classifier_guide() for customization instead.
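The 'requires' format rule described above can be sketched as a standalone check (a hypothetical illustration of the documented rule, not the framework's actual validation code):

```python
def validate_requires(requires: list) -> None:
    # Each entry must be a context-type string or a
    # (context_type, cardinality) tuple whose cardinality is
    # "single" or "multiple", per the documented format.
    for entry in requires:
        if isinstance(entry, str):
            continue
        if (
            isinstance(entry, tuple)
            and len(entry) == 2
            and isinstance(entry[0], str)
            and entry[1] in ("single", "multiple")
        ):
            continue
        raise ValueError(f"Invalid 'requires' entry: {entry!r}")
```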
- name: str = 'respond'#
- description: str = 'Respond to user queries by generating appropriate responses for both technical and conversational questions'#
- provides: list[str] = ['FINAL_RESPONSE']#
- requires: list[str | tuple[str, Literal['single', 'multiple']]] = []#
- async execute()[source]#
Generate appropriate response using unified dynamic prompt construction.
- Returns:
State update with generated response
- Return type:
Dict[str, Any]
- __init__()#
Initialize the capability and validate required components.
Performs comprehensive validation of the capability class to ensure all required components are properly defined. This validation happens at initialization time to provide immediate feedback during development rather than waiting for runtime execution failures.
The validation process checks:
1. Required class attributes (name, description) are defined and non-None
2. The execute method is implemented as a static method
3. Optional attributes are properly initialized with defaults if missing
4. The 'requires' field format is valid (strings or (context_type, cardinality) tuples)
- Raises:
NotImplementedError – If name or description class attributes are missing
NotImplementedError – If execute static method is not implemented
ValueError – If requires field contains invalid format
Note
This initialization performs validation only. The actual LangGraph integration happens through the @capability_node decorator.
Warning
Subclasses should not override this method unless they need additional validation. Override _create_orchestrator_guide() or _create_classifier_guide() for customization instead.
- __repr__()#
Return a string representation of the capability.
- Returns:
String representation including class name and capability name
- Return type:
str
- property classifier_guide: Any | None#
Get the classifier guide for this capability (lazy-loaded).
Standardized interface used by the framework. Automatically calls _create_classifier_guide() on first access and caches the result.
- Returns:
Classifier guide for capability activation, or None if not needed
- Return type:
Optional[TaskClassifierGuide]
- get_logger()#
Get unified logger with automatic streaming support.
Creates a logger that:
- Uses this capability's name automatically
- Has access to state for streaming via self._state
- Streams high-level messages automatically when in LangGraph context
- Logs to CLI with Rich formatting
The logger intelligently handles both CLI output and web UI streaming through a single API. High-level status updates (status, error, success) automatically stream to the web UI, while detailed logging (info, debug) goes to CLI only by default.
- Returns:
ComponentLogger instance with streaming capability
Example
```python
async def execute(self) -> dict[str, Any]:
    logger = self.get_logger()

    # High-level status - logs + streams automatically
    logger.status("Creating execution plan...")

    # Detailed info - logs only (unless explicitly requested)
    logger.info(f"Active capabilities: {capabilities}")

    # Explicit streaming for specific info
    logger.info("Step 1 of 5 complete", stream=True, progress=0.2)

    # Errors always stream
    logger.error("Validation failed", validation_errors=[...])

    # Success with metadata
    logger.success("Plan created", steps=5, total_time=2.3)

    return self.store_output_context(result)
```
Note
The logger uses lazy initialization for streaming, so it gracefully handles contexts where LangGraph streaming is not available (tests, utilities, CLI-only execution).
See also
ComponentLogger: Logger class with streaming methods
get_logger(): Underlying logger factory function
- get_parameters(default=None)#
Get parameters from the current step.
The orchestrator can provide optional parameters in the step definition that control capability behavior (e.g., precision, timeout, mode).
- Parameters:
default (dict[str, Any] | None) – Default value to return if no parameters exist (defaults to empty dict)
- Returns:
Parameters dictionary from the step
- Raises:
RuntimeError – If called outside execute() (state not injected)
- Return type:
dict[str, Any]
Example
```python
async def execute(self) -> dict[str, Any]:
    params = self.get_parameters()
    precision = params.get("precision", "default")
```
- get_required_contexts(constraint_mode='hard')#
Automatically extract contexts based on ‘requires’ field.
The constraint_mode applies uniformly to ALL requirements. Use “hard” when all are required, “soft” when at least one is required.
Tuple format is ONLY for cardinality constraints:
- "single": Must be exactly one instance (not a list)
- "multiple": Must be a list (not a single instance)
- Parameters:
constraint_mode (Literal['hard', 'soft']) – “hard” (all required) or “soft” (at least one required)
- Returns:
RequiredContexts object supporting both dict and tuple unpacking access
- Raises:
RuntimeError – If called outside execute() (state not injected)
ValueError – If required contexts missing or cardinality violated
AttributeError – If context type not found in registry
- Return type:
RequiredContexts
Example
```python
# Define requirements
requires = ["CHANNEL_ADDRESSES", ("TIME_RANGE", "single")]

# Elegant tuple unpacking (matches order in requires)
channels, time_range = self.get_required_contexts()

# Traditional dict access (backward compatible)
contexts = self.get_required_contexts()
channels = contexts["CHANNEL_ADDRESSES"]
time_range = contexts["TIME_RANGE"]
```
Note
Tuple unpacking only works reliably with constraint_mode="hard" (default). When using "soft" mode, use dict access instead since the number of returned contexts may vary:

```python
contexts = self.get_required_contexts(constraint_mode="soft")
a = contexts.get("CONTEXT_A")
b = contexts.get("CONTEXT_B")
```
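A container supporting both dict access and ordered tuple unpacking can be sketched like this (a hypothetical stand-in; the framework's actual RequiredContexts implementation may differ):

```python
class RequiredContextsSketch:
    """Dict-style access plus ordered tuple unpacking (illustrative only)."""

    def __init__(self, contexts: dict):
        # Insertion order mirrors the 'requires' declaration order.
        self._contexts = contexts

    def __getitem__(self, key):
        return self._contexts[key]

    def get(self, key, default=None):
        return self._contexts.get(key, default)

    def __iter__(self):
        # Iterating over values enables: a, b = required_contexts
        return iter(self._contexts.values())
```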
- static get_retry_policy()#
Get retry policy configuration for failure recovery strategies.
This method provides retry configuration that the framework uses for manual retry handling when capabilities fail with RETRIABLE errors. The default policy provides reasonable defaults for most capabilities, but should be overridden for capabilities with specific timing or retry requirements.
The retry policy controls:
- Maximum number of retry attempts before giving up
- Initial delay between retry attempts
- Backoff factor for exponential delay increase
- Returns:
Dictionary containing retry configuration parameters
- Return type:
Dict[str, Any]
Note
The framework uses manual retry handling rather than LangGraph’s native retry policies to ensure consistent behavior across all components and to enable sophisticated error classification.
Examples
Aggressive retry for network-dependent capability:
```python
@staticmethod
def get_retry_policy() -> Dict[str, Any]:
    return {
        "max_attempts": 5,      # More attempts for network issues
        "delay_seconds": 2.0,   # Longer delay for external services
        "backoff_factor": 2.0,  # Exponential backoff
    }
```
Conservative retry for expensive operations:
```python
@staticmethod
def get_retry_policy() -> Dict[str, Any]:
    return {
        "max_attempts": 2,      # Minimal retries for expensive ops
        "delay_seconds": 0.1,   # Quick retry for transient issues
        "backoff_factor": 1.0,  # No backoff for fast operations
    }
```
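Assuming each retry delay grows as delay_seconds multiplied by backoff_factor per prior attempt (an assumption for illustration; the exact schedule is framework-defined), the resulting delays can be previewed:

```python
def retry_delays(max_attempts: int, delay_seconds: float, backoff_factor: float) -> list[float]:
    # Delay before each retry; one fewer delay than attempts.
    # Assumes exponential growth: delay * factor**n (illustrative).
    return [delay_seconds * backoff_factor**n for n in range(max_attempts - 1)]
```

For example, the "aggressive" policy above would wait 2, 4, 8, then 16 seconds between its five attempts.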
See also
classify_error(): Error classification that determines when to retry
ErrorSeverity: RETRIABLE severity triggers retry policy usage
- get_step_inputs(default=None)#
Get the inputs list from the current step.
The orchestrator provides inputs in each step as a list of {context_type: context_key} mappings that specify which contexts are available for this step. This is commonly used for building context descriptions, validation, and informing the LLM about available data.
- Parameters:
default (list[dict[str, str]] | None) – Default value to return if no inputs exist (defaults to empty list)
- Returns:
List of input mappings from the step
- Raises:
RuntimeError – If called outside execute() (state not injected)
- Return type:
list[dict[str, str]]
Example
```python
async def execute(self) -> dict[str, Any]:
    # Get step inputs
    step_inputs = self.get_step_inputs()

    # Use with ContextManager to build description
    from osprey.context import ContextManager
    context_manager = ContextManager(self._state)
    context_description = context_manager.get_context_access_description(step_inputs)

    # Or with a custom default
    step_inputs = self.get_step_inputs(default=[])
```
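Because step inputs are plain {context_type: context_key} mappings, a small helper can summarize them for logging (a hypothetical helper shown for illustration, not a framework API):

```python
def summarize_step_inputs(step_inputs: list[dict[str, str]]) -> str:
    # Flatten the [{context_type: context_key}, ...] mappings
    # into a readable "TYPE -> key" listing.
    pairs = [(ctype, ckey) for mapping in step_inputs for ctype, ckey in mapping.items()]
    return ", ".join(f"{ctype} -> {ckey}" for ctype, ckey in pairs)
```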
- get_task_objective(default=None)#
Get the task objective for the current step.
The orchestrator provides task_objective in each step to describe what the capability should accomplish. This is commonly used for logging, search queries, and LLM prompts.
- Parameters:
default (str | None) – Default value if task_objective not in step. If None, falls back to current task from state.
- Returns:
Task objective string
- Raises:
RuntimeError – If called outside execute() (state not injected)
- Return type:
str
Example
```python
async def execute(self) -> dict[str, Any]:
    objective = self.get_task_objective()
```
- async langgraph_node(**kwargs)#
LangGraph-native node function with manual retry handling via router.
- Return type:
dict[str, Any]
- property orchestrator_guide: Any | None#
Get the orchestrator guide for this capability (lazy-loaded).
Standardized interface used by the framework. Automatically calls _create_orchestrator_guide() on first access and caches the result.
- Returns:
Orchestrator guide for execution planning integration, or None if not needed
- Return type:
Optional[OrchestratorGuide]
- process_extracted_contexts(contexts)#
Override to customize extracted contexts (e.g., flatten lists).
- Parameters:
contexts (dict[str, CapabilityContext | list[CapabilityContext]]) – Dict mapping context type names to extracted objects
- Returns:
Processed contexts dict
- Return type:
dict[str, CapabilityContext | list[CapabilityContext]]
Example
```python
def process_extracted_contexts(self, contexts):
    '''Flatten list of CHANNEL_ADDRESSES.'''
    channels_raw = contexts["CHANNEL_ADDRESSES"]
    if isinstance(channels_raw, list):
        flat = []
        for ctx in channels_raw:
            flat.extend(ctx.channels)
        contexts["CHANNEL_ADDRESSES"] = flat
    else:
        contexts["CHANNEL_ADDRESSES"] = channels_raw.channels
    return contexts
```
- store_output_context(context_data)#
Store single output context - uses context’s CONTEXT_TYPE attribute.
No need for provides field or state/step parameters!
- Parameters:
context_data (CapabilityContext) – Context object with CONTEXT_TYPE class variable
- Returns:
State updates dict for LangGraph to merge
- Raises:
AttributeError – If context_data lacks CONTEXT_TYPE class variable
RuntimeError – If called outside execute() (state not injected)
ValueError – If context_key missing from step
- Return type:
dict[str, Any]
Example
```python
return self.store_output_context(ArchiverDataContext(...))
```
- store_output_contexts(*context_objects)#
Store multiple output contexts - all self-describing.
- Parameters:
*context_objects (CapabilityContext) – Context objects with CONTEXT_TYPE attributes
- Returns:
Merged state updates dict for LangGraph
- Raises:
AttributeError – If any context lacks CONTEXT_TYPE
RuntimeError – If called outside execute()
ValueError – If context types don’t match provides field
- Return type:
dict[str, Any]
Example
```python
return self.store_output_contexts(
    ArchiverDataContext(...),
    MetadataContext(...),
    StatisticsContext(...),
)
```
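Merging multiple context updates amounts to a shallow dict merge of the per-context state updates (an illustrative sketch; the framework's merge semantics may differ):

```python
def merge_state_updates(*updates: dict) -> dict:
    # Later updates win on key collisions (shallow merge).
    merged: dict = {}
    for update in updates:
        merged.update(update)
    return merged
```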
- class osprey.infrastructure.respond_node.ResponseContext(current_task, execution_history, relevant_context, is_killed, kill_reason, capabilities_overview, total_steps_executed, execution_start_time, reclassification_count, current_date, figures_available, commands_available, notebooks_available, interface_context, chat_history=None)[source]#
Bases:
object

Container for all information needed for response generation.
Aggregates all relevant information from the agent state for comprehensive response generation.
- Parameters:
current_task (str) – The current task being addressed
execution_history (List[Any]) – List of executed steps
relevant_context (List[Dict[str, Any]]) – Context data relevant to the response (list of summary dicts)
is_killed (bool) – Whether execution was terminated
kill_reason (Optional[str]) – Reason for termination if applicable
capabilities_overview (Optional[str]) – Overview of available capabilities
total_steps_executed (int) – Total number of steps executed
execution_start_time (Optional[float]) – When execution started
reclassification_count (int) – Number of reclassification attempts
current_date (str) – Current date for temporal context
figures_available (int) – Number of figures available for display
commands_available (int) – Number of launchable commands available
notebooks_available (int) – Number of notebook links available
interface_context (str) – Interface type (openwebui, cli, etc.)
chat_history (Optional[str]) – Formatted chat history when task depends on conversation context
- current_task: str#
- execution_history: list[Any]#
- relevant_context: list[dict[str, Any]]#
- is_killed: bool#
- kill_reason: str | None#
- capabilities_overview: str | None#
- total_steps_executed: int#
- execution_start_time: float | None#
- reclassification_count: int#
- current_date: str#
- figures_available: int#
- commands_available: int#
- notebooks_available: int#
- interface_context: str#
- chat_history: str | None = None#
- __init__(current_task, execution_history, relevant_context, is_killed, kill_reason, capabilities_overview, total_steps_executed, execution_start_time, reclassification_count, current_date, figures_available, commands_available, notebooks_available, interface_context, chat_history=None)#
Clarify Capability#
- class osprey.infrastructure.clarify_node.ClarifyCapability[source]#
Bases:
BaseCapability

Ask the user for clarification when queries are ambiguous.
Communication capability that generates targeted questions to clarify user intent when requests lack sufficient detail or context.
Initialize the capability and validate required components.
Performs comprehensive validation of the capability class to ensure all required components are properly defined. This validation happens at initialization time to provide immediate feedback during development rather than waiting for runtime execution failures.
The validation process checks:
1. Required class attributes (name, description) are defined and non-None
2. The execute method is implemented as a static method
3. Optional attributes are properly initialized with defaults if missing
4. The 'requires' field format is valid (strings or (context_type, cardinality) tuples)
- Raises:
NotImplementedError – If name or description class attributes are missing
NotImplementedError – If execute static method is not implemented
ValueError – If requires field contains invalid format
Note
This initialization performs validation only. The actual LangGraph integration happens through the @capability_node decorator.
Warning
Subclasses should not override this method unless they need additional validation. Override _create_orchestrator_guide() or _create_classifier_guide() for customization instead.
- name: str = 'clarify'#
- description: str = 'Ask specific questions when user queries are ambiguous or missing critical details'#
- provides: list[str] = []#
- requires: list[str | tuple[str, Literal['single', 'multiple']]] = []#
- async execute()[source]#
Generate specific questions to ask user based on missing information.
- Returns:
State update with clarification response
- Return type:
Dict[str, Any]
- __init__()#
Initialize the capability and validate required components.
Performs comprehensive validation of the capability class to ensure all required components are properly defined. This validation happens at initialization time to provide immediate feedback during development rather than waiting for runtime execution failures.
The validation process checks:
1. Required class attributes (name, description) are defined and non-None
2. The execute method is implemented as a static method
3. Optional attributes are properly initialized with defaults if missing
4. The 'requires' field format is valid (strings or (context_type, cardinality) tuples)
- Raises:
NotImplementedError – If name or description class attributes are missing
NotImplementedError – If execute static method is not implemented
ValueError – If requires field contains invalid format
Note
This initialization performs validation only. The actual LangGraph integration happens through the @capability_node decorator.
Warning
Subclasses should not override this method unless they need additional validation. Override _create_orchestrator_guide() or _create_classifier_guide() for customization instead.
- __repr__()#
Return a string representation of the capability.
- Returns:
String representation including class name and capability name
- Return type:
str
- property classifier_guide: Any | None#
Get the classifier guide for this capability (lazy-loaded).
Standardized interface used by the framework. Automatically calls _create_classifier_guide() on first access and caches the result.
- Returns:
Classifier guide for capability activation, or None if not needed
- Return type:
Optional[TaskClassifierGuide]
- get_logger()#
Get unified logger with automatic streaming support.
Creates a logger that:
- Uses this capability's name automatically
- Has access to state for streaming via self._state
- Streams high-level messages automatically when in LangGraph context
- Logs to CLI with Rich formatting
The logger intelligently handles both CLI output and web UI streaming through a single API. High-level status updates (status, error, success) automatically stream to the web UI, while detailed logging (info, debug) goes to CLI only by default.
- Returns:
ComponentLogger instance with streaming capability
Example
```python
async def execute(self) -> dict[str, Any]:
    logger = self.get_logger()

    # High-level status - logs + streams automatically
    logger.status("Creating execution plan...")

    # Detailed info - logs only (unless explicitly requested)
    logger.info(f"Active capabilities: {capabilities}")

    # Explicit streaming for specific info
    logger.info("Step 1 of 5 complete", stream=True, progress=0.2)

    # Errors always stream
    logger.error("Validation failed", validation_errors=[...])

    # Success with metadata
    logger.success("Plan created", steps=5, total_time=2.3)

    return self.store_output_context(result)
```
Note
The logger uses lazy initialization for streaming, so it gracefully handles contexts where LangGraph streaming is not available (tests, utilities, CLI-only execution).
See also
ComponentLogger: Logger class with streaming methods
get_logger(): Underlying logger factory function
- get_parameters(default=None)#
Get parameters from the current step.
The orchestrator can provide optional parameters in the step definition that control capability behavior (e.g., precision, timeout, mode).
- Parameters:
default (dict[str, Any] | None) – Default value to return if no parameters exist (defaults to empty dict)
- Returns:
Parameters dictionary from the step
- Raises:
RuntimeError – If called outside execute() (state not injected)
- Return type:
dict[str, Any]
Example
```python
async def execute(self) -> dict[str, Any]:
    params = self.get_parameters()
    precision = params.get("precision", "default")
```
- get_required_contexts(constraint_mode='hard')#
Automatically extract contexts based on ‘requires’ field.
The constraint_mode applies uniformly to ALL requirements. Use “hard” when all are required, “soft” when at least one is required.
Tuple format is ONLY for cardinality constraints:
- "single": Must be exactly one instance (not a list)
- "multiple": Must be a list (not a single instance)
- Parameters:
constraint_mode (Literal['hard', 'soft']) – “hard” (all required) or “soft” (at least one required)
- Returns:
RequiredContexts object supporting both dict and tuple unpacking access
- Raises:
RuntimeError – If called outside execute() (state not injected)
ValueError – If required contexts missing or cardinality violated
AttributeError – If context type not found in registry
- Return type:
RequiredContexts
Example
```python
# Define requirements
requires = ["CHANNEL_ADDRESSES", ("TIME_RANGE", "single")]

# Elegant tuple unpacking (matches order in requires)
channels, time_range = self.get_required_contexts()

# Traditional dict access (backward compatible)
contexts = self.get_required_contexts()
channels = contexts["CHANNEL_ADDRESSES"]
time_range = contexts["TIME_RANGE"]
```
Note
Tuple unpacking only works reliably with constraint_mode="hard" (default). When using "soft" mode, use dict access instead since the number of returned contexts may vary:

```python
contexts = self.get_required_contexts(constraint_mode="soft")
a = contexts.get("CONTEXT_A")
b = contexts.get("CONTEXT_B")
```
- static get_retry_policy()#
Get retry policy configuration for failure recovery strategies.
This method provides retry configuration that the framework uses for manual retry handling when capabilities fail with RETRIABLE errors. The default policy provides reasonable defaults for most capabilities, but should be overridden for capabilities with specific timing or retry requirements.
The retry policy controls:
- Maximum number of retry attempts before giving up
- Initial delay between retry attempts
- Backoff factor for exponential delay increase
- Returns:
Dictionary containing retry configuration parameters
- Return type:
Dict[str, Any]
Note
The framework uses manual retry handling rather than LangGraph’s native retry policies to ensure consistent behavior across all components and to enable sophisticated error classification.
Examples
Aggressive retry for network-dependent capability:
```python
@staticmethod
def get_retry_policy() -> Dict[str, Any]:
    return {
        "max_attempts": 5,      # More attempts for network issues
        "delay_seconds": 2.0,   # Longer delay for external services
        "backoff_factor": 2.0,  # Exponential backoff
    }
```
Conservative retry for expensive operations:
```python
@staticmethod
def get_retry_policy() -> Dict[str, Any]:
    return {
        "max_attempts": 2,      # Minimal retries for expensive ops
        "delay_seconds": 0.1,   # Quick retry for transient issues
        "backoff_factor": 1.0,  # No backoff for fast operations
    }
```
See also
classify_error(): Error classification that determines when to retry
ErrorSeverity: RETRIABLE severity triggers retry policy usage
- get_step_inputs(default=None)#
Get the inputs list from the current step.
The orchestrator provides inputs in each step as a list of {context_type: context_key} mappings that specify which contexts are available for this step. This is commonly used for building context descriptions, validation, and informing the LLM about available data.
- Parameters:
default (list[dict[str, str]] | None) – Default value to return if no inputs exist (defaults to empty list)
- Returns:
List of input mappings from the step
- Raises:
RuntimeError – If called outside execute() (state not injected)
- Return type:
list[dict[str, str]]
Example
```python
async def execute(self) -> dict[str, Any]:
    # Get step inputs
    step_inputs = self.get_step_inputs()

    # Use with ContextManager to build description
    from osprey.context import ContextManager
    context_manager = ContextManager(self._state)
    context_description = context_manager.get_context_access_description(step_inputs)

    # Or with a custom default
    step_inputs = self.get_step_inputs(default=[])
```
- get_task_objective(default=None)#
Get the task objective for the current step.
The orchestrator provides task_objective in each step to describe what the capability should accomplish. This is commonly used for logging, search queries, and LLM prompts.
- Parameters:
default (str | None) – Default value if task_objective not in step. If None, falls back to current task from state.
- Returns:
Task objective string
- Raises:
RuntimeError – If called outside execute() (state not injected)
- Return type:
str
Example
```python
async def execute(self) -> dict[str, Any]:
    objective = self.get_task_objective()
```
- async langgraph_node(**kwargs)#
LangGraph-native node function with manual retry handling via router.
- Return type:
dict[str, Any]
- property orchestrator_guide: Any | None#
Get the orchestrator guide for this capability (lazy-loaded).
Standardized interface used by the framework. Automatically calls _create_orchestrator_guide() on first access and caches the result.
- Returns:
Orchestrator guide for execution planning integration, or None if not needed
- Return type:
Optional[OrchestratorGuide]
- process_extracted_contexts(contexts)#
Override to customize extracted contexts (e.g., flatten lists).
- Parameters:
contexts (dict[str, CapabilityContext | list[CapabilityContext]]) – Dict mapping context type names to extracted objects
- Returns:
Processed contexts dict
- Return type:
dict[str, CapabilityContext | list[CapabilityContext]]
Example
```python
def process_extracted_contexts(self, contexts):
    '''Flatten list of CHANNEL_ADDRESSES.'''
    channels_raw = contexts["CHANNEL_ADDRESSES"]
    if isinstance(channels_raw, list):
        flat = []
        for ctx in channels_raw:
            flat.extend(ctx.channels)
        contexts["CHANNEL_ADDRESSES"] = flat
    else:
        contexts["CHANNEL_ADDRESSES"] = channels_raw.channels
    return contexts
```
- store_output_context(context_data)#
Store single output context - uses context’s CONTEXT_TYPE attribute.
No need for provides field or state/step parameters!
- Parameters:
context_data (CapabilityContext) – Context object with CONTEXT_TYPE class variable
- Returns:
State updates dict for LangGraph to merge
- Raises:
AttributeError – If context_data lacks CONTEXT_TYPE class variable
RuntimeError – If called outside execute() (state not injected)
ValueError – If context_key missing from step
- Return type:
dict[str, Any]
Example
```python
return self.store_output_context(ArchiverDataContext(...))
```
- store_output_contexts(*context_objects)#
Store multiple output contexts - all self-describing.
- Parameters:
*context_objects (CapabilityContext) – Context objects with CONTEXT_TYPE attributes
- Returns:
Merged state updates dict for LangGraph
- Raises:
AttributeError – If any context lacks CONTEXT_TYPE
RuntimeError – If called outside execute()
ValueError – If context types don’t match provides field
- Return type:
dict[str, Any]
Example
```python
return self.store_output_contexts(
    ArchiverDataContext(...),
    MetadataContext(...),
    StatisticsContext(...),
)
```
Core Models#
Message generation uses models defined in the core framework:
See also
BaseCapability: Base class for all capabilities
ContextManager: Context management for response generation
Registration#
Respond Capability is automatically registered as:
CapabilityRegistration(
name="respond",
module_path="osprey.infrastructure.respond_node",
class_name="RespondCapability",
provides=["FINAL_RESPONSE"]
)
Clarify Capability is automatically registered as:
CapabilityRegistration(
name="clarify",
module_path="osprey.infrastructure.clarify_node",
class_name="ClarifyCapability",
provides=[]
)
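For illustration, a registration record of this shape could be modeled and resolved with a dataclass plus importlib (a hypothetical stand-in; the real CapabilityRegistration and its loading mechanism are defined by the framework):

```python
import importlib
from dataclasses import dataclass, field


@dataclass
class CapabilityRegistrationSketch:
    name: str
    module_path: str
    class_name: str
    provides: list[str] = field(default_factory=list)

    def load(self) -> type:
        # Resolve the capability class from its dotted module path.
        module = importlib.import_module(self.module_path)
        return getattr(module, self.class_name)
```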
See also
- Prompt System
Prompt customization system
- Registry System
Component registration system
- Message Generation
Implementation details and usage patterns