Framework Utilities#
Supporting systems for advanced usage and development tooling.
LLM Completion Interface#
Multi-provider LLM completions via LiteLLM for structured generation and direct completions.
- osprey.models.get_chat_completion(message, max_tokens=1024, model_config=None, provider=None, model_id=None, budget_tokens=None, enable_thinking=False, output_model=None, base_url=None, provider_config=None, temperature=0.0)[source]#
Execute direct chat completion requests across multiple AI providers via LiteLLM.
This function provides immediate access to LLM model inference with support for advanced features including extended thinking, structured outputs, and automatic TypedDict conversion.
- Parameters:
message (str) – Input prompt or message for the LLM model
max_tokens (int) – Maximum tokens to generate in the response
model_config (dict | None) – Configuration dictionary with provider and model settings
provider (str | None) – AI provider name (‘anthropic’, ‘google’, ‘openai’, ‘ollama’, ‘cborg’, etc.)
model_id (str | None) – Specific model identifier recognized by the provider
budget_tokens (int | None) – Thinking budget for Anthropic/Google extended reasoning
enable_thinking (bool) – Enable extended thinking capabilities where supported
output_model (type[BaseModel] | None) – Pydantic model or TypedDict for structured output validation
base_url (str | None) – Custom API endpoint, required for Ollama and CBORG providers
provider_config (dict | None) – Optional provider configuration dict with api_key, base_url, etc.
temperature (float) – Sampling temperature (0.0-2.0)
- Raises:
ValueError – If required provider, model_id, api_key, or base_url are missing
- Returns:
Model response (str, Pydantic model, or list of content blocks for thinking)
- Return type:
str | BaseModel | list
Examples
Simple text completion:
>>> from osprey.models import get_chat_completion
>>> response = get_chat_completion(
...     message="Explain quantum computing",
...     provider="anthropic",
...     model_id="claude-sonnet-4",
... )
Structured output:
>>> from pydantic import BaseModel
>>> class Result(BaseModel):
...     summary: str
...     confidence: float
>>>
>>> result = get_chat_completion(
...     message="Analyze this data",
...     provider="openai",
...     model_id="gpt-4o",
...     output_model=Result,
... )
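When output_model is a TypedDict rather than a Pydantic model, the response is validated against the declared fields before being returned. As a rough, stdlib-only illustration of what that validation step amounts to (this is not osprey code; validate_against is a hypothetical helper written for this sketch):

```python
import json
from typing import TypedDict, get_type_hints

class Result(TypedDict):
    summary: str
    confidence: float

def validate_against(schema: type, payload: str) -> dict:
    """Hypothetical helper: decode a JSON payload and check it
    against a TypedDict's declared field names and types."""
    data = json.loads(payload)
    hints = get_type_hints(schema)
    for field, typ in hints.items():
        if field not in data:
            raise ValueError(f"missing field: {field}")
        if not isinstance(data[field], typ):
            raise TypeError(f"{field} should be {typ.__name__}")
    return data

checked = validate_against(Result, '{"summary": "ok", "confidence": 0.9}')
```

The real implementation goes through LiteLLM's structured-output machinery; this sketch only shows the shape of the schema check.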
Developer Tools#
Unified logging system with automatic LangGraph streaming support for framework development.
Logging and Streaming#
The framework provides a unified logging API that automatically handles both CLI output
and web UI streaming. Use logger.status() for high-level updates that should appear
in both interfaces, and standard logging methods (info(), debug()) for detailed
CLI-only output.
Recommended Pattern:
# In capabilities - automatic streaming
logger = self.get_logger()
logger.status("Creating execution plan...") # Logs + streams
logger.info("Active capabilities: [...]") # Logs only
# In other nodes with state
logger = get_logger("orchestrator", state=state)
logger.status("Processing...") # Logs + streams
- osprey.utils.logger.get_logger(component_name=None, level=logging.INFO, *, state=None, name=None, color=None, source=None)[source]#
Get a unified logger that handles both CLI logging and LangGraph streaming.
- Primary API (recommended - use via BaseCapability.get_logger()):
component_name: Component name (e.g., 'orchestrator', 'data_analysis')
state: Optional AgentState for streaming context and step tracking
level: Logging level
- Explicit API (for custom loggers or module-level usage):
name: Direct logger name (keyword-only)
color: Direct color specification (keyword-only)
level: Logging level
- Returns:
ComponentLogger instance that logs to CLI and optionally streams
- Return type:
ComponentLogger
Examples
# Recommended: Use via BaseCapability
class MyCapability(BaseCapability):
    async def execute(self):
        logger = self.get_logger()  # Auto-streams!
        logger.status("Working...")

# Module-level (no streaming)
logger = get_logger("orchestrator")
logger.info("Planning started")

# With streaming (when you have state)
logger = get_logger("orchestrator", state=state)
logger.status("Creating execution plan...")  # Logs + streams
logger.info("Active capabilities: [...]")    # Logs only
logger.error("Failed!")                      # Logs + streams

# Custom logger
logger = get_logger(name="test_logger", color="blue")
Deprecated: The two-parameter API get_logger(source, component_name) is deprecated. Use get_logger(component_name) instead. The flat configuration structure (logging.logging_colors.{component_name}) replaces the old nested structure.
- class osprey.utils.logger.ComponentLogger(base_logger, component_name, color='white', state=None)[source]#
Bases: object
Rich-formatted logger for Osprey and application components with color coding and message hierarchy.
Now includes optional LangGraph streaming support via lazy initialization.
Message Types:
- status: High-level status updates (logs + streams automatically)
- key_info: Important operational information
- info: Normal operational messages
- debug: Detailed tracing information
- warning: Warning messages
- error: Error messages (logs + streams automatically)
- success: Success messages (logs + streams by default)
- timing: Timing information
- approval: Approval messages
- resume: Resume messages
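The key distinction in the table above is which message types stream by default and which stay CLI-only. As a minimal stdlib-only sketch of that routing (not osprey code; MiniComponentLogger and its streamed list are stand-ins for ComponentLogger and the LangGraph event stream):

```python
import logging

class MiniComponentLogger:
    """Sketch of the status/info split: status() logs and streams,
    info() logs only unless stream=True is passed."""

    def __init__(self, component_name: str, color: str = "white"):
        self._log = logging.getLogger(component_name)
        self.streamed = []  # stand-in for emitted LangGraph stream events

    def status(self, message: str, **kwargs):
        self._log.info(message)
        self.streamed.append({"message": message, **kwargs})  # always streams

    def info(self, message: str, stream: bool = False, **kwargs):
        self._log.info(message)
        if stream:  # CLI-only unless explicitly requested
            self.streamed.append({"message": message, **kwargs})

log = MiniComponentLogger("orchestrator")
log.status("Creating execution plan...")        # logs + streams
log.info("Active capabilities: [...]")          # logs only
```

The real ComponentLogger adds color coding, step tracking, and lazy stream initialization on top of this basic split.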
Initialize component logger.
- Parameters:
base_logger (Logger) – Underlying Python logger
component_name (str) – Name of the component (e.g., ‘data_analysis’, ‘router’, ‘mongo’)
color (str) – Rich color name for this component
state (Any) – Optional AgentState for streaming context
- __init__(base_logger, component_name, color='white', state=None)[source]#
Initialize component logger.
- Parameters:
base_logger (Logger) – Underlying Python logger
component_name (str) – Name of the component (e.g., ‘data_analysis’, ‘router’, ‘mongo’)
color (str) – Rich color name for this component
state (Any) – Optional AgentState for streaming context
- status(message, **kwargs)[source]#
Status update - logs and streams automatically.
Use for high-level progress updates that users should see in both CLI and web interfaces.
- Parameters:
message (str) – Status message
**kwargs – Additional metadata for streaming event
Example
logger.status("Creating execution plan...")
logger.status("Processing batch 2/5", batch=2, total=5)
- key_info(message, stream=False, **kwargs)[source]#
Important operational information - logs and optionally streams.
- Parameters:
message (str) – Info message
stream (bool) – Whether to also stream this message
**kwargs – Additional metadata for streaming event
- info(message, stream=False, **kwargs)[source]#
Info message - logs always, streams optionally.
By default, info messages only go to CLI logs. Use stream=True to also send to web interface.
- Parameters:
message (str) – Info message
stream (bool) – Whether to also stream this message
**kwargs – Additional metadata for streaming event
Example
logger.info("Active capabilities: [...]")   # CLI only
logger.info("Step completed", stream=True)  # CLI + stream
- debug(message, stream=False, **kwargs)[source]#
Debug message - logs only (never streams by default).
Debug messages are detailed technical info not meant for web UI.
- Parameters:
message (str) – Debug message
stream (bool) – Whether to stream (default: False)
**kwargs – Additional metadata for streaming event
- warning(message, stream=True, **kwargs)[source]#
Warning message - logs and optionally streams.
Warnings stream by default since they’re important for users to see.
- Parameters:
message (str) – Warning message
stream (bool) – Whether to stream (default: True)
**kwargs – Additional metadata for streaming event
- error(message, exc_info=False, **kwargs)[source]#
Error message - always logs and streams.
Errors are important and should always be visible in both interfaces.
- Parameters:
message (str) – Error message
exc_info (bool) – Whether to include exception traceback
**kwargs – Additional error metadata for streaming event
- success(message, stream=True, **kwargs)[source]#
Success message - logs and optionally streams.
Success messages stream by default to give users feedback.
- Parameters:
message (str) – Success message
stream (bool) – Whether to stream (default: True)
**kwargs – Additional metadata for streaming event
- timing(message, stream=False, **kwargs)[source]#
Timing information - logs and optionally streams.
- Parameters:
message (str) – Timing message
stream (bool) – Whether to stream (default: False)
**kwargs – Additional metadata for streaming event
- approval(message, stream=True, **kwargs)[source]#
Approval messages - logs and optionally streams.
Approval requests stream by default so users see them in web UI.
- Parameters:
message (str) – Approval message
stream (bool) – Whether to stream (default: True)
**kwargs – Additional metadata for streaming event
- resume(message, stream=True, **kwargs)[source]#
Resume messages - logs and optionally streams.
Resume messages stream by default to provide feedback.
- Parameters:
message (str) – Resume message
stream (bool) – Whether to stream (default: True)
**kwargs – Additional metadata for streaming event
- property level: int#
- property name: str#
Legacy Streaming API (Deprecated)#
Deprecated since version 0.9.2: The separate streaming API is deprecated in favor of the unified logging system.
Use osprey.base.capability.BaseCapability.get_logger() in capabilities or
get_logger() with state parameter for automatic streaming support.
For backward compatibility only. New code should use the unified logging system above.
- osprey.utils.streaming.get_streamer(component, state=None, *, source=None)[source]#
Get a stream writer for consistent streaming events.
Deprecated since version 0.9.2: Use the unified logging system instead:
Use self.get_logger() in capabilities or get_logger(component, state=state) for automatic streaming support. This function is maintained for backward compatibility, but the unified logging system provides better integration with both CLI and web UI streaming through a single API.
Migration Guide:
# Old pattern (deprecated)
streamer = get_streamer("orchestrator", state)
streamer.status("Creating execution plan...")

# New pattern (recommended)
logger = self.get_logger()  # In capabilities
logger.status("Creating execution plan...")  # Logs + streams automatically
- Parameters:
component (str) – Component name (e.g., “orchestrator”, “python_executor”)
state (Any | None) – Optional AgentState for extracting execution context
source (str) – (Deprecated) Source type - no longer needed with flat config structure
- Returns:
StreamWriter instance that handles event emission automatically
- Return type:
StreamWriter
See also
osprey.utils.logger.ComponentLogger: Unified logging with streaming
osprey.base.capability.BaseCapability.get_logger(): Recommended API
- class osprey.utils.streaming.StreamWriter(component, state=None, *, source=None)[source]#
Bases: object
Stream writer that provides consistent streaming events with automatic step counting.
Eliminates the need for manual if writer: checks and provides step context for task preparation phases.
Initialize stream writer with component context.
- Parameters:
component (str) – Component name (e.g., “orchestrator”, “python_executor”)
state (Any | None) – Optional AgentState for extracting execution context
source (str) – (Deprecated) Source type - no longer needed with flat config structure
- __init__(component, state=None, *, source=None)[source]#
Initialize stream writer with component context.
- Parameters:
component (str) – Component name (e.g., “orchestrator”, “python_executor”)
state (Any | None) – Optional AgentState for extracting execution context
source (str) – (Deprecated) Source type - no longer needed with flat config structure
See also
- Orchestrator-First Architecture: Upfront Planning in Practice
Development utilities integration patterns and configuration conventions