Source code for osprey.utils.logger

"""
Component Logger Framework

Provides colored logging for Osprey and application components with:
- Unified API for all components (capabilities, infrastructure, pipelines)
- Rich terminal output with component-specific colors
- Graceful fallbacks when configuration is unavailable
- Simple, clear interface
- Optional LangGraph streaming support via lazy initialization
- Typed event emission for structured streaming (OspreyEvent types)

Usage:
    # Components with streaming (via BaseCapability.get_logger())
    logger = self.get_logger()
    logger.status("Creating execution plan...")  # Logs + streams typed event
    logger.info("Active capabilities: [...]")   # Logs only

    # Module-level (no streaming)
    logger = get_logger("orchestrator")
    logger.key_info("Starting orchestration")

    logger = get_logger("data_processor")
    logger.info("Processing data")
    logger.debug("Detailed trace")
    logger.success("Operation completed")
    logger.warning("Something to note")
    logger.error("Something went wrong")
    logger.timing("Execution took 2.5 seconds")
    logger.approval("Waiting for user approval")

    # Custom loggers with explicit parameters
    logger = get_logger(name="custom_component", color="blue")

    # Emit typed events directly (for infrastructure nodes)
    from osprey.events import PhaseStartEvent
    logger.emit_event(PhaseStartEvent(phase="task_extraction"))
"""

import logging
from contextlib import contextmanager
from typing import TYPE_CHECKING, Any

from rich.console import Console
from rich.logging import RichHandler

from osprey.events import ErrorEvent, EventEmitter, OspreyEvent, StatusEvent
from osprey.utils.config import get_config_value

if TYPE_CHECKING:
    pass  # For future type-only imports


# Hard-coded step mapping for task preparation phases
# (Moved from deprecated streaming.py module)
TASK_PREPARATION_STEPS = {
    "task_extraction": {"step": 1, "total_steps": 3, "phase": "Task Preparation"},
    "classifier": {"step": 2, "total_steps": 3, "phase": "Task Preparation"},
    "orchestrator": {"step": 3, "total_steps": 3, "phase": "Task Preparation"},
}


@contextmanager
def quiet_logging():
    """Context manager to temporarily suppress INFO-level logs.

    Temporarily raises the root logger level to WARNING, suppressing
    INFO and DEBUG messages while preserving warnings and errors.
    Used during direct chat mode for a cleaner conversational experience.

    Example:
        >>> with quiet_logging():
        ...     # INFO logs are suppressed here
        ...     await process_direct_chat()
        # Normal logging restored after context exits
    """
    root_logger = logging.getLogger()
    original_level = root_logger.level
    root_logger.setLevel(logging.WARNING)
    try:
        yield
    finally:
        root_logger.setLevel(original_level)


[docs] class ComponentLogger: """ Rich-formatted logger for Osprey and application components with color coding and message hierarchy. Now includes optional LangGraph streaming support via lazy initialization. Message Types: - status: High-level status updates (logs + streams automatically) - key_info: Important operational information - info: Normal operational messages - debug: Detailed tracing information - warning: Warning messages - error: Error messages (logs + streams automatically) - success: Success messages (logs + streams by default) - timing: Timing information - approval: Approval messages - resume: Resume messages """
[docs] def __init__( self, base_logger: logging.Logger, component_name: str, color: str = "white", state: Any = None, ): """ Initialize component logger. Args: base_logger: Underlying Python logger component_name: Name of the component (e.g., 'data_analysis', 'router', 'mongo') color: Rich color name for this component state: Optional AgentState for streaming context """ self.base_logger = base_logger self.component_name = component_name self.color = color self._state = state # Lazy initialization - only when first needed self._stream_writer = None self._stream_writer_attempted = False self._step_info = None # Typed event emitter for the new event streaming system self._event_emitter = EventEmitter(component_name)
def _get_stream_writer(self): """Lazy initialization of stream writer (only when first needed).""" if not self._stream_writer_attempted: self._stream_writer_attempted = True try: from langgraph.config import get_stream_writer self._stream_writer = get_stream_writer() # Also extract step info when we get the writer self._step_info = self._extract_step_info(self._state) except (RuntimeError, ImportError): # Not in LangGraph context - that's fine self._stream_writer = None self._step_info = {} return self._stream_writer def _extract_step_info(self, state): """Extract step context from AgentState for streaming metadata. Reuses existing logic from StreamWriter._get_step_info(): - Task preparation phases use hard-coded step mapping - Execution phases extract from execution plan in state - Falls back to basic component info """ # Check if this is a task preparation component if self.component_name in TASK_PREPARATION_STEPS: return TASK_PREPARATION_STEPS[self.component_name] # For execution phase, extract from state if state and hasattr(state, "get"): try: from osprey.state.state_manager import StateManager execution_plan = state.get("planning_execution_plan") if execution_plan and execution_plan.get("steps"): current_step_index = StateManager.get_current_step_index(state) total_steps = len(execution_plan.get("steps", [])) if total_steps > 0: return { "step": current_step_index + 1, "total_steps": total_steps, "phase": "Execution", } except Exception: # Silently fall through to default step info below. # Cannot log here: runs inside logging infrastructure, # logging.debug() risks recursion or handler re-entry. pass # Default: no step info return { "step": None, "total_steps": None, "phase": self.component_name.replace("_", " ").title(), } def _emit_stream_event(self, message: str, *args, event_type: str = "status", **kwargs): """Emit streaming event as typed OspreyEvent. Uses the EventEmitter to emit typed StatusEvent or ErrorEvent instances. The emitter handles LangGraph streaming and fallback handlers automatically. Supports stdlib-style format args: ``logger.info("msg %s", val)`` """ # Apply %-style formatting if positional args are provided (stdlib compat) if args: try: message = message % args except (TypeError, ValueError): pass # Extract step info for the event (lazy init if needed) if self._step_info is None: self._step_info = self._extract_step_info(self._state) step_info = self._step_info or {} try: # Create typed event based on event_type if event_type == "error" or kwargs.get("error"): event = ErrorEvent( component=self.component_name, error_type=kwargs.get("error_type", "ExecutionError"), error_message=message, recoverable=kwargs.get("recoverable", False), stack_trace=kwargs.get("stack_trace"), ) else: # Map event_type to StatusEvent level level_map = { "status": "status", "info": "info", "debug": "debug", "warning": "warning", "success": "success", "key_info": "key_info", "timing": "timing", "approval": "approval", "resume": "resume", } level = level_map.get(event_type, "info") event = StatusEvent( component=self.component_name, message=message, level=level, phase=step_info.get("phase"), step=step_info.get("step"), total_steps=step_info.get("total_steps"), ) # Emit via the typed event system self._event_emitter.emit(event) except Exception: # Don't crash logging just because streaming failed # Avoid recursive debug() call that could cause infinite loop pass
[docs] def emit_event(self, event: OspreyEvent) -> None: """Emit a typed OspreyEvent directly. Use this for structured events like PhaseStartEvent, CapabilityStartEvent, etc. that don't fit the standard logging pattern. Args: event: The typed event to emit Example: from osprey.events import PhaseStartEvent logger.emit_event(PhaseStartEvent( phase="task_extraction", description="Extracting task from query" )) """ # Ensure component is set if not already if not event.component: event.component = self.component_name self._event_emitter.emit(event)
[docs] def emit_llm_request( self, prompt: str, key: str = "", model: str = "", provider: str = "" ) -> None: """Emit LLMRequestEvent with full prompt for TUI display. Args: prompt: The complete LLM prompt text key: Optional key for accumulating multiple prompts (e.g., capability name) model: Model identifier (e.g., "gpt-4", "claude-3-opus") provider: Provider name (e.g., "openai", "anthropic") """ from osprey.events import LLMRequestEvent event = LLMRequestEvent( component=self.component_name, prompt_preview=prompt[:200] + "..." if len(prompt) > 200 else prompt, prompt_length=len(prompt), model=model, provider=provider, full_prompt=prompt, key=key, ) self._event_emitter.emit(event)
[docs] def emit_llm_response( self, response: str, key: str = "", duration_ms: int = 0, input_tokens: int = 0, output_tokens: int = 0, ) -> None: """Emit LLMResponseEvent with full response for TUI display. Args: response: The complete LLM response text key: Optional key for accumulating multiple responses (e.g., capability name) duration_ms: How long the request took in milliseconds input_tokens: Number of input tokens output_tokens: Number of output tokens """ from osprey.events import LLMResponseEvent event = LLMResponseEvent( component=self.component_name, response_preview=response[:200] + "..." if len(response) > 200 else response, response_length=len(response), duration_ms=duration_ms, input_tokens=input_tokens, output_tokens=output_tokens, full_response=response, key=key, ) self._event_emitter.emit(event)
def _format_message(self, message: str, style: str, emoji: str = "") -> str: """Format message with Rich markup and emoji prefix.""" try: prefix = f"{emoji}{self.component_name.title()}: " if style: return f"[{style}]{prefix}{message}[/{style}]" else: return f"{prefix}{message}" except Exception: # Graceful degradation for environments where Rich markup fails return f"{emoji}{self.component_name.title()}: {message}" def _build_extra(self, message: str, log_type: str, **kwargs) -> dict: """Build extra dict with raw message, log type, and all streaming data. This embeds all the data that streaming events carry into the Python log, enabling TUI to get all data from a single source (Python logging). Args: message: The raw message (without Rich markup) log_type: The log type (status, success, error, info, etc.) **kwargs: Additional data fields (task, capabilities, steps, etc.) Returns: Dict to pass as extra= parameter to base_logger """ extra = { "raw_message": message, "log_type": log_type, } # Include all streaming data fields (for TUI to extract) for key in [ "task", "capabilities", "capability_names", "steps", "phase", "step_num", "step_name", "llm_prompt", "llm_response", ]: if key in kwargs: extra[key] = kwargs[key] # Also include step info from state if available if self._step_info: if "phase" not in extra and "phase" in self._step_info: extra["phase"] = self._step_info.get("phase", "") return extra
[docs] def status(self, message: str, *args, **kwargs) -> None: """Status update - emits StatusEvent. User-facing output. Transport is automatic: - During graph.astream(): LangGraph streaming - Outside graph execution: fallback transport → TypedEventHandler Args: message: Status message *args: Optional %-style format args (stdlib compat) **kwargs: Additional metadata for streaming event Example: logger.status("Creating execution plan...") logger.status("Processing batch 2/5", batch=2, total=5) """ self._emit_stream_event(message, *args, event_type="status", **kwargs)
[docs] def key_info(self, message: str, *args, **kwargs) -> None: """Important operational information - emits StatusEvent with info level. User-facing output. Transport is automatic. Args: message: Info message *args: Optional %-style format args (stdlib compat) **kwargs: Additional metadata for streaming event """ self._emit_stream_event(message, *args, event_type="key_info", **kwargs)
[docs] def info(self, message: str, *args, **kwargs) -> None: """Info message - emits StatusEvent with info level. User-facing output. Transport is automatic. Args: message: Info message *args: Optional %-style format args (stdlib compat) **kwargs: Additional metadata for streaming event Example: logger.info("Active capabilities: [...]") logger.info("Step completed") """ self._emit_stream_event(message, *args, event_type="info", **kwargs)
[docs] def debug(self, message: str, *args, **kwargs) -> None: """Debug message - emits StatusEvent with debug level. User-facing output (filtered by client if not needed). Transport is automatic. Args: message: Debug message *args: Optional %-style format args (stdlib compat) **kwargs: Additional metadata for streaming event """ self._emit_stream_event(message, *args, event_type="debug", **kwargs)
[docs] def warning(self, message: str, *args, **kwargs) -> None: """Warning message - emits StatusEvent with warning level. User-facing output. Transport is automatic. Args: message: Warning message *args: Optional %-style format args (stdlib compat) **kwargs: Additional metadata for streaming event """ self._emit_stream_event(message, *args, event_type="warning", warning=True, **kwargs)
[docs] def error(self, message: str, *args, exc_info: bool = False, **kwargs) -> None: """Error message - emits ErrorEvent. User-facing output. Transport is automatic. Args: message: Error message *args: Optional %-style format args (stdlib compat) exc_info: Whether to include exception traceback in ErrorEvent **kwargs: Additional error metadata for streaming event """ # Include stack trace in ErrorEvent if exc_info=True if exc_info and "stack_trace" not in kwargs: import traceback kwargs["stack_trace"] = traceback.format_exc() self._emit_stream_event(message, *args, event_type="error", error=True, **kwargs)
[docs] def success(self, message: str, *args, **kwargs) -> None: """Success message - emits StatusEvent with success level. User-facing output. Transport is automatic. Args: message: Success message *args: Optional %-style format args (stdlib compat) **kwargs: Additional metadata for streaming event """ self._emit_stream_event(message, *args, event_type="success", **kwargs)
[docs] def timing(self, message: str, *args, **kwargs) -> None: """Timing information - emits StatusEvent with timing level. User-facing output. Transport is automatic. Args: message: Timing message *args: Optional %-style format args (stdlib compat) **kwargs: Additional metadata for streaming event """ self._emit_stream_event(message, *args, event_type="timing", **kwargs)
[docs] def approval(self, message: str, *args, **kwargs) -> None: """Approval message - emits StatusEvent with approval level. User-facing output. Transport is automatic. Args: message: Approval message *args: Optional %-style format args (stdlib compat) **kwargs: Additional metadata for streaming event """ self._emit_stream_event(message, *args, event_type="approval", **kwargs)
[docs] def resume(self, message: str, *args, **kwargs) -> None: """Resume message - emits StatusEvent with resume level. User-facing output. Transport is automatic. Args: message: Resume message *args: Optional %-style format args (stdlib compat) **kwargs: Additional metadata for streaming event """ self._emit_stream_event(message, *args, event_type="resume", **kwargs)
[docs] def critical(self, message: str, *args, **kwargs) -> None: """Critical error - emits ErrorEvent. User-facing output. Transport is automatic. Args: message: Critical error message *args: Optional %-style format args (stdlib compat) **kwargs: Additional error metadata for streaming event """ self._emit_stream_event( message, *args, event_type="error", error=True, error_type="CriticalError", **kwargs )
[docs] def exception(self, message: str, *args, **kwargs) -> None: """Exception with traceback - emits ErrorEvent with stack trace. User-facing output. Transport is automatic. Args: message: Exception message *args: Optional %-style format args (stdlib compat) **kwargs: Additional error metadata for streaming event """ import traceback if "stack_trace" not in kwargs: kwargs["stack_trace"] = traceback.format_exc() self._emit_stream_event(message, *args, event_type="error", error=True, **kwargs)
# Properties for compatibility @property def level(self) -> int: return self.base_logger.level @property def name(self) -> str: return self.base_logger.name
[docs] def setLevel(self, level: int) -> None: self.base_logger.setLevel(level)
[docs] def isEnabledFor(self, level: int) -> bool: return self.base_logger.isEnabledFor(level)
def _setup_rich_logging(level: int = logging.INFO) -> None: """Configure Rich logging for the root logger (called once).""" root_logger = logging.getLogger() # Prevent duplicate handler registration for consistent logging behavior for handler in root_logger.handlers: if isinstance(handler, RichHandler): return # Ensure clean handler state to prevent duplicate log messages if root_logger.hasHandlers(): root_logger.handlers.clear() root_logger.setLevel(level) # Load user-configurable display preferences from config try: # Security-conscious defaults: hide locals to prevent sensitive data exposure rich_tracebacks = get_config_value("logging.rich_tracebacks", True) show_traceback_locals = get_config_value("logging.show_traceback_locals", False) show_full_paths = get_config_value("logging.show_full_paths", False) except Exception: # Config system unavailable; use secure defaults. # Cannot log here: logging infrastructure is mid-configuration # (handlers cleared but RichHandler not yet installed). rich_tracebacks = True show_traceback_locals = False show_full_paths = False # Optimize console for containerized and CI/CD environments console = Console( force_terminal=True, # Ensure color output in Docker containers and CI systems width=120, # Prevent line wrapping in standard terminal sizes color_system="truecolor", # Enable full color spectrum for component identification ) handler = RichHandler( console=console, # Use our custom console rich_tracebacks=rich_tracebacks, # Configurable rich tracebacks markup=True, # Enable [bold], [green], etc. in log messages show_path=show_full_paths, # Configurable path display show_time=True, # Show timestamp show_level=True, # Show log level tracebacks_show_locals=show_traceback_locals, # Configurable local variables ) root_logger.addHandler(handler) # Reduce third-party library noise to focus on application-specific issues for lib in ["httpx", "httpcore", "requests", "urllib3", "LiteLLM"]: logging.getLogger(lib).setLevel(logging.WARNING)
[docs] def get_logger( component_name: str = None, level: int = logging.INFO, *, state: Any = None, name: str = None, color: str = None, # Deprecated parameters - kept for backward compatibility source: str = None, ) -> ComponentLogger: """ Get a unified logger that handles both CLI logging and LangGraph streaming. Primary API (recommended - use via BaseCapability.get_logger()): component_name: Component name (e.g., 'orchestrator', 'data_analysis') state: Optional AgentState for streaming context and step tracking level: Logging level Explicit API (for custom loggers or module-level usage): name: Direct logger name (keyword-only) color: Direct color specification (keyword-only) level: Logging level Returns: ComponentLogger instance that logs to CLI and optionally streams Examples: # Recommended: Use via BaseCapability class MyCapability(BaseCapability): async def execute(self): logger = self.get_logger() # Auto-streams! logger.status("Working...") # Module-level (no streaming) logger = get_logger("orchestrator") logger.info("Planning started") # With streaming (when you have state) logger = get_logger("orchestrator", state=state) logger.status("Creating execution plan...") # Logs + streams logger.info("Active capabilities: [...]") # Logs only logger.error("Failed!") # Logs + streams # Custom logger logger = get_logger(name="test_logger", color="blue") .. deprecated:: The two-parameter API get_logger(source, component_name) is deprecated. Use get_logger(component_name) instead. The flat configuration structure (logging.logging_colors.{component_name}) replaces the old nested structure. """ import warnings # Initialize logging infrastructure with Rich formatting support _setup_rich_logging(level) # Handle explicit API for custom logger creation (tests, utilities) if name is not None: # Direct logger creation bypasses convention-based color assignment base_logger = logging.getLogger(name) actual_color = color or "white" return ComponentLogger(base_logger, name, actual_color, state=state) # Handle deprecated two-parameter API: get_logger("framework", "component") # This maintains backward compatibility while warning users to migrate if source is not None: warnings.warn( f"The two-parameter API get_logger('{source}', '{component_name}') is deprecated. " f"Use get_logger('{component_name}') instead. The 'source' parameter is no longer needed " f"as the configuration uses a flat structure: logging.logging_colors.{component_name}", DeprecationWarning, stacklevel=2, ) # For backward compatibility, still accept the old format but ignore source # component_name is already set from the second positional argument # Validate that component_name is provided if component_name is None: raise ValueError( "Component name is required. Usage: get_logger('component_name') or " "get_logger(name='custom_name', color='blue')" ) # Use component name as logger identifier for hierarchical organization base_logger = logging.getLogger(component_name) # Retrieve component-specific color from flat configuration structure try: # New flat structure: logging.logging_colors.{component_name} config_path = f"logging.logging_colors.{component_name}" color = get_config_value(config_path) if not color: color = "white" except Exception as e: # Graceful degradation ensures logging continues even with config issues color = "white" # Only show warning in debug mode to reduce noise import os if os.getenv("DEBUG_LOGGING"): print( f"⚠️ WARNING: Failed to load color config for {component_name}: {e}. Using white as fallback." ) # Pass state to enable streaming return ComponentLogger(base_logger, component_name, color, state=state)