"""
Component Logger Framework
Provides colored logging for Osprey and application components with:
- Unified API for all components (capabilities, infrastructure, pipelines)
- Rich terminal output with component-specific colors
- Graceful fallbacks when configuration is unavailable
- Simple, clear interface
- Optional LangGraph streaming support via lazy initialization
Usage:
# Components with streaming (via BaseCapability.get_logger())
logger = self.get_logger()
logger.status("Creating execution plan...") # Logs + streams
logger.info("Active capabilities: [...]") # Logs only
# Module-level (no streaming)
logger = get_logger("orchestrator")
logger.key_info("Starting orchestration")
logger = get_logger("data_processor")
logger.info("Processing data")
logger.debug("Detailed trace")
logger.success("Operation completed")
logger.warning("Something to note")
logger.error("Something went wrong")
logger.timing("Execution took 2.5 seconds")
logger.approval("Waiting for user approval")
# Custom loggers with explicit parameters
logger = get_logger(name="custom_component", color="blue")
"""
import logging
from contextlib import contextmanager
from typing import Any
from rich.console import Console
from rich.logging import RichHandler
from osprey.utils.config import get_config_value
@contextmanager
def quiet_logging():
"""Context manager to temporarily suppress INFO-level logs.
Temporarily raises the root logger level to WARNING, suppressing
INFO and DEBUG messages while preserving warnings and errors.
Used during direct chat mode for a cleaner conversational experience.
Example:
>>> with quiet_logging():
... # INFO logs are suppressed here
... await process_direct_chat()
# Normal logging restored after context exits
"""
root_logger = logging.getLogger()
original_level = root_logger.level
root_logger.setLevel(logging.WARNING)
try:
yield
finally:
root_logger.setLevel(original_level)
[docs]
class ComponentLogger:
"""
Rich-formatted logger for Osprey and application components with color coding and message hierarchy.
Now includes optional LangGraph streaming support via lazy initialization.
Message Types:
- status: High-level status updates (logs + streams automatically)
- key_info: Important operational information
- info: Normal operational messages
- debug: Detailed tracing information
- warning: Warning messages
- error: Error messages (logs + streams automatically)
- success: Success messages (logs + streams by default)
- timing: Timing information
- approval: Approval messages
- resume: Resume messages
"""
[docs]
def __init__(
self,
base_logger: logging.Logger,
component_name: str,
color: str = "white",
state: Any = None,
):
"""
Initialize component logger.
Args:
base_logger: Underlying Python logger
component_name: Name of the component (e.g., 'data_analysis', 'router', 'mongo')
color: Rich color name for this component
state: Optional AgentState for streaming context
"""
self.base_logger = base_logger
self.component_name = component_name
self.color = color
self._state = state
# Lazy initialization - only when first needed
self._stream_writer = None
self._stream_writer_attempted = False
self._step_info = None
def _get_stream_writer(self):
"""Lazy initialization of stream writer (only when first needed)."""
if not self._stream_writer_attempted:
self._stream_writer_attempted = True
try:
from langgraph.config import get_stream_writer
self._stream_writer = get_stream_writer()
# Also extract step info when we get the writer
self._step_info = self._extract_step_info(self._state)
except (RuntimeError, ImportError):
# Not in LangGraph context - that's fine
self._stream_writer = None
self._step_info = {}
return self._stream_writer
def _extract_step_info(self, state):
"""Extract step context from AgentState for streaming metadata.
Reuses existing logic from StreamWriter._get_step_info():
- Task preparation phases use hard-coded step mapping
- Execution phases extract from execution plan in state
- Falls back to basic component info
"""
# Import the step mapping from streaming module
try:
from osprey.utils.streaming import TASK_PREPARATION_STEPS
except ImportError:
TASK_PREPARATION_STEPS = {}
# Check if this is a task preparation component
if self.component_name in TASK_PREPARATION_STEPS:
return TASK_PREPARATION_STEPS[self.component_name]
# For execution phase, extract from state
if state and hasattr(state, "get"):
try:
from osprey.state.state_manager import StateManager
execution_plan = state.get("planning_execution_plan")
if execution_plan and execution_plan.get("steps"):
current_step_index = StateManager.get_current_step_index(state)
total_steps = len(execution_plan.get("steps", []))
if total_steps > 0:
return {
"step": current_step_index + 1,
"total_steps": total_steps,
"phase": "Execution",
}
except Exception:
pass # Graceful degradation
# Default: no step info
return {
"step": None,
"total_steps": None,
"phase": self.component_name.replace("_", " ").title(),
}
def _emit_stream_event(self, message: str, event_type: str = "status", **kwargs):
"""Emit streaming event if writer available."""
writer = self._get_stream_writer() # Lazy init
if not writer:
return # Silently skip if no stream available
try:
import time
event = {
"event_type": event_type,
"message": message,
"component": self.component_name,
"timestamp": time.time(),
**(self._step_info or {}),
**kwargs,
}
# Clean up None values
event = {k: v for k, v in event.items() if v is not None}
writer(event)
except Exception as e:
# Don't crash logging just because streaming failed
self.debug(f"Failed to emit stream event: {e}")
def _format_message(self, message: str, style: str, emoji: str = "") -> str:
"""Format message with Rich markup and emoji prefix."""
try:
prefix = f"{emoji}{self.component_name.title()}: "
if style:
return f"[{style}]{prefix}{message}[/{style}]"
else:
return f"{prefix}{message}"
except Exception:
# Graceful degradation for environments where Rich markup fails
return f"{emoji}{self.component_name.title()}: {message}"
def _build_extra(self, message: str, log_type: str, **kwargs) -> dict:
"""Build extra dict with raw message, log type, and all streaming data.
This embeds all the data that streaming events carry into the Python log,
enabling TUI to get all data from a single source (Python logging).
Args:
message: The raw message (without Rich markup)
log_type: The log type (status, success, error, info, etc.)
**kwargs: Additional data fields (task, capabilities, steps, etc.)
Returns:
Dict to pass as extra= parameter to base_logger
"""
extra = {
"raw_message": message,
"log_type": log_type,
}
# Include all streaming data fields (for TUI to extract)
for key in [
"task",
"capabilities",
"capability_names",
"steps",
"phase",
"step_num",
"step_name",
"llm_prompt",
"llm_response",
]:
if key in kwargs:
extra[key] = kwargs[key]
# Also include step info from state if available
if self._step_info:
if "phase" not in extra and "phase" in self._step_info:
extra["phase"] = self._step_info.get("phase", "")
return extra
[docs]
def status(self, message: str, **kwargs) -> None:
"""Status update - logs and streams automatically.
Use for high-level progress updates that users should see in both
CLI and web interfaces.
Args:
message: Status message
**kwargs: Additional metadata for streaming event
Example:
logger.status("Creating execution plan...")
logger.status("Processing batch 2/5", batch=2, total=5)
"""
style = f"bold {self.color}" if self.color != "white" else "bold white"
formatted = self._format_message(message, style, "")
extra = self._build_extra(message, "status", **kwargs)
self.base_logger.info(formatted, extra=extra)
self._emit_stream_event(message, "status", **kwargs)
[docs]
def key_info(self, message: str, stream: bool = False, **kwargs) -> None:
"""Important operational information - logs and optionally streams.
Args:
message: Info message
stream: Whether to also stream this message
**kwargs: Additional metadata for streaming event
"""
style = f"bold {self.color}" if self.color != "white" else "bold white"
formatted = self._format_message(message, style, "")
extra = self._build_extra(message, "key_info", **kwargs)
self.base_logger.info(formatted, extra=extra)
if stream:
self._emit_stream_event(message, "key_info", **kwargs)
[docs]
def info(self, message: str, stream: bool = False, **kwargs) -> None:
"""Info message - logs always, streams optionally.
By default, info messages only go to CLI logs. Use stream=True
to also send to web interface.
Args:
message: Info message
stream: Whether to also stream this message
**kwargs: Additional metadata for streaming event
Example:
logger.info("Active capabilities: [...]") # CLI only
logger.info("Step completed", stream=True) # CLI + stream
"""
formatted = self._format_message(message, self.color, "")
extra = self._build_extra(message, "info", **kwargs)
self.base_logger.info(formatted, extra=extra)
if stream:
self._emit_stream_event(message, "info", **kwargs)
[docs]
def debug(self, message: str, stream: bool = False, **kwargs) -> None:
"""Debug message - logs only (never streams by default).
Debug messages are detailed technical info not meant for web UI.
Args:
message: Debug message
stream: Whether to stream (default: False)
**kwargs: Additional metadata for streaming event
"""
style = f"dim {self.color}" if self.color != "white" else "dim white"
formatted = self._format_message(message, style, "🔍 ")
extra = self._build_extra(message, "debug", **kwargs)
self.base_logger.debug(formatted, extra=extra)
if stream:
self._emit_stream_event(message, "debug", **kwargs)
[docs]
def warning(self, message: str, stream: bool = True, **kwargs) -> None:
"""Warning message - logs and optionally streams.
Warnings stream by default since they're important for users to see.
Args:
message: Warning message
stream: Whether to stream (default: True)
**kwargs: Additional metadata for streaming event
"""
formatted = self._format_message(message, "bold yellow", "⚠️ ")
extra = self._build_extra(message, "warning", **kwargs)
self.base_logger.warning(formatted, extra=extra)
if stream:
self._emit_stream_event(message, "warning", warning=True, **kwargs)
[docs]
def error(self, message: str, exc_info: bool = False, **kwargs) -> None:
"""Error message - always logs and streams.
Errors are important and should always be visible in both interfaces.
Args:
message: Error message
exc_info: Whether to include exception traceback
**kwargs: Additional error metadata for streaming event
"""
formatted = self._format_message(message, "bold red", "❌ ")
extra = self._build_extra(message, "error", **kwargs)
self.base_logger.error(formatted, exc_info=exc_info, extra=extra)
self._emit_stream_event(message, "error", error=True, **kwargs)
[docs]
def success(self, message: str, stream: bool = True, **kwargs) -> None:
"""Success message - logs and optionally streams.
Success messages stream by default to give users feedback.
Args:
message: Success message
stream: Whether to stream (default: True)
**kwargs: Additional metadata for streaming event
"""
formatted = self._format_message(message, "bold green", "✅ ")
extra = self._build_extra(message, "success", **kwargs)
self.base_logger.info(formatted, extra=extra)
if stream:
self._emit_stream_event(message, "success", **kwargs)
[docs]
def timing(self, message: str, stream: bool = False, **kwargs) -> None:
"""Timing information - logs and optionally streams.
Args:
message: Timing message
stream: Whether to stream (default: False)
**kwargs: Additional metadata for streaming event
"""
formatted = self._format_message(message, "bold white", "🕒 ")
extra = self._build_extra(message, "timing", **kwargs)
self.base_logger.info(formatted, extra=extra)
if stream:
self._emit_stream_event(message, "timing", **kwargs)
[docs]
def approval(self, message: str, stream: bool = True, **kwargs) -> None:
"""Approval messages - logs and optionally streams.
Approval requests stream by default so users see them in web UI.
Args:
message: Approval message
stream: Whether to stream (default: True)
**kwargs: Additional metadata for streaming event
"""
formatted = self._format_message(message, "bold yellow", "🔍⚠️ ")
extra = self._build_extra(message, "approval", **kwargs)
self.base_logger.info(formatted, extra=extra)
if stream:
self._emit_stream_event(message, "approval", **kwargs)
[docs]
def resume(self, message: str, stream: bool = True, **kwargs) -> None:
"""Resume messages - logs and optionally streams.
Resume messages stream by default to provide feedback.
Args:
message: Resume message
stream: Whether to stream (default: True)
**kwargs: Additional metadata for streaming event
"""
formatted = self._format_message(message, "bold green", "🔄 ")
extra = self._build_extra(message, "resume", **kwargs)
self.base_logger.info(formatted, extra=extra)
if stream:
self._emit_stream_event(message, "resume", **kwargs)
# Compatibility methods - delegate to base logger
[docs]
def critical(self, message: str, *args, **kwargs) -> None:
formatted = self._format_message(message, "bold red", "❌ ")
self.base_logger.critical(formatted, *args, **kwargs)
[docs]
def exception(self, message: str, *args, **kwargs) -> None:
formatted = self._format_message(message, "bold red", "❌ ")
self.base_logger.exception(formatted, *args, **kwargs)
[docs]
def log(self, level: int, message: str, *args, **kwargs) -> None:
self.base_logger.log(level, message, *args, **kwargs)
# Properties for compatibility
@property
def level(self) -> int:
return self.base_logger.level
@property
def name(self) -> str:
return self.base_logger.name
[docs]
def setLevel(self, level: int) -> None:
self.base_logger.setLevel(level)
[docs]
def isEnabledFor(self, level: int) -> bool:
return self.base_logger.isEnabledFor(level)
def _setup_rich_logging(level: int = logging.INFO) -> None:
"""Configure Rich logging for the root logger (called once)."""
root_logger = logging.getLogger()
# Prevent duplicate handler registration for consistent logging behavior
for handler in root_logger.handlers:
if isinstance(handler, RichHandler):
return
# Ensure clean handler state to prevent duplicate log messages
if root_logger.hasHandlers():
root_logger.handlers.clear()
root_logger.setLevel(level)
# Load user-configurable display preferences from config
try:
# Security-conscious defaults: hide locals to prevent sensitive data exposure
rich_tracebacks = get_config_value("logging.rich_tracebacks", True)
show_traceback_locals = get_config_value("logging.show_traceback_locals", False)
show_full_paths = get_config_value("logging.show_full_paths", False)
except Exception:
# Secure defaults when configuration system is unavailable
rich_tracebacks = True
show_traceback_locals = False
show_full_paths = False
# Optimize console for containerized and CI/CD environments
console = Console(
force_terminal=True, # Ensure color output in Docker containers and CI systems
width=120, # Prevent line wrapping in standard terminal sizes
color_system="truecolor", # Enable full color spectrum for component identification
)
handler = RichHandler(
console=console, # Use our custom console
rich_tracebacks=rich_tracebacks, # Configurable rich tracebacks
markup=True, # Enable [bold], [green], etc. in log messages
show_path=show_full_paths, # Configurable path display
show_time=True, # Show timestamp
show_level=True, # Show log level
tracebacks_show_locals=show_traceback_locals, # Configurable local variables
)
root_logger.addHandler(handler)
# Reduce third-party library noise to focus on application-specific issues
for lib in ["httpx", "httpcore", "requests", "urllib3", "LiteLLM"]:
logging.getLogger(lib).setLevel(logging.WARNING)
[docs]
def get_logger(
component_name: str = None,
level: int = logging.INFO,
*,
state: Any = None,
name: str = None,
color: str = None,
# Deprecated parameters - kept for backward compatibility
source: str = None,
) -> ComponentLogger:
"""
Get a unified logger that handles both CLI logging and LangGraph streaming.
Primary API (recommended - use via BaseCapability.get_logger()):
component_name: Component name (e.g., 'orchestrator', 'data_analysis')
state: Optional AgentState for streaming context and step tracking
level: Logging level
Explicit API (for custom loggers or module-level usage):
name: Direct logger name (keyword-only)
color: Direct color specification (keyword-only)
level: Logging level
Returns:
ComponentLogger instance that logs to CLI and optionally streams
Examples:
# Recommended: Use via BaseCapability
class MyCapability(BaseCapability):
async def execute(self):
logger = self.get_logger() # Auto-streams!
logger.status("Working...")
# Module-level (no streaming)
logger = get_logger("orchestrator")
logger.info("Planning started")
# With streaming (when you have state)
logger = get_logger("orchestrator", state=state)
logger.status("Creating execution plan...") # Logs + streams
logger.info("Active capabilities: [...]") # Logs only
logger.error("Failed!") # Logs + streams
# Custom logger
logger = get_logger(name="test_logger", color="blue")
.. deprecated::
The two-parameter API get_logger(source, component_name) is deprecated.
Use get_logger(component_name) instead. The flat configuration structure
(logging.logging_colors.{component_name}) replaces the old nested structure.
"""
import warnings
# Initialize logging infrastructure with Rich formatting support
_setup_rich_logging(level)
# Handle explicit API for custom logger creation (tests, utilities)
if name is not None:
# Direct logger creation bypasses convention-based color assignment
base_logger = logging.getLogger(name)
actual_color = color or "white"
return ComponentLogger(base_logger, name, actual_color, state=state)
# Handle deprecated two-parameter API: get_logger("framework", "component")
# This maintains backward compatibility while warning users to migrate
if source is not None:
warnings.warn(
f"The two-parameter API get_logger('{source}', '{component_name}') is deprecated. "
f"Use get_logger('{component_name}') instead. The 'source' parameter is no longer needed "
f"as the configuration uses a flat structure: logging.logging_colors.{component_name}",
DeprecationWarning,
stacklevel=2,
)
# For backward compatibility, still accept the old format but ignore source
# component_name is already set from the second positional argument
# Validate that component_name is provided
if component_name is None:
raise ValueError(
"Component name is required. Usage: get_logger('component_name') or "
"get_logger(name='custom_name', color='blue')"
)
# Use component name as logger identifier for hierarchical organization
base_logger = logging.getLogger(component_name)
# Retrieve component-specific color from flat configuration structure
try:
# New flat structure: logging.logging_colors.{component_name}
config_path = f"logging.logging_colors.{component_name}"
color = get_config_value(config_path)
if not color:
color = "white"
except Exception as e:
# Graceful degradation ensures logging continues even with config issues
color = "white"
# Only show warning in debug mode to reduce noise
import os
if os.getenv("DEBUG_LOGGING"):
print(
f"⚠️ WARNING: Failed to load color config for {component_name}: {e}. Using white as fallback."
)
# Pass state to enable streaming
return ComponentLogger(base_logger, component_name, color, state=state)