"""
Component Logger Framework
Provides colored logging for Osprey and application components with:
- Unified API for all components (capabilities, infrastructure, pipelines)
- Rich terminal output with component-specific colors
- Graceful fallbacks when configuration is unavailable
- Simple, clear interface
- Optional LangGraph streaming support via lazy initialization
- Typed event emission for structured streaming (OspreyEvent types)
Usage:
# Components with streaming (via BaseCapability.get_logger())
logger = self.get_logger()
logger.status("Creating execution plan...") # Logs + streams typed event
logger.info("Active capabilities: [...]") # Logs only
# Module-level (no streaming)
logger = get_logger("orchestrator")
logger.key_info("Starting orchestration")
logger = get_logger("data_processor")
logger.info("Processing data")
logger.debug("Detailed trace")
logger.success("Operation completed")
logger.warning("Something to note")
logger.error("Something went wrong")
logger.timing("Execution took 2.5 seconds")
logger.approval("Waiting for user approval")
# Custom loggers with explicit parameters
logger = get_logger(name="custom_component", color="blue")
# Emit typed events directly (for infrastructure nodes)
from osprey.events import PhaseStartEvent
logger.emit_event(PhaseStartEvent(phase="task_extraction"))
"""
import logging
from contextlib import contextmanager
from typing import TYPE_CHECKING, Any
from rich.console import Console
from rich.logging import RichHandler
from osprey.events import ErrorEvent, EventEmitter, OspreyEvent, StatusEvent
from osprey.utils.config import get_config_value
if TYPE_CHECKING:
pass # For future type-only imports
# Hard-coded step mapping for task preparation phases
# (Moved from deprecated streaming.py module)
# Each entry fixes a component's position within the 3-step "Task Preparation" phase.
TASK_PREPARATION_STEPS = {
    component: {"step": position, "total_steps": 3, "phase": "Task Preparation"}
    for position, component in enumerate(
        ("task_extraction", "classifier", "orchestrator"), start=1
    )
}
@contextmanager
def quiet_logging():
    """Temporarily suppress INFO-level logs on the root logger.

    Raises the root logger's level to WARNING for the duration of the
    context, hiding INFO and DEBUG records while warnings and errors
    still pass through. The previous level is always restored on exit,
    even if the body raises. Used during direct chat mode for a cleaner
    conversational experience.

    Example:
        >>> with quiet_logging():
        ...     # INFO logs are suppressed here
        ...     await process_direct_chat()
        # Normal logging restored after context exits
    """
    root = logging.getLogger()
    previous_level = root.level
    root.setLevel(logging.WARNING)
    try:
        yield
    finally:
        # Restore whatever level was configured before entry.
        root.setLevel(previous_level)
[docs]
class ComponentLogger:
    """
    Rich-formatted logger for Osprey and application components with color coding and message hierarchy.

    Now includes optional LangGraph streaming support via lazy initialization.
    Log methods emit typed OspreyEvent instances (StatusEvent / ErrorEvent)
    through an EventEmitter rather than writing directly to the stdlib logger.

    Message Types:
    - status: High-level status updates (logs + streams automatically)
    - key_info: Important operational information
    - info: Normal operational messages
    - debug: Detailed tracing information
    - warning: Warning messages
    - error: Error messages (logs + streams automatically)
    - success: Success messages (logs + streams by default)
    - timing: Timing information
    - approval: Approval messages
    - resume: Resume messages
    """
[docs]
def __init__(
    self,
    base_logger: logging.Logger,
    component_name: str,
    color: str = "white",
    state: Any = None,
):
    """Bind this logger to a single component.

    Args:
        base_logger: Underlying stdlib logger (kept for level/name compat).
        component_name: Component identifier (e.g. 'data_analysis', 'router', 'mongo').
        color: Rich color name used for this component's output.
        state: Optional AgentState providing streaming/step context.
    """
    self.base_logger = base_logger
    self.component_name = component_name
    self.color = color
    self._state = state
    # Typed event emitter for the structured event streaming system.
    self._event_emitter = EventEmitter(component_name)
    # Stream writer is resolved lazily on first use; the boolean records
    # whether resolution was attempted so we only ever try once.
    self._stream_writer = None
    self._stream_writer_attempted = False
    # Step metadata (phase/step/total) — populated lazily from state.
    self._step_info = None
def _get_stream_writer(self):
    """Return the LangGraph stream writer, resolving it at most once.

    The first call tries to obtain a writer from the active LangGraph
    context and caches the result; every later call returns the cached
    value (which is None when no LangGraph context was active).
    """
    if self._stream_writer_attempted:
        return self._stream_writer
    self._stream_writer_attempted = True
    try:
        from langgraph.config import get_stream_writer

        self._stream_writer = get_stream_writer()
        # Resolve step metadata alongside the writer so streamed events
        # can carry phase/step context.
        self._step_info = self._extract_step_info(self._state)
    except (RuntimeError, ImportError):
        # Not running inside a LangGraph context - that's fine.
        self._stream_writer = None
        self._step_info = {}
    return self._stream_writer
def _extract_step_info(self, state):
    """Derive step/phase metadata for streaming events.

    Resolution order (reuses the logic from StreamWriter._get_step_info()):
      1. Task-preparation components use the hard-coded step table.
      2. Execution-phase components read the execution plan from state.
      3. Otherwise fall back to the title-cased component name, no step.
    """
    # 1. Task preparation phases have fixed positions.
    fixed = TASK_PREPARATION_STEPS.get(self.component_name)
    if fixed is not None:
        return fixed
    # 2. Execution phase: derive position from the plan stored in state.
    if state and hasattr(state, "get"):
        try:
            from osprey.state.state_manager import StateManager

            plan = state.get("planning_execution_plan")
            if plan and plan.get("steps"):
                total = len(plan.get("steps", []))
                current = StateManager.get_current_step_index(state)
                if total > 0:
                    return {
                        "step": current + 1,
                        "total_steps": total,
                        "phase": "Execution",
                    }
        except Exception:
            # Silently fall through to the default below.
            # Cannot log here: this runs inside logging infrastructure,
            # and logging.debug() risks recursion or handler re-entry.
            pass
    # 3. Default: component name as phase, no step numbers.
    return {
        "step": None,
        "total_steps": None,
        "phase": self.component_name.replace("_", " ").title(),
    }
def _emit_stream_event(self, message: str, *args, event_type: str = "status", **kwargs):
    """Emit *message* as a typed OspreyEvent (StatusEvent or ErrorEvent).

    The EventEmitter handles LangGraph streaming and fallback handlers
    automatically. Supports stdlib-style format args:
    ``logger.info("msg %s", val)``. Never raises: streaming failures are
    swallowed so logging itself cannot crash the caller.
    """
    # stdlib compatibility: apply %-formatting when positional args given.
    if args:
        try:
            message = message % args
        except (TypeError, ValueError):
            pass
    # Lazily resolve step metadata on the first emission.
    if self._step_info is None:
        self._step_info = self._extract_step_info(self._state)
    step_info = self._step_info or {}
    try:
        if event_type == "error" or kwargs.get("error"):
            event = ErrorEvent(
                component=self.component_name,
                error_type=kwargs.get("error_type", "ExecutionError"),
                error_message=message,
                recoverable=kwargs.get("recoverable", False),
                stack_trace=kwargs.get("stack_trace"),
            )
        else:
            # Recognized levels pass straight through as the StatusEvent
            # level; anything unknown degrades to plain "info".
            recognized = (
                "status",
                "info",
                "debug",
                "warning",
                "success",
                "key_info",
                "timing",
                "approval",
                "resume",
            )
            event = StatusEvent(
                component=self.component_name,
                message=message,
                level=event_type if event_type in recognized else "info",
                phase=step_info.get("phase"),
                step=step_info.get("step"),
                total_steps=step_info.get("total_steps"),
            )
        self._event_emitter.emit(event)
    except Exception:
        # Don't crash logging just because streaming failed; avoid any
        # logging call here, which could recurse into this method.
        pass
[docs]
def emit_event(self, event: OspreyEvent) -> None:
    """Emit a typed OspreyEvent directly.

    Use this for structured events like PhaseStartEvent, CapabilityStartEvent,
    etc. that don't fit the standard logging pattern.

    Args:
        event: The typed event to emit. If its ``component`` field is empty,
            it is stamped with this logger's component name before emission.

    Example:
        from osprey.events import PhaseStartEvent
        logger.emit_event(PhaseStartEvent(
            phase="task_extraction",
            description="Extracting task from query"
        ))
    """
    # Ensure component is set if not already
    if not event.component:
        event.component = self.component_name
    self._event_emitter.emit(event)
[docs]
def emit_llm_request(
    self, prompt: str, key: str = "", model: str = "", provider: str = ""
) -> None:
    """Emit an LLMRequestEvent carrying the full prompt for TUI display.

    Args:
        prompt: The complete LLM prompt text
        key: Optional key for accumulating multiple prompts (e.g., capability name)
        model: Model identifier (e.g., "gpt-4", "claude-3-opus")
        provider: Provider name (e.g., "openai", "anthropic")
    """
    from osprey.events import LLMRequestEvent

    # Preview is capped at 200 chars; the untruncated prompt rides along too.
    preview = prompt[:200] + "..." if len(prompt) > 200 else prompt
    self._event_emitter.emit(
        LLMRequestEvent(
            component=self.component_name,
            prompt_preview=preview,
            prompt_length=len(prompt),
            model=model,
            provider=provider,
            full_prompt=prompt,
            key=key,
        )
    )
[docs]
def emit_llm_response(
    self,
    response: str,
    key: str = "",
    duration_ms: int = 0,
    input_tokens: int = 0,
    output_tokens: int = 0,
) -> None:
    """Emit an LLMResponseEvent carrying the full response for TUI display.

    Args:
        response: The complete LLM response text
        key: Optional key for accumulating multiple responses (e.g., capability name)
        duration_ms: How long the request took in milliseconds
        input_tokens: Number of input tokens
        output_tokens: Number of output tokens
    """
    from osprey.events import LLMResponseEvent

    # Preview is capped at 200 chars; the untruncated response rides along too.
    preview = response[:200] + "..." if len(response) > 200 else response
    self._event_emitter.emit(
        LLMResponseEvent(
            component=self.component_name,
            response_preview=preview,
            response_length=len(response),
            duration_ms=duration_ms,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            full_response=response,
            key=key,
        )
    )
def _format_message(self, message: str, style: str, emoji: str = "") -> str:
    """Prefix *message* with emoji + component name, wrapped in Rich markup.

    When *style* is empty the plain prefixed message is returned; if markup
    construction fails for any reason, a plain-text fallback is produced so
    formatting never raises.
    """
    try:
        prefix = f"{emoji}{self.component_name.title()}: "
        plain = f"{prefix}{message}"
        return f"[{style}]{plain}[/{style}]" if style else plain
    except Exception:
        # Graceful degradation for environments where Rich markup fails.
        return f"{emoji}{self.component_name.title()}: {message}"
def _build_extra(self, message: str, log_type: str, **kwargs) -> dict:
    """Assemble the ``extra=`` dict for base_logger calls.

    Embeds the raw message, the log type, and all recognized streaming data
    fields into the Python log record, so the TUI can read everything from
    a single source (Python logging).

    Args:
        message: The raw message (without Rich markup)
        log_type: The log type (status, success, error, info, etc.)
        **kwargs: Additional data fields (task, capabilities, steps, etc.)

    Returns:
        Dict to pass as the ``extra=`` parameter to base_logger
    """
    # Only these streaming fields are copied through to the log record.
    recognized_fields = (
        "task",
        "capabilities",
        "capability_names",
        "steps",
        "phase",
        "step_num",
        "step_name",
        "llm_prompt",
        "llm_response",
    )
    extra = {"raw_message": message, "log_type": log_type}
    extra.update((field, kwargs[field]) for field in recognized_fields if field in kwargs)
    # Backfill the phase from cached step info when the caller didn't set one.
    if self._step_info and "phase" not in extra and "phase" in self._step_info:
        extra["phase"] = self._step_info.get("phase", "")
    return extra
[docs]
def status(self, message: str, *args, **kwargs) -> None:
    """Status update - emits StatusEvent.

    User-facing output. Transport is automatic:
    - During graph.astream(): LangGraph streaming
    - Outside graph execution: fallback transport -> TypedEventHandler

    Args:
        message: Status message
        *args: Optional %-style format args (stdlib compat)
        **kwargs: Additional metadata for streaming event

    Example:
        logger.status("Creating execution plan...")
        logger.status("Processing batch 2/5", batch=2, total=5)
    """
    self._emit_stream_event(message, *args, event_type="status", **kwargs)
[docs]
def key_info(self, message: str, *args, **kwargs) -> None:
    """Important operational information - emits StatusEvent with key_info level.

    User-facing output. Transport is automatic.

    Args:
        message: Info message
        *args: Optional %-style format args (stdlib compat)
        **kwargs: Additional metadata for streaming event
    """
    self._emit_stream_event(message, *args, event_type="key_info", **kwargs)
[docs]
def info(self, message: str, *args, **kwargs) -> None:
    """Info message - emits StatusEvent with info level.

    User-facing output. Transport is automatic.

    Args:
        message: Info message
        *args: Optional %-style format args (stdlib compat)
        **kwargs: Additional metadata for streaming event

    Example:
        logger.info("Active capabilities: [...]")
        logger.info("Step completed")
    """
    self._emit_stream_event(message, *args, event_type="info", **kwargs)
[docs]
def debug(self, message: str, *args, **kwargs) -> None:
    """Debug message - emits StatusEvent with debug level.

    User-facing output (filtered by client if not needed).
    Transport is automatic.

    Args:
        message: Debug message
        *args: Optional %-style format args (stdlib compat)
        **kwargs: Additional metadata for streaming event
    """
    self._emit_stream_event(message, *args, event_type="debug", **kwargs)
[docs]
def warning(self, message: str, *args, **kwargs) -> None:
    """Warning message - emits StatusEvent with warning level.

    User-facing output. Transport is automatic. The warning=True marker
    is added to the event kwargs for downstream consumers.

    Args:
        message: Warning message
        *args: Optional %-style format args (stdlib compat)
        **kwargs: Additional metadata for streaming event
    """
    self._emit_stream_event(message, *args, event_type="warning", warning=True, **kwargs)
[docs]
def error(self, message: str, *args, exc_info: bool = False, **kwargs) -> None:
    """Error message - emits ErrorEvent.

    User-facing output. Transport is automatic.

    Args:
        message: Error message
        *args: Optional %-style format args (stdlib compat)
        exc_info: When True, attach the current exception traceback to the
            ErrorEvent (unless a stack_trace was supplied explicitly)
        **kwargs: Additional error metadata for streaming event
    """
    if exc_info:
        import traceback

        # Don't overwrite an explicitly supplied stack trace.
        kwargs.setdefault("stack_trace", traceback.format_exc())
    self._emit_stream_event(message, *args, event_type="error", error=True, **kwargs)
[docs]
def success(self, message: str, *args, **kwargs) -> None:
    """Success message - emits StatusEvent with success level.

    User-facing output. Transport is automatic.

    Args:
        message: Success message
        *args: Optional %-style format args (stdlib compat)
        **kwargs: Additional metadata for streaming event
    """
    self._emit_stream_event(message, *args, event_type="success", **kwargs)
[docs]
def timing(self, message: str, *args, **kwargs) -> None:
    """Timing information - emits StatusEvent with timing level.

    User-facing output. Transport is automatic.

    Args:
        message: Timing message
        *args: Optional %-style format args (stdlib compat)
        **kwargs: Additional metadata for streaming event
    """
    self._emit_stream_event(message, *args, event_type="timing", **kwargs)
[docs]
def approval(self, message: str, *args, **kwargs) -> None:
    """Approval message - emits StatusEvent with approval level.

    User-facing output. Transport is automatic.

    Args:
        message: Approval message
        *args: Optional %-style format args (stdlib compat)
        **kwargs: Additional metadata for streaming event
    """
    self._emit_stream_event(message, *args, event_type="approval", **kwargs)
[docs]
def resume(self, message: str, *args, **kwargs) -> None:
    """Resume message - emits StatusEvent with resume level.

    User-facing output. Transport is automatic.

    Args:
        message: Resume message
        *args: Optional %-style format args (stdlib compat)
        **kwargs: Additional metadata for streaming event
    """
    self._emit_stream_event(message, *args, event_type="resume", **kwargs)
[docs]
def critical(self, message: str, *args, **kwargs) -> None:
    """Critical error - emits ErrorEvent with error_type "CriticalError".

    User-facing output. Transport is automatic.

    Args:
        message: Critical error message
        *args: Optional %-style format args (stdlib compat)
        **kwargs: Additional error metadata for streaming event
    """
    self._emit_stream_event(
        message, *args, event_type="error", error=True, error_type="CriticalError", **kwargs
    )
[docs]
def exception(self, message: str, *args, **kwargs) -> None:
    """Exception with traceback - emits ErrorEvent with a stack trace.

    Intended to be called from inside an ``except`` block, mirroring
    ``logging.Logger.exception``. User-facing output; transport is
    automatic.

    Args:
        message: Exception message
        *args: Optional %-style format args (stdlib compat)
        **kwargs: Additional error metadata for streaming event
    """
    import traceback

    # Attach the active traceback unless the caller supplied one.
    kwargs.setdefault("stack_trace", traceback.format_exc())
    self._emit_stream_event(message, *args, event_type="error", error=True, **kwargs)
# Properties for compatibility with the stdlib logging.Logger interface
@property
def level(self) -> int:
    """Level of the wrapped stdlib logger."""
    return self.base_logger.level

@property
def name(self) -> str:
    """Name of the wrapped stdlib logger."""
    return self.base_logger.name
[docs]
def setLevel(self, level: int) -> None:
    """Delegate level changes to the wrapped stdlib logger (compat shim)."""
    self.base_logger.setLevel(level)
[docs]
def isEnabledFor(self, level: int) -> bool:
    """Delegate level checks to the wrapped stdlib logger (compat shim)."""
    return self.base_logger.isEnabledFor(level)
def _setup_rich_logging(level: int = logging.INFO) -> None:
    """Install a RichHandler on the root logger (idempotent, called once).

    Clears any pre-existing handlers, attaches a Rich console handler with
    config-driven display options, and quiets noisy third-party libraries.
    Returns immediately if a RichHandler is already installed.
    """
    root = logging.getLogger()
    # Prevent duplicate handler registration for consistent logging behavior.
    if any(isinstance(existing, RichHandler) for existing in root.handlers):
        return
    # Ensure clean handler state to prevent duplicate log messages.
    if root.hasHandlers():
        root.handlers.clear()
    root.setLevel(level)
    # Load user-configurable display preferences from config.
    # Security-conscious defaults: hide locals to prevent sensitive data exposure.
    try:
        rich_tracebacks = get_config_value("logging.rich_tracebacks", True)
        show_traceback_locals = get_config_value("logging.show_traceback_locals", False)
        show_full_paths = get_config_value("logging.show_full_paths", False)
    except Exception:
        # Config system unavailable; use secure defaults.
        # Cannot log here: logging infrastructure is mid-configuration
        # (handlers cleared but RichHandler not yet installed).
        rich_tracebacks = True
        show_traceback_locals = False
        show_full_paths = False
    # Console tuned for containerized and CI/CD environments: forced color,
    # fixed width to avoid wrapping, full color spectrum for components.
    console = Console(
        force_terminal=True,
        width=120,
        color_system="truecolor",
    )
    root.addHandler(
        RichHandler(
            console=console,
            rich_tracebacks=rich_tracebacks,
            markup=True,  # Enable [bold], [green], etc. in log messages
            show_path=show_full_paths,
            show_time=True,
            show_level=True,
            tracebacks_show_locals=show_traceback_locals,
        )
    )
    # Reduce third-party library noise to focus on application-specific issues.
    for noisy in ("httpx", "httpcore", "requests", "urllib3", "LiteLLM"):
        logging.getLogger(noisy).setLevel(logging.WARNING)
[docs]
def get_logger(
    component_name: str = None,
    level: int = logging.INFO,
    *,
    state: Any = None,
    name: str = None,
    color: str = None,
    # Deprecated parameter - kept for backward compatibility
    source: str = None,
) -> ComponentLogger:
    """
    Get a unified logger that handles both CLI logging and LangGraph streaming.

    Primary API (recommended - use via BaseCapability.get_logger()):
        component_name: Component name (e.g., 'orchestrator', 'data_analysis')
        state: Optional AgentState for streaming context and step tracking
        level: Logging level

    Explicit API (for custom loggers or module-level usage):
        name: Direct logger name (keyword-only)
        color: Direct color specification (keyword-only)
        level: Logging level

    Returns:
        ComponentLogger instance that logs to CLI and optionally streams

    Raises:
        ValueError: If neither component_name nor name is provided.

    Examples:
        # Recommended: Use via BaseCapability
        class MyCapability(BaseCapability):
            async def execute(self):
                logger = self.get_logger()  # Auto-streams!
                logger.status("Working...")

        # Module-level (no streaming)
        logger = get_logger("orchestrator")
        logger.info("Planning started")

        # With streaming (when you have state)
        logger = get_logger("orchestrator", state=state)
        logger.status("Creating execution plan...")  # Logs + streams

        # Custom logger
        logger = get_logger(name="test_logger", color="blue")

    .. deprecated::
        The two-parameter API get_logger(source, component_name) is deprecated.
        Use get_logger(component_name) instead. The flat configuration structure
        (logging.logging_colors.{component_name}) replaces the old nested structure.
    """
    import warnings

    # Make sure Rich logging is configured before handing out any logger.
    _setup_rich_logging(level)

    # Explicit API: direct name/color bypasses convention-based color lookup.
    if name is not None:
        return ComponentLogger(logging.getLogger(name), name, color or "white", state=state)

    # Deprecated two-parameter API: get_logger("framework", "component").
    # Still accepted for backward compatibility, but 'source' is ignored.
    if source is not None:
        warnings.warn(
            f"The two-parameter API get_logger('{source}', '{component_name}') is deprecated. "
            f"Use get_logger('{component_name}') instead. The 'source' parameter is no longer needed "
            f"as the configuration uses a flat structure: logging.logging_colors.{component_name}",
            DeprecationWarning,
            stacklevel=2,
        )

    if component_name is None:
        raise ValueError(
            "Component name is required. Usage: get_logger('component_name') or "
            "get_logger(name='custom_name', color='blue')"
        )

    # Component name doubles as the logger identifier for hierarchical organization.
    base_logger = logging.getLogger(component_name)

    # Color comes from the flat config path logging.logging_colors.{component_name};
    # degrade to white on any failure so logging never breaks due to config issues.
    try:
        resolved_color = get_config_value(f"logging.logging_colors.{component_name}") or "white"
    except Exception as e:
        resolved_color = "white"
        import os

        # Only surface the failure in debug mode to reduce noise.
        if os.getenv("DEBUG_LOGGING"):
            print(
                f"⚠️ WARNING: Failed to load color config for {component_name}: {e}. Using white as fallback."
            )

    # Pass state through so the logger can stream with step context.
    return ComponentLogger(base_logger, component_name, resolved_color, state=state)