Message Generation#

Message generation capabilities for responding to user queries and requesting clarification when needed.

Respond Capability#

class osprey.infrastructure.respond_node.RespondCapability[source]#

Bases: BaseCapability

Respond to user queries with appropriate response strategy.

Generates comprehensive responses for both technical queries requiring execution context and conversational queries. Adapts response strategy based on available context and execution history.

Initialize the capability and validate required components.

Performs comprehensive validation of the capability class to ensure all required components are properly defined. This validation happens at initialization time to provide immediate feedback during development rather than waiting for runtime execution failures.

The validation process checks: 1. Required class attributes (name, description) are defined and non-None 2. The execute method is implemented as a static method 3. Optional attributes are properly initialized with defaults if missing 4. The ‘requires’ field format is valid (strings or (context_type, cardinality) tuples)

Raises:
  • NotImplementedError – If name or description class attributes are missing

  • NotImplementedError – If execute static method is not implemented

  • ValueError – If requires field contains invalid format

Note

This initialization performs validation only. The actual LangGraph integration happens through the @capability_node decorator.

Warning

Subclasses should not override this method unless they need additional validation. Override _create_orchestrator_guide() or _create_classifier_guide() for customization instead.

name: str = 'respond'#
description: str = 'Respond to user queries by generating appropriate responses for both technical and conversational questions'#
provides: list[str] = ['FINAL_RESPONSE']#
requires: list[str | tuple[str, Literal['single', 'multiple']]] = []#
async execute()[source]#

Generate appropriate response using unified dynamic prompt construction.

Returns:

State update with generated response

Return type:

Dict[str, Any]

static classify_error(exc, context)[source]#

Response generation error classification.

__init__()#

Initialize the capability and validate required components.

Performs comprehensive validation of the capability class to ensure all required components are properly defined. This validation happens at initialization time to provide immediate feedback during development rather than waiting for runtime execution failures.

The validation process checks: 1. Required class attributes (name, description) are defined and non-None 2. The execute method is implemented as a static method 3. Optional attributes are properly initialized with defaults if missing 4. The ‘requires’ field format is valid (strings or (context_type, cardinality) tuples)

Raises:
  • NotImplementedError – If name or description class attributes are missing

  • NotImplementedError – If execute static method is not implemented

  • ValueError – If requires field contains invalid format

Note

This initialization performs validation only. The actual LangGraph integration happens through the @capability_node decorator.

Warning

Subclasses should not override this method unless they need additional validation. Override _create_orchestrator_guide() or _create_classifier_guide() for customization instead.

__repr__()#

Return a string representation of the capability.

Returns:

String representation including class name and capability name

Return type:

str

property classifier_guide: Any | None#

Get the classifier guide for this capability (lazy-loaded).

Standardized interface used by the framework. Automatically calls _create_classifier_guide() on first access and caches the result.

Returns:

Classifier guide for capability activation, or None if not needed

Return type:

Optional[TaskClassifierGuide]

get_logger()#

Get unified logger with automatic streaming support.

Creates a logger that: - Uses this capability’s name automatically - Has access to state for streaming via self._state - Streams high-level messages automatically when in LangGraph context - Logs to CLI with Rich formatting

The logger intelligently handles both CLI output and web UI streaming through a single API. High-level status updates (status, error, success) automatically stream to the web UI, while detailed logging (info, debug) goes to CLI only by default.

Returns:

ComponentLogger instance with streaming capability

Example

```python async def execute(self) -> dict[str, Any]:

logger = self.get_logger()

# High-level status - logs + streams automatically logger.status(“Creating execution plan…”)

# Detailed info - logs only (unless explicitly requested) logger.info(f”Active capabilities: {capabilities}”)

# Explicit streaming for specific info logger.info(“Step 1 of 5 complete”, stream=True, progress=0.2)

# Errors always stream logger.error(“Validation failed”, validation_errors=[…])

# Success with metadata logger.success(“Plan created”, steps=5, total_time=2.3)

return self.store_output_context(result)

```

Note

The logger uses lazy initialization for streaming, so it gracefully handles contexts where LangGraph streaming is not available (tests, utilities, CLI-only execution).

See also

ComponentLogger : Logger class with streaming methods get_logger() : Underlying logger factory function

get_parameters(default=None)#

Get parameters from the current step.

The orchestrator can provide optional parameters in the step definition that control capability behavior (e.g., precision, timeout, mode).

Parameters:

default (dict[str, Any] | None) – Default value to return if no parameters exist (defaults to empty dict)

Returns:

Parameters dictionary from the step

Raises:

RuntimeError – If called outside execute() (state not injected)

Return type:

dict[str, Any]

Example

```python async def execute(self) -> dict[str, Any]:

params = self.get_parameters() precision_ms = params.get(‘precision_ms’, 1000) timeout = params.get(‘timeout’, 30)

# Or with a custom default params = self.get_parameters(default={‘precision_ms’: 1000}) ```

get_required_contexts(constraint_mode='hard')#

Automatically extract contexts based on ‘requires’ field.

The constraint_mode applies uniformly to ALL requirements. Use “hard” when all are required, “soft” when at least one is required.

Tuple format is ONLY for cardinality constraints: - “single”: Must be exactly one instance (not a list) - “multiple”: Must be a list (not single instance)

Parameters:

constraint_mode (Literal['hard', 'soft']) – “hard” (all required) or “soft” (at least one required)

Returns:

RequiredContexts object supporting both dict and tuple unpacking access

Raises:
  • RuntimeError – If called outside execute() (state not injected)

  • ValueError – If required contexts missing or cardinality violated

  • AttributeError – If context type not found in registry

Return type:

RequiredContexts

Example

```python # Define requirements requires = [“CHANNEL_ADDRESSES”, (“TIME_RANGE”, “single”)]

# Elegant tuple unpacking (matches order in requires) channels, time_range = self.get_required_contexts()

# Traditional dict access (backward compatible) contexts = self.get_required_contexts() channels = contexts[“CHANNEL_ADDRESSES”] time_range = contexts[“TIME_RANGE”] ```

Note

Tuple unpacking only works reliably with constraint_mode=”hard” (default). When using “soft” mode, use dict access instead since the number of returned contexts may vary:

contexts = self.get_required_contexts(constraint_mode=”soft”) a = contexts.get(“CONTEXT_A”) b = contexts.get(“CONTEXT_B”)

static get_retry_policy()#

Get retry policy configuration for failure recovery strategies.

This method provides retry configuration that the framework uses for manual retry handling when capabilities fail with RETRIABLE errors. The default policy provides reasonable defaults for most capabilities, but should be overridden for capabilities with specific timing or retry requirements.

The retry policy controls: - Maximum number of retry attempts before giving up - Initial delay between retry attempts - Backoff factor for exponential delay increase

Returns:

Dictionary containing retry configuration parameters

Return type:

Dict[str, Any]

Note

The framework uses manual retry handling rather than LangGraph’s native retry policies to ensure consistent behavior across all components and to enable sophisticated error classification.

Examples

Aggressive retry for network-dependent capability:

@staticmethod
def get_retry_policy() -> Dict[str, Any]:
    return {
        "max_attempts": 5,      # More attempts for network issues
        "delay_seconds": 2.0,   # Longer delay for external services
        "backoff_factor": 2.0   # Exponential backoff
    }

Conservative retry for expensive operations:

@staticmethod
def get_retry_policy() -> Dict[str, Any]:
    return {
        "max_attempts": 2,      # Minimal retries for expensive ops
        "delay_seconds": 0.1,   # Quick retry for transient issues
        "backoff_factor": 1.0   # No backoff for fast operations
    }

See also

classify_error() : Error classification that determines when to retry ErrorSeverity : RETRIABLE severity triggers retry policy usage

get_step_inputs(default=None)#

Get the inputs list from the current step.

The orchestrator provides inputs in each step as a list of {context_type: context_key} mappings that specify which contexts are available for this step. This is commonly used for building context descriptions, validation, and informing the LLM about available data.

Parameters:

default (list[dict[str, str]] | None) – Default value to return if no inputs exist (defaults to empty list)

Returns:

List of input mappings from the step

Raises:

RuntimeError – If called outside execute() (state not injected)

Return type:

list[dict[str, str]]

Example

```python async def execute(self) -> dict[str, Any]:

# Get step inputs step_inputs = self.get_step_inputs()

# Use with ContextManager to build description from osprey.context import ContextManager context_manager = ContextManager(self._state) context_description = context_manager.get_context_access_description(step_inputs)

# Or with a custom default step_inputs = self.get_step_inputs(default=[]) ```

get_task_objective(default=None)#

Get the task objective for the current step.

The orchestrator provides task_objective in each step to describe what the capability should accomplish. This is commonly used for logging, search queries, and LLM prompts.

Parameters:

default (str | None) – Default value if task_objective not in step. If None, falls back to current task from state.

Returns:

Task objective string

Raises:

RuntimeError – If called outside execute() (state not injected)

Return type:

str

Example

```python async def execute(self) -> dict[str, Any]:

# Get task objective with automatic fallback task = self.get_task_objective() logger.info(f”Starting: {task}”)

# Or with custom default task = self.get_task_objective(default=”unknown task”)

# Common pattern: use as search query search_query = self.get_task_objective().lower() ```

async langgraph_node(**kwargs)#

LangGraph-native node function with manual retry handling via router.

Return type:

dict[str, Any]

property orchestrator_guide: Any | None#

Get the orchestrator guide for this capability (lazy-loaded).

Standardized interface used by the framework. Automatically calls _create_orchestrator_guide() on first access and caches the result.

Returns:

Orchestrator guide for execution planning integration, or None if not needed

Return type:

Optional[OrchestratorGuide]

process_extracted_contexts(contexts)#

Override to customize extracted contexts (e.g., flatten lists).

Parameters:

contexts (dict[str, CapabilityContext | list[CapabilityContext]]) – Dict mapping context type names to extracted objects

Returns:

Processed contexts dict

Return type:

dict[str, CapabilityContext | list[CapabilityContext]]

Example

```python def process_extracted_contexts(self, contexts):

‘’’Flatten list of CHANNEL_ADDRESSES.’’’ channels_raw = contexts[“CHANNEL_ADDRESSES”]

if isinstance(channels_raw, list):

flat = [] for ctx in channels_raw:

flat.extend(ctx.channels)

contexts[“CHANNEL_ADDRESSES”] = flat

else:

contexts[“CHANNEL_ADDRESSES”] = channels_raw.channels

return contexts

```

store_output_context(context_data)#

Store single output context - uses context’s CONTEXT_TYPE attribute.

No need for provides field or state/step parameters!

Parameters:

context_data (CapabilityContext) – Context object with CONTEXT_TYPE class variable

Returns:

State updates dict for LangGraph to merge

Raises:
  • AttributeError – If context_data lacks CONTEXT_TYPE class variable

  • RuntimeError – If called outside execute() (state not injected)

  • ValueError – If context_key missing from step

Return type:

dict[str, Any]

Example

`python return self.store_output_context(ArchiverDataContext(...)) `

store_output_contexts(*context_objects)#

Store multiple output contexts - all self-describing.

Parameters:

*context_objects (CapabilityContext) – Context objects with CONTEXT_TYPE attributes

Returns:

Merged state updates dict for LangGraph

Raises:
  • AttributeError – If any context lacks CONTEXT_TYPE

  • RuntimeError – If called outside execute()

  • ValueError – If context types don’t match provides field

Return type:

dict[str, Any]

Example

```python return self.store_output_contexts(

ArchiverDataContext(…), MetadataContext(…), StatisticsContext(…)

)#

class osprey.infrastructure.respond_node.ResponseContext(current_task, execution_history, relevant_context, is_killed, kill_reason, capabilities_overview, total_steps_executed, execution_start_time, reclassification_count, current_date, figures_available, commands_available, notebooks_available, interface_context, chat_history=None)[source]#

Bases: object

Container for all information needed for response generation.

Aggregates all relevant information from the agent state for comprehensive response generation.

Parameters:
  • current_task (str) – The current task being addressed

  • execution_history (List[Any]) – List of executed steps

  • relevant_context (List[Dict[str, Any]]) – Context data relevant to the response (list of summary dicts)

  • is_killed (bool) – Whether execution was terminated

  • kill_reason (Optional[str]) – Reason for termination if applicable

  • capabilities_overview (Optional[str]) – Overview of available capabilities

  • total_steps_executed (int) – Total number of steps executed

  • execution_start_time (Optional[float]) – When execution started

  • reclassification_count (int) – Number of reclassification attempts

  • current_date (str) – Current date for temporal context

  • figures_available (int) – Number of figures available for display

  • commands_available (int) – Number of launchable commands available

  • notebooks_available (int) – Number of notebook links available

  • interface_context (str) – Interface type (openwebui, cli, etc.)

  • chat_history (Optional[str]) – Formatted chat history when task depends on conversation context

current_task: str#
execution_history: list[Any]#
relevant_context: list[dict[str, Any]]#
is_killed: bool#
kill_reason: str | None#
capabilities_overview: str | None#
total_steps_executed: int#
execution_start_time: float | None#
reclassification_count: int#
current_date: str#
figures_available: int#
commands_available: int#
notebooks_available: int#
interface_context: str#
chat_history: str | None = None#
__init__(current_task, execution_history, relevant_context, is_killed, kill_reason, capabilities_overview, total_steps_executed, execution_start_time, reclassification_count, current_date, figures_available, commands_available, notebooks_available, interface_context, chat_history=None)#

Clarify Capability#

class osprey.infrastructure.clarify_node.ClarifyCapability[source]#

Bases: BaseCapability

Ask user for clarification when queries are ambiguous.

Communication capability that generates targeted questions to clarify user intent when requests lack sufficient detail or context.

Initialize the capability and validate required components.

Performs comprehensive validation of the capability class to ensure all required components are properly defined. This validation happens at initialization time to provide immediate feedback during development rather than waiting for runtime execution failures.

The validation process checks: 1. Required class attributes (name, description) are defined and non-None 2. The execute method is implemented as a static method 3. Optional attributes are properly initialized with defaults if missing 4. The ‘requires’ field format is valid (strings or (context_type, cardinality) tuples)

Raises:
  • NotImplementedError – If name or description class attributes are missing

  • NotImplementedError – If execute static method is not implemented

  • ValueError – If requires field contains invalid format

Note

This initialization performs validation only. The actual LangGraph integration happens through the @capability_node decorator.

Warning

Subclasses should not override this method unless they need additional validation. Override _create_orchestrator_guide() or _create_classifier_guide() for customization instead.

name: str = 'clarify'#
description: str = 'Ask specific questions when user queries are ambiguous or missing critical details'#
provides: list[str] = []#
requires: list[str | tuple[str, Literal['single', 'multiple']]] = []#
async execute()[source]#

Generate specific questions to ask user based on missing information.

Returns:

State update with clarification response

Return type:

Dict[str, Any]

static classify_error(exc, context)[source]#

Clarify error classification.

__init__()#

Initialize the capability and validate required components.

Performs comprehensive validation of the capability class to ensure all required components are properly defined. This validation happens at initialization time to provide immediate feedback during development rather than waiting for runtime execution failures.

The validation process checks: 1. Required class attributes (name, description) are defined and non-None 2. The execute method is implemented as a static method 3. Optional attributes are properly initialized with defaults if missing 4. The ‘requires’ field format is valid (strings or (context_type, cardinality) tuples)

Raises:
  • NotImplementedError – If name or description class attributes are missing

  • NotImplementedError – If execute static method is not implemented

  • ValueError – If requires field contains invalid format

Note

This initialization performs validation only. The actual LangGraph integration happens through the @capability_node decorator.

Warning

Subclasses should not override this method unless they need additional validation. Override _create_orchestrator_guide() or _create_classifier_guide() for customization instead.

__repr__()#

Return a string representation of the capability.

Returns:

String representation including class name and capability name

Return type:

str

property classifier_guide: Any | None#

Get the classifier guide for this capability (lazy-loaded).

Standardized interface used by the framework. Automatically calls _create_classifier_guide() on first access and caches the result.

Returns:

Classifier guide for capability activation, or None if not needed

Return type:

Optional[TaskClassifierGuide]

get_logger()#

Get unified logger with automatic streaming support.

Creates a logger that: - Uses this capability’s name automatically - Has access to state for streaming via self._state - Streams high-level messages automatically when in LangGraph context - Logs to CLI with Rich formatting

The logger intelligently handles both CLI output and web UI streaming through a single API. High-level status updates (status, error, success) automatically stream to the web UI, while detailed logging (info, debug) goes to CLI only by default.

Returns:

ComponentLogger instance with streaming capability

Example

```python async def execute(self) -> dict[str, Any]:

logger = self.get_logger()

# High-level status - logs + streams automatically logger.status(“Creating execution plan…”)

# Detailed info - logs only (unless explicitly requested) logger.info(f”Active capabilities: {capabilities}”)

# Explicit streaming for specific info logger.info(“Step 1 of 5 complete”, stream=True, progress=0.2)

# Errors always stream logger.error(“Validation failed”, validation_errors=[…])

# Success with metadata logger.success(“Plan created”, steps=5, total_time=2.3)

return self.store_output_context(result)

```

Note

The logger uses lazy initialization for streaming, so it gracefully handles contexts where LangGraph streaming is not available (tests, utilities, CLI-only execution).

See also

ComponentLogger : Logger class with streaming methods get_logger() : Underlying logger factory function

get_parameters(default=None)#

Get parameters from the current step.

The orchestrator can provide optional parameters in the step definition that control capability behavior (e.g., precision, timeout, mode).

Parameters:

default (dict[str, Any] | None) – Default value to return if no parameters exist (defaults to empty dict)

Returns:

Parameters dictionary from the step

Raises:

RuntimeError – If called outside execute() (state not injected)

Return type:

dict[str, Any]

Example

```python async def execute(self) -> dict[str, Any]:

params = self.get_parameters() precision_ms = params.get(‘precision_ms’, 1000) timeout = params.get(‘timeout’, 30)

# Or with a custom default params = self.get_parameters(default={‘precision_ms’: 1000}) ```

get_required_contexts(constraint_mode='hard')#

Automatically extract contexts based on ‘requires’ field.

The constraint_mode applies uniformly to ALL requirements. Use “hard” when all are required, “soft” when at least one is required.

Tuple format is ONLY for cardinality constraints: - “single”: Must be exactly one instance (not a list) - “multiple”: Must be a list (not single instance)

Parameters:

constraint_mode (Literal['hard', 'soft']) – “hard” (all required) or “soft” (at least one required)

Returns:

RequiredContexts object supporting both dict and tuple unpacking access

Raises:
  • RuntimeError – If called outside execute() (state not injected)

  • ValueError – If required contexts missing or cardinality violated

  • AttributeError – If context type not found in registry

Return type:

RequiredContexts

Example

```python # Define requirements requires = [“CHANNEL_ADDRESSES”, (“TIME_RANGE”, “single”)]

# Elegant tuple unpacking (matches order in requires) channels, time_range = self.get_required_contexts()

# Traditional dict access (backward compatible) contexts = self.get_required_contexts() channels = contexts[“CHANNEL_ADDRESSES”] time_range = contexts[“TIME_RANGE”] ```

Note

Tuple unpacking only works reliably with constraint_mode=”hard” (default). When using “soft” mode, use dict access instead since the number of returned contexts may vary:

contexts = self.get_required_contexts(constraint_mode=”soft”) a = contexts.get(“CONTEXT_A”) b = contexts.get(“CONTEXT_B”)

static get_retry_policy()#

Get retry policy configuration for failure recovery strategies.

This method provides retry configuration that the framework uses for manual retry handling when capabilities fail with RETRIABLE errors. The default policy provides reasonable defaults for most capabilities, but should be overridden for capabilities with specific timing or retry requirements.

The retry policy controls: - Maximum number of retry attempts before giving up - Initial delay between retry attempts - Backoff factor for exponential delay increase

Returns:

Dictionary containing retry configuration parameters

Return type:

Dict[str, Any]

Note

The framework uses manual retry handling rather than LangGraph’s native retry policies to ensure consistent behavior across all components and to enable sophisticated error classification.

Examples

Aggressive retry for network-dependent capability:

@staticmethod
def get_retry_policy() -> Dict[str, Any]:
    return {
        "max_attempts": 5,      # More attempts for network issues
        "delay_seconds": 2.0,   # Longer delay for external services
        "backoff_factor": 2.0   # Exponential backoff
    }

Conservative retry for expensive operations:

@staticmethod
def get_retry_policy() -> Dict[str, Any]:
    return {
        "max_attempts": 2,      # Minimal retries for expensive ops
        "delay_seconds": 0.1,   # Quick retry for transient issues
        "backoff_factor": 1.0   # No backoff for fast operations
    }

See also

classify_error() : Error classification that determines when to retry ErrorSeverity : RETRIABLE severity triggers retry policy usage

get_step_inputs(default=None)#

Get the inputs list from the current step.

The orchestrator provides inputs in each step as a list of {context_type: context_key} mappings that specify which contexts are available for this step. This is commonly used for building context descriptions, validation, and informing the LLM about available data.

Parameters:

default (list[dict[str, str]] | None) – Default value to return if no inputs exist (defaults to empty list)

Returns:

List of input mappings from the step

Raises:

RuntimeError – If called outside execute() (state not injected)

Return type:

list[dict[str, str]]

Example

```python async def execute(self) -> dict[str, Any]:

# Get step inputs step_inputs = self.get_step_inputs()

# Use with ContextManager to build description from osprey.context import ContextManager context_manager = ContextManager(self._state) context_description = context_manager.get_context_access_description(step_inputs)

# Or with a custom default step_inputs = self.get_step_inputs(default=[]) ```

get_task_objective(default=None)#

Get the task objective for the current step.

The orchestrator provides task_objective in each step to describe what the capability should accomplish. This is commonly used for logging, search queries, and LLM prompts.

Parameters:

default (str | None) – Default value if task_objective not in step. If None, falls back to current task from state.

Returns:

Task objective string

Raises:

RuntimeError – If called outside execute() (state not injected)

Return type:

str

Example

```python async def execute(self) -> dict[str, Any]:

# Get task objective with automatic fallback task = self.get_task_objective() logger.info(f”Starting: {task}”)

# Or with custom default task = self.get_task_objective(default=”unknown task”)

# Common pattern: use as search query search_query = self.get_task_objective().lower() ```

async langgraph_node(**kwargs)#

LangGraph-native node function with manual retry handling via router.

Return type:

dict[str, Any]

property orchestrator_guide: Any | None#

Get the orchestrator guide for this capability (lazy-loaded).

Standardized interface used by the framework. Automatically calls _create_orchestrator_guide() on first access and caches the result.

Returns:

Orchestrator guide for execution planning integration, or None if not needed

Return type:

Optional[OrchestratorGuide]

process_extracted_contexts(contexts)#

Override to customize extracted contexts (e.g., flatten lists).

Parameters:

contexts (dict[str, CapabilityContext | list[CapabilityContext]]) – Dict mapping context type names to extracted objects

Returns:

Processed contexts dict

Return type:

dict[str, CapabilityContext | list[CapabilityContext]]

Example

```python def process_extracted_contexts(self, contexts):

‘’’Flatten list of CHANNEL_ADDRESSES.’’’ channels_raw = contexts[“CHANNEL_ADDRESSES”]

if isinstance(channels_raw, list):

flat = [] for ctx in channels_raw:

flat.extend(ctx.channels)

contexts[“CHANNEL_ADDRESSES”] = flat

else:

contexts[“CHANNEL_ADDRESSES”] = channels_raw.channels

return contexts

```

store_output_context(context_data)#

Store single output context - uses context’s CONTEXT_TYPE attribute.

No need for provides field or state/step parameters!

Parameters:

context_data (CapabilityContext) – Context object with CONTEXT_TYPE class variable

Returns:

State updates dict for LangGraph to merge

Raises:
  • AttributeError – If context_data lacks CONTEXT_TYPE class variable

  • RuntimeError – If called outside execute() (state not injected)

  • ValueError – If context_key missing from step

Return type:

dict[str, Any]

Example

`python return self.store_output_context(ArchiverDataContext(...)) `

store_output_contexts(*context_objects)#

Store multiple output contexts - all self-describing.

Parameters:

*context_objects (CapabilityContext) – Context objects with CONTEXT_TYPE attributes

Returns:

Merged state updates dict for LangGraph

Raises:
  • AttributeError – If any context lacks CONTEXT_TYPE

  • RuntimeError – If called outside execute()

  • ValueError – If context types don’t match provides field

Return type:

dict[str, Any]

Example

```python return self.store_output_contexts(

ArchiverDataContext(…), MetadataContext(…), StatisticsContext(…)

)#

Core Models#

Message generation uses models defined in the core framework:

See also

BaseCapability

Base class for all capabilities

ContextManager

Context management for response generation

Registration#

Respond Capability is automatically registered as:

CapabilityRegistration(
    name="respond",
    module_path="osprey.infrastructure.respond_node",
    class_name="RespondCapability",
    provides=["FINAL_RESPONSE"]
)

Clarify Capability is automatically registered as:

CapabilityRegistration(
    name="clarify",
    module_path="osprey.infrastructure.clarify_node",
    class_name="ClarifyCapability",
    provides=[]
)

See also

Prompt System

Prompt customization system

Registry System

Component registration system

Message Generation

Implementation details and usage patterns