Python Execution#
Python code generation and execution service with LangGraph-based workflow, approval integration, and flexible deployment options.
Note
For implementation tutorials and usage examples, see Python Execution.
Service Interface#
Request and Response Models#
- class osprey.services.python_executor.models.PythonExecutionRequest(*, user_query, task_objective, expected_results=<factory>, capability_prompts=<factory>, execution_folder_name, retries=3, planning_mode=PlanningMode.GENERATOR_DRIVEN, structured_plan=None, capability_context_data=None, approved_code=None, existing_execution_folder=None, session_context=None, execution_folder_path=None)[source]
Bases:
BaseModel
Type-safe, serializable request model for Python code execution services.
This Pydantic model defines the complete interface for requesting Python code generation and execution through the Python executor service. It encapsulates all necessary information for the service to understand the user’s intent, generate appropriate code, and execute it within the proper security context.
The request model is designed to be fully serializable and compatible with LangGraph’s state management system. It separates serializable request data from configuration objects, which are accessed through LangGraph’s configurable system for proper dependency injection and configuration management.
The model supports both fresh execution requests and continuation of existing execution sessions, with optional pre-approved code for bypassing the generation and analysis phases when code has already been validated.
- param user_query:
The original user query or task description that initiated this request (provides overall context)
- type user_query:
str
- param task_objective:
Step-specific goal from the orchestrator’s execution plan - self-sufficient description of what THIS specific step must accomplish
- type task_objective:
str
- param expected_results:
Dictionary describing expected outputs, success criteria, or result structure
- type expected_results:
Dict[str, Any]
- param capability_prompts:
Additional prompts or guidance for code generation context
- type capability_prompts:
List[str]
- param execution_folder_name:
Base name for the execution folder to be created
- type execution_folder_name:
str
- param retries:
Maximum number of retry attempts for code generation and execution
- type retries:
int
- param capability_context_data:
Context data from other capabilities for cross-capability integration
- type capability_context_data:
Dict[str, Any], optional
- param approved_code:
Pre-validated code to execute directly, bypassing generation
- type approved_code:
str, optional
- param existing_execution_folder:
Path to existing execution folder for session continuation
- type existing_execution_folder:
str, optional
- param session_context:
Session metadata including user and chat identifiers
- type session_context:
Dict[str, Any], optional
- param planning_mode:
Code generation planning mode (generator_driven or capability_driven)
- type planning_mode:
PlanningMode, optional
- param structured_plan:
Pre-built plan from capability (for capability_driven mode)
- type structured_plan:
StructuredPlan, optional
Note
The request model uses Pydantic for validation and serialization. All Path objects are represented as strings to ensure JSON compatibility.
Warning
When providing approved_code, ensure it has been properly validated through appropriate security and policy checks before submission.
See also
PythonExecutorService: Service that processes these requests
PythonExecutionState: LangGraph state containing request data
PythonServiceResult: Structured response from successful execution
Examples
Basic execution request for data analysis:
>>> # Example: Multi-step analysis where this is step 2 (the actual analysis)
>>> request = PythonExecutionRequest(
...     user_query="Analyze the sensor data trends and create report",  # Original user request
...     task_objective="Calculate statistical trends from retrieved sensor data",  # This step's goal
...     expected_results={"statistics": "dict", "plot": "matplotlib figure"},
...     execution_folder_name="sensor_analysis"
... )
Request with pre-approved code:
>>> request = PythonExecutionRequest(
...     user_query="Execute validated analysis code",
...     task_objective="Run pre-approved statistical analysis",
...     execution_folder_name="approved_analysis",
...     approved_code="import pandas as pd\ndf.describe()"
... )
Request with capability context integration:
>>> request = PythonExecutionRequest(
...     user_query="Process archiver data",
...     task_objective="Analyze retrieved EPICS data",
...     execution_folder_name="epics_analysis",
...     capability_context_data={
...         "archiver_data": {"pv_data": [...], "timestamps": [...]}
...     }
... )
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- user_query: str
- task_objective: str
- expected_results: dict[str, Any]
- capability_prompts: list[str]
- execution_folder_name: str
- retries: int
- planning_mode: PlanningMode
- structured_plan: StructuredPlan | None
- capability_context_data: dict[str, Any] | None
- approved_code: str | None
- existing_execution_folder: str | None
- session_context: dict[str, Any] | None
- execution_folder_path: str | None
- model_config = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class osprey.services.python_executor.models.PythonServiceResult(execution_result, generated_code, generation_attempt=1, analysis_warnings=<factory>)[source]
Bases:
object
Structured, type-safe result from Python executor service.
This eliminates the need for validation and error checking in capabilities. The service guarantees this structure is always returned on success. On failure, the service raises appropriate exceptions.
Following LangGraph patterns with frozen dataclasses for immutable results.
- execution_result: PythonExecutionSuccess
- generated_code: str
- generation_attempt: int
- analysis_warnings: list[str]
- __init__(execution_result, generated_code, generation_attempt=1, analysis_warnings=<factory>)
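The frozen-dataclass result pattern described above can be sketched with a simplified, hypothetical stand-in (the real class also carries an execution_result field; this sketch only illustrates the immutability guarantee):

```python
from dataclasses import FrozenInstanceError, dataclass, field


@dataclass(frozen=True)
class ServiceResultSketch:
    """Immutable result container: capabilities read fields, never mutate them."""
    generated_code: str
    generation_attempt: int = 1
    analysis_warnings: list = field(default_factory=list)


result = ServiceResultSketch(generated_code="print('hi')")
try:
    result.generation_attempt = 2  # frozen dataclass: assignment raises
    mutation_blocked = False
except FrozenInstanceError:
    mutation_blocked = True
```

Because the instance cannot be mutated after construction, downstream capabilities can pass the result around without defensive copies.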
- class osprey.services.python_executor.models.PythonExecutionSuccess(results, stdout, execution_time, folder_path, notebook_path, notebook_link, figure_paths=<factory>)[source]
Bases:
object
Comprehensive result data from successful Python code execution.
This dataclass encapsulates all outputs and artifacts produced by successful Python code execution, including computational results, execution metadata, performance metrics, and file system artifacts. It serves as the primary payload within PythonServiceResult and provides capabilities with structured access to execution outcomes.
The class captures both the logical results (computed data) and physical artifacts (notebooks, figures) produced during execution, along with execution metadata for monitoring and debugging purposes.
- Parameters:
results (Dict[str, Any]) – Dictionary containing the main computational results from code execution
stdout (str) – Complete stdout output captured during code execution
execution_time (float) – Total execution time in seconds
folder_path (Path) – File system path to the execution folder containing all artifacts
notebook_path (Path) – Path to the final notebook containing executed code and results
notebook_link (str) – Jupyter-accessible URL for viewing the execution notebook
figure_paths (List[Path]) – List of paths to any figures or plots generated during execution
Note
The results dictionary contains the primary computational outputs that other capabilities can use for further processing or analysis.
See also
PythonServiceResult: Container class that includes this execution data
PythonExecutionEngineResult: Internal engine result structure
- results: dict[str, Any]
- stdout: str
- execution_time: float
- folder_path: Path
- notebook_path: Path
- notebook_link: str
- figure_paths: list[Path]
- to_dict()[source]
Convert execution success data to dictionary for serialization and compatibility.
Transforms the execution result into a dictionary format suitable for JSON serialization, logging, or integration with systems that expect dictionary-based data structures. All Path objects are converted to strings for compatibility.
- Returns:
Dictionary representation with standardized field names
- Return type:
Dict[str, Any]
Note
This method provides backward compatibility with existing code that expects dictionary-based execution results.
Examples
Converting execution results for logging:
>>> success = PythonExecutionSuccess(
...     results={"mean": 42.0, "count": 100},
...     stdout="Calculation completed successfully",
...     execution_time=2.5,
...     folder_path=Path("/tmp/execution"),
...     notebook_path=Path("/tmp/execution/notebook.ipynb"),
...     notebook_link="http://jupyter/notebooks/notebook.ipynb"
... )
>>> data = success.to_dict()
>>> print(f"Execution took {data['execution_time_seconds']} seconds")
Execution took 2.5 seconds
- __init__(results, stdout, execution_time, folder_path, notebook_path, notebook_link, figure_paths=<factory>)
State Management#
- class osprey.services.python_executor.models.PythonExecutionState[source]
Bases:
TypedDict
LangGraph state for Python executor service.
This state is used internally by the service and includes both the original request and execution tracking fields.
CRITICAL: The ‘request’ field preserves the existing interface, allowing service nodes to access all original request data via state.request.field_name
The ‘request’ field uses the preserve_once_set reducer to ensure it’s never lost during state updates or checkpoint resumption (e.g., approval workflows).
NOTE: capability_context_data is extracted to top level for ContextManager compatibility
- request: Annotated[PythonExecutionRequest, preserve_once_set]
- capability_context_data: dict[str, dict[str, dict[str, Any]]] | None
- generation_attempt: int
- error_chain: list[ExecutionError]
- current_stage: str
- requires_approval: bool | None
- approval_interrupt_data: dict[str, Any] | None
- approval_result: dict[str, Any] | None
- approved: bool | None
- generated_code: str | None
- analysis_result: Any | None
- analysis_failed: bool | None
- execution_failed: bool | None
- execution_result: Any | None
- execution_folder: Any | None
- code_generator_metadata: dict[str, Any] | None
- is_successful: bool
- is_failed: bool
- failure_reason: str | None
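The preserve_once_set reducer behavior can be illustrated with a minimal sketch (an assumed reimplementation for illustration, not the framework's actual function; LangGraph applies such reducers whenever a state channel is updated):

```python
from typing import Annotated, Any, Optional, TypedDict


def preserve_once_set(current: Any, update: Any) -> Any:
    """Reducer: keep the first non-None value ever written to the channel.

    Later updates (including None produced during checkpoint resumption,
    e.g. in approval workflows) never overwrite an already-set value.
    """
    return current if current is not None else update


class StateSketch(TypedDict):
    # LangGraph reads the reducer from the Annotated metadata.
    request: Annotated[Optional[dict], preserve_once_set]
```

This is why the request survives interrupt/resume cycles: even if a resumed update carries no request, the reducer returns the previously stored value.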
- class osprey.services.python_executor.models.PythonExecutionContext(folder_path=None, folder_url=None, attempts_folder=None, context_file_path=None, notebook_attempts=<factory>)[source]
Bases:
object
Execution context container managing file system resources and notebook tracking.
This class provides a centralized container for managing all file system resources, paths, and metadata associated with a Python execution session. It tracks the execution folder structure, notebook creation attempts, and provides convenient access to execution artifacts.
The context maintains a flat, simple structure that can be easily serialized and passed between different components of the execution pipeline. It serves as the primary coordination point for file operations and artifact management.
- Parameters:
folder_path (Path, optional) – Main execution folder path where all artifacts are stored
folder_url (str, optional) – Jupyter-accessible URL for the execution folder
attempts_folder (Path, optional) – Subfolder containing individual execution attempts
context_file_path (Path, optional) – Path to the serialized context file for the execution
notebook_attempts (List[NotebookAttempt]) – List of all notebook creation attempts for this execution
Note
The context is typically created by the FileManager during execution folder setup and is passed through the execution pipeline to coordinate file operations.
See also
FileManager: Creates and manages execution contexts
NotebookAttempt: Individual notebook tracking records
- folder_path: Path | None = None
- folder_url: str | None = None
- attempts_folder: Path | None = None
- context_file_path: Path | None = None
- notebook_attempts: list[NotebookAttempt]
- property is_initialized: bool
Check if execution folder has been properly initialized.
Determines whether the execution context has been set up with a valid folder path, indicating that the file system resources are ready for use by the execution pipeline.
- Returns:
True if folder_path is set, False otherwise
- Return type:
bool
Examples
Checking context initialization before use:
>>> context = PythonExecutionContext()
>>> print(f"Ready: {context.is_initialized}")
Ready: False
>>> context.folder_path = Path("/tmp/execution")
>>> print(f"Ready: {context.is_initialized}")
Ready: True
- add_notebook_attempt(attempt)[source]
Add a notebook creation attempt to the tracking list.
Records a new notebook attempt in the execution context, maintaining a complete audit trail of all notebooks created during the execution session. This supports debugging and provides visibility into the execution workflow.
- Parameters:
attempt (NotebookAttempt) – Notebook attempt metadata to add to tracking
Examples
Adding a notebook attempt to context:
>>> context = PythonExecutionContext()
>>> attempt = NotebookAttempt(
...     notebook_type=NotebookType.FINAL_SUCCESS,
...     attempt_number=1,
...     stage="execution",
...     notebook_path=Path("/path/to/notebook.ipynb"),
...     notebook_link="http://jupyter/notebooks/notebook.ipynb"
... )
>>> context.add_notebook_attempt(attempt)
>>> print(f"Total attempts: {len(context.notebook_attempts)}")
Total attempts: 1
- get_next_attempt_number()[source]
Get the next sequential attempt number for notebook naming.
Calculates the next attempt number based on the current number of tracked notebook attempts. This ensures consistent, sequential numbering of notebooks throughout the execution session.
- Returns:
Next attempt number (1-based indexing)
- Return type:
int
Examples
Getting attempt number for new notebook:
>>> context = PythonExecutionContext()
>>> print(f"First attempt: {context.get_next_attempt_number()}")
First attempt: 1
>>> # After adding one attempt...
>>> print(f"Next attempt: {context.get_next_attempt_number()}")
Next attempt: 2
- __init__(folder_path=None, folder_url=None, attempts_folder=None, context_file_path=None, notebook_attempts=<factory>)
Configuration Models#
- class osprey.services.python_executor.config.PythonExecutorConfig(configurable=None)[source]
Bases:
object
Configuration for Python Executor Service.
Manages essential configuration settings for the Python executor service, including retry limits and execution timeouts. Values can be overridden via framework configuration.
- __init__(configurable=None)[source]
- property limits_validator
Get limits validator (lazy-loaded from config).
Returns the LimitsValidator instance if runtime channel limits checking is enabled in the configuration, or None if disabled. The validator is loaded only once and cached for subsequent accesses.
- Returns:
LimitsValidator instance or None if disabled
- Return type:
LimitsValidator | None
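The lazy-loading behavior described for this property can be sketched in a generic form (a hypothetical pattern; the real class reads the flag from framework configuration and constructs an actual LimitsValidator):

```python
class LazyConfigSketch:
    """Illustrative load-once-and-cache property pattern."""

    def __init__(self, limits_enabled: bool):
        self._limits_enabled = limits_enabled
        self._validator = None
        self._loaded = False

    @property
    def limits_validator(self):
        # Load on first access only; cache the result thereafter,
        # including a cached None when limits checking is disabled.
        if not self._loaded:
            self._validator = {"validator": "loaded"} if self._limits_enabled else None
            self._loaded = True
        return self._validator
```

The separate `_loaded` flag matters: caching on `_validator is None` alone would re-run the load on every access when the feature is disabled.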
- class osprey.services.python_executor.models.ExecutionModeConfig(mode_name, kernel_name, allows_writes, requires_approval, description, environment, epics_gateway=None)[source]
Bases:
object
Simple execution mode configuration.
- mode_name: str
- kernel_name: str
- allows_writes: bool
- requires_approval: bool
- description: str
- environment: dict[str, str]
- epics_gateway: dict[str, Any] | None = None
- __init__(mode_name, kernel_name, allows_writes, requires_approval, description, environment, epics_gateway=None)
- class osprey.services.python_executor.models.ContainerEndpointConfig(host, port, kernel_name, use_https=False)[source]
Bases:
object
Container endpoint configuration.
- host: str
- port: int
- kernel_name: str
- use_https: bool = False
- property base_url: str
- __init__(host, port, kernel_name, use_https=False)
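The base_url property presumably joins the scheme, host, and port; a minimal sketch of that behavior (the actual implementation may differ):

```python
from dataclasses import dataclass


@dataclass
class EndpointSketch:
    """Simplified stand-in for ContainerEndpointConfig's URL assembly."""
    host: str
    port: int
    use_https: bool = False

    @property
    def base_url(self) -> str:
        # Scheme follows the use_https flag; host and port are joined as-is.
        scheme = "https" if self.use_https else "http"
        return f"{scheme}://{self.host}:{self.port}"
```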
- class osprey.services.python_executor.execution.control.ExecutionControlConfig(epics_writes_enabled=False, control_system_writes_enabled=None, control_system_type='epics')[source]
Bases:
object
Configuration class for control system execution control and security policy management.
This configuration class encapsulates the security policies and settings that determine how Python code execution is controlled within the system. It provides the logic for automatically selecting appropriate execution environments based on code analysis results and configured security policies.
The configuration implements a conservative security approach where write operations are only permitted when explicitly enabled and detected in the code. This ensures that potentially dangerous operations require both configuration permission and explicit code intent.
- Parameters:
epics_writes_enabled (bool) – (Deprecated) Whether EPICS write operations are permitted. Use control_system_writes_enabled instead.
control_system_writes_enabled (bool) – Whether control system write operations are permitted in this deployment
control_system_type (str) – Type of control system (epics, mock, tango, etc.)
Note
This configuration should be set based on the deployment environment and security requirements. Production control systems should carefully consider the implications of enabling write access.
Warning
Enabling control system writes allows executed code to potentially affect physical systems. Ensure appropriate approval workflows and monitoring are in place.
See also
ExecutionMode: Available execution environment modes
get_execution_control_config(): Factory function for creating configurations
Examples
Creating a read-only configuration for safe analysis:
>>> config = ExecutionControlConfig(control_system_writes_enabled=False)
>>> mode = config.get_execution_mode(has_epics_writes=True, has_epics_reads=True)
>>> print(f"Mode: {mode}")  # Always READ_ONLY when writes disabled
Mode: ExecutionMode.READ_ONLY
Enabling controlled write access:
>>> write_config = ExecutionControlConfig(control_system_writes_enabled=True)
>>> # Only grants write access when code actually contains write operations
>>> read_mode = write_config.get_execution_mode(has_epics_writes=False, has_epics_reads=True)
>>> write_mode = write_config.get_execution_mode(has_epics_writes=True, has_epics_reads=True)
>>> print(f"Read mode: {read_mode}, Write mode: {write_mode}")
- epics_writes_enabled: bool = False
- control_system_writes_enabled: bool | None = None
- control_system_type: str = 'epics'
- __post_init__()[source]
Handle backward compatibility for epics_writes_enabled.
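The backward-compatibility handling can be sketched as a __post_init__ fallback (an assumed implementation for illustration; the real method may also emit a deprecation warning):

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ControlConfigSketch:
    """Hypothetical shim: deprecated flag applies only when the new one is unset."""
    epics_writes_enabled: bool = False
    control_system_writes_enabled: Optional[bool] = None

    def __post_init__(self):
        # Fall back to the deprecated field when the new field was not given,
        # so legacy configurations keep working unchanged.
        if self.control_system_writes_enabled is None:
            self.control_system_writes_enabled = self.epics_writes_enabled
```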
- get_execution_mode(has_epics_writes, has_epics_reads)[source]
Determine appropriate execution mode based on code analysis and security policy.
Analyzes the detected operations in the code (from static analysis) and applies the configured security policy to determine the most appropriate execution environment. The method implements a conservative approach where write access is only granted when both the code requires it and the configuration permits it.
The decision logic prioritizes security by defaulting to read-only access unless write operations are both detected in the code and explicitly enabled in the configuration.
- Parameters:
has_epics_writes (bool) – Whether static analysis detected EPICS write operations in the code
has_epics_reads (bool) – Whether static analysis detected EPICS read operations in the code
- Returns:
Execution mode appropriate for the detected operations and security policy
- Return type:
ExecutionMode
Note
The has_epics_reads parameter is provided for future extensibility but currently does not affect mode selection since read operations are permitted in all execution modes.
Examples
Mode selection with different code patterns:
>>> config = ExecutionControlConfig(epics_writes_enabled=True)
>>>
>>> # Code with only read operations
>>> mode = config.get_execution_mode(has_epics_writes=False, has_epics_reads=True)
>>> print(f"Read-only code: {mode}")
Read-only code: ExecutionMode.READ_ONLY
>>>
>>> # Code with write operations (and writes enabled)
>>> mode = config.get_execution_mode(has_epics_writes=True, has_epics_reads=True)
>>> print(f"Write code: {mode}")
Write code: ExecutionMode.WRITE_ACCESS
Security policy enforcement:
>>> secure_config = ExecutionControlConfig(epics_writes_enabled=False)
>>> # Write operations detected but not permitted by policy
>>> mode = secure_config.get_execution_mode(has_epics_writes=True, has_epics_reads=True)
>>> print(f"Secured mode: {mode}")  # Always READ_ONLY when writes disabled
Secured mode: ExecutionMode.READ_ONLY
- validate()[source]
Validate configuration for logical consistency.
- Returns:
List of validation warnings/errors
- Return type:
list[str]
- __init__(epics_writes_enabled=False, control_system_writes_enabled=None, control_system_type='epics')
Notebook Management#
- class osprey.services.python_executor.models.NotebookAttempt(notebook_type, attempt_number, stage, notebook_path, notebook_link, error_context=None, created_at=None)[source]
Bases:
object
Tracks metadata for a single notebook creation attempt during execution workflow.
This dataclass captures comprehensive information about each notebook created during the Python execution process, including its type, creation context, file system location, and any associated error information. It provides audit trails and debugging support for the execution workflow.
The class supports serialization for persistence and provides structured access to notebook metadata for both internal tracking and external reporting purposes.
- Parameters:
notebook_type (NotebookType) – Type of notebook created (generation, execution, final, etc.)
attempt_number (int) – Sequential attempt number for this execution session
stage (str) – Execution stage when notebook was created
notebook_path (Path) – File system path to the created notebook
notebook_link (str) – URL link for accessing the notebook in Jupyter interface
error_context (str, optional) – Optional error information if notebook creation failed
created_at (str, optional) – Timestamp when notebook was created
Note
The notebook_link provides direct access to view the notebook in the Jupyter interface, making it easy to inspect execution results.
See also
NotebookType: Enumeration of supported notebook types
PythonExecutionContext: Container for execution context and attempts
- notebook_type: NotebookType
- attempt_number: int
- stage: str
- notebook_path: Path
- notebook_link: str
- error_context: str | None = None
- created_at: str | None = None
- to_dict()[source]
Convert notebook attempt to dictionary for serialization and storage.
Transforms the notebook attempt data into a serializable dictionary format suitable for JSON storage, logging, or transmission. All Path objects are converted to strings and enum values are converted to their string representations.
- Returns:
Dictionary representation with all fields as serializable types
- Return type:
Dict[str, Any]
Examples
Converting attempt to dictionary for logging:
>>> attempt = NotebookAttempt(
...     notebook_type=NotebookType.FINAL_SUCCESS,
...     attempt_number=1,
...     stage="execution",
...     notebook_path=Path("/path/to/notebook.ipynb"),
...     notebook_link="http://jupyter/notebooks/notebook.ipynb"
... )
>>> data = attempt.to_dict()
>>> print("Notebook type:", data['notebook_type'])
Notebook type: final_success
- __init__(notebook_type, attempt_number, stage, notebook_path, notebook_link, error_context=None, created_at=None)
- class osprey.services.python_executor.models.NotebookType(value)[source]
Bases:
Enum
Enumeration of notebook types created during Python execution workflow.
This enum categorizes the different types of Jupyter notebooks that are created throughout the Python execution lifecycle. Each notebook type serves a specific purpose in the execution workflow and provides different levels of detail for debugging and audit purposes.
The notebooks are created at key stages to provide comprehensive visibility into the execution process, from initial code generation through final execution results or failure analysis.
- Variables:
CODE_GENERATION_ATTEMPT – Notebook created after code generation but before analysis
PRE_EXECUTION – Notebook created after analysis approval but before execution
EXECUTION_ATTEMPT – Notebook created during or immediately after code execution
FINAL_SUCCESS – Final notebook created after successful execution completion
FINAL_FAILURE – Final notebook created after execution failure for debugging
Note
Each notebook type includes different metadata and context information appropriate for its stage in the execution workflow.
See also
NotebookAttempt: Tracks individual notebook creation attempts
NotebookManager: Manages notebook creation and lifecycle
- CODE_GENERATION_ATTEMPT = 'code_generation_attempt'
- PRE_EXECUTION = 'pre_execution'
- EXECUTION_ATTEMPT = 'execution_attempt'
- FINAL_SUCCESS = 'final_success'
- FINAL_FAILURE = 'final_failure'
Exceptions#
- class osprey.services.python_executor.exceptions.PythonExecutorException(message, category, technical_details=None, folder_path=None)[source]
Bases:
Exception
Base exception class for all Python executor service operations.
This abstract base class provides common functionality for all Python executor exceptions, including error categorization, context management, and retry logic determination. It serves as the foundation for the entire exception hierarchy and enables consistent error handling across the service.
The class implements a category-based approach to error handling that allows the service to automatically determine appropriate recovery strategies without requiring explicit exception type checking in the retry logic.
- Parameters:
message (str) – Human-readable error description
category (ErrorCategory) – Error category that determines recovery strategy
technical_details (Dict[str, Any], optional) – Additional technical information for debugging
folder_path (Path, optional) – Path to execution folder if available for debugging
Note
This base class should not be raised directly. Use specific exception subclasses that provide more detailed error information.
See also
ErrorCategory: Error categorization for recovery strategies
ContainerConnectivityError: Infrastructure error example
CodeRuntimeError: Code-related error example
- __init__(message, category, technical_details=None, folder_path=None)[source]
- is_infrastructure_error()[source]
Check if this is an infrastructure or connectivity error.
Infrastructure errors indicate problems with external dependencies like container connectivity, network issues, or service availability. These errors typically warrant retrying the same operation after a delay.
- Returns:
True if this is an infrastructure error
- Return type:
bool
Examples
Checking error type for retry logic:
>>> try:
...     await execute_code(code)
... except PythonExecutorException as e:
...     if e.is_infrastructure_error():
...         await asyncio.sleep(1)  # Brief delay
...         await execute_code(code)  # Retry same code
- is_code_error()[source]
Check if this is a code-related error requiring code regeneration.
Code errors indicate problems with the generated or provided Python code, including syntax errors, runtime failures, or logical issues. These errors typically require regenerating the code with error feedback.
- Returns:
True if this is a code-related error
- Return type:
bool
Examples
Handling code errors with regeneration:
>>> try:
...     await execute_code(code)
... except PythonExecutorException as e:
...     if e.is_code_error():
...         new_code = await regenerate_code(error_feedback=str(e))
...         await execute_code(new_code)
- is_workflow_error()[source]
Check if this is a workflow control error requiring special handling.
Workflow errors indicate issues with the service’s execution workflow, such as timeouts, maximum retry limits, or approval requirements. These errors typically require user intervention or service configuration changes.
- Returns:
True if this is a workflow control error
- Return type:
bool
Examples
Handling workflow errors with user notification:
>>> try:
...     await execute_code(code)
... except PythonExecutorException as e:
...     if e.is_workflow_error():
...         await notify_user(f"Execution failed: {e.message}")
- should_retry_execution()[source]
Determine if the same code execution should be retried.
Returns True for infrastructure errors where the code itself is likely correct but external dependencies (containers, network) caused the failure. This enables automatic retry of the same code without regeneration.
- Returns:
True if execution should be retried with the same code
- Return type:
bool
Examples
Automatic retry logic based on error category:
>>> if exception.should_retry_execution():
...     logger.info("Infrastructure issue, retrying execution...")
...     await retry_execution_with_backoff(code)
- should_retry_code_generation()[source]
Determine if code should be regenerated and execution retried.
Returns True for code-related errors where the generated code has issues that require regeneration with error feedback. This enables automatic code improvement through iterative generation.
- Returns:
True if code should be regenerated and execution retried
- Return type:
bool
Examples
Code regeneration retry logic:
>>> if exception.should_retry_code_generation():
...     logger.info("Code issue, regenerating with feedback...")
...     improved_code = await regenerate_with_feedback(str(exception))
...     await execute_code(improved_code)
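The two retry predicates can drive a single dispatch loop. A hedged sketch with stand-in names (ExecutorError, run_with_retries, and the category strings are hypothetical; the real service's workflow orchestration differs):

```python
class ExecutorError(Exception):
    """Stand-in for PythonExecutorException with a category attribute."""

    def __init__(self, message: str, category: str):
        super().__init__(message)
        self.category = category

    def should_retry_execution(self) -> bool:
        return self.category == "infrastructure"

    def should_retry_code_generation(self) -> bool:
        return self.category == "code"


def run_with_retries(execute, regenerate, code, max_attempts=3):
    """Dispatch on error category: retry same code vs. regenerate with feedback."""
    for _ in range(max_attempts):
        try:
            return execute(code)
        except ExecutorError as exc:
            if exc.should_retry_execution():
                continue  # infrastructure issue: same code, try again
            if exc.should_retry_code_generation():
                code = regenerate(code, feedback=str(exc))
                continue  # code issue: regenerated code, try again
            raise  # workflow errors propagate to the caller
    raise RuntimeError("max retry attempts exhausted")
```

The category check replaces explicit isinstance dispatch on the exception hierarchy, which is exactly what the base class's category-based design enables.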
- class osprey.services.python_executor.exceptions.CodeRuntimeError(message, traceback_info, execution_attempt, technical_details=None, folder_path=None)[source]
Bases:
PythonExecutorException
Code failed during execution due to runtime errors.
- __init__(message, traceback_info, execution_attempt, technical_details=None, folder_path=None)[source]
- class osprey.services.python_executor.exceptions.CodeGenerationError(message, generation_attempt, error_chain, technical_details=None)[source]
Bases:
PythonExecutorException
LLM failed to generate valid code.
- __init__(message, generation_attempt, error_chain, technical_details=None)[source]
- class osprey.services.python_executor.exceptions.ContainerConnectivityError(message, host, port, technical_details=None)[source]
Bases:
PythonExecutorException
Exception raised when Jupyter container is unreachable or connection fails.
This infrastructure error indicates that the Python executor service cannot establish communication with the configured Jupyter container endpoint. This typically occurs due to network issues, container startup problems, or configuration mismatches.
The error provides both technical details for debugging and user-friendly messages that abstract the underlying infrastructure complexity while preserving essential information for troubleshooting.
- Parameters:
message (str) – Technical error description for debugging
host (str) – Container host address that failed to connect
port (int) – Container port that failed to connect
technical_details (Dict[str, Any], optional) – Additional technical information for debugging
Note
This error triggers automatic retry logic since the code itself is likely correct and the issue is with external infrastructure.
See also
ContainerConfigurationError: Configuration-related container issues
PythonExecutorException.should_retry_execution: Retry logic for infrastructure errors
Examples
Handling container connectivity issues:
>>> try:
...     result = await container_executor.execute_code(code)
... except ContainerConnectivityError as e:
...     logger.warning(f"Container issue: {e.get_user_message()}")
...     # Automatic retry or fallback to local execution
...     result = await local_executor.execute_code(code)
- __init__(message, host, port, technical_details=None)[source]
- get_user_message()[source]
Get user-friendly error message abstracting technical details.
Provides a clear, non-technical explanation of the connectivity issue that users can understand without needing to know about container infrastructure details.
- Returns:
User-friendly error description
- Return type:
str
Examples
Displaying user-friendly error messages:
>>> error = ContainerConnectivityError(
...     "Connection refused", "localhost", 8888
... )
>>> print(error.get_user_message())
Python execution environment is not reachable at localhost:8888
- class osprey.services.python_executor.exceptions.ExecutionTimeoutError(timeout_seconds, technical_details=None, folder_path=None)[source]
Bases:
PythonExecutorException
Code execution exceeded timeout.
- __init__(timeout_seconds, technical_details=None, folder_path=None)[source]
- class osprey.services.python_executor.exceptions.ChannelLimitsViolationError(channel_address, value, violation_type, violation_reason, min_value=None, max_value=None, max_step=None, current_value=None)[source]
Bases:
PythonExecutorException
Raised when a channel write violates configured limits.
This code-related error indicates that generated or user code attempted to write a value to a channel that violates safety limits defined in the limits database. This includes min/max limit violations, read-only channel writes, excessive step sizes, or writes to unlisted channels.
The error provides comprehensive details about the violation including the channel address, attempted value, current value (for step violations), and the configured limits to help users understand why the write was blocked.
- Parameters:
channel_address (str) – Channel address that was accessed
value (Any) – The value that was attempted to be written
violation_type (str) – Type of violation (MIN_EXCEEDED, MAX_EXCEEDED, READ_ONLY_CHANNEL, UNLISTED_CHANNEL, MAX_STEP_EXCEEDED, STEP_CHECK_FAILED)
violation_reason (str) – Human-readable explanation of the violation
min_value (float, optional) – Configured minimum value for the channel
max_value (float, optional) – Configured maximum value for the channel
max_step (float, optional) – Configured maximum step size for the channel
current_value (Any, optional) – Current channel value (for step violations)
Note
This error is raised during code execution when runtime channel limits checking is enabled and detects a safety violation.
See also
LimitsValidator: Validation engine that raises this exception
ChannelLimitsConfig: Configuration for channel limits
Examples
Handling channel limits violations:
>>> try:
...     await executor.execute_code(code)
... except ChannelLimitsViolationError as e:
...     logger.error(f"Safety violation: {e.violation_reason}")
...     logger.error(f"Attempted to write {e.attempted_value} to {e.channel_address}")
...     # Code should be regenerated with safer values
- __init__(channel_address, value, violation_type, violation_reason, min_value=None, max_value=None, max_step=None, current_value=None)[source]
- class osprey.services.python_executor.exceptions.ErrorCategory(value)[source]
Bases:
Enum
High-level error categories that determine appropriate recovery strategies.
This enumeration classifies all Python executor errors into categories that directly correspond to different recovery and retry strategies. The categorization enables intelligent error handling that can automatically determine whether to retry execution, regenerate code, or require user intervention.
- Variables:
INFRASTRUCTURE – Container connectivity, network, or external service issues
CODE_RELATED – Syntax errors, runtime failures, or logical issues in generated code
WORKFLOW – Service workflow control issues like timeouts or retry limits
CONFIGURATION – Invalid or missing configuration settings
Note
Error categories are used by the service’s retry logic to determine the appropriate recovery strategy without requiring explicit error type checking.
See also
PythonExecutorException: Base exception class using these categories
PythonExecutorException.should_retry_execution(): Infrastructure retry logic
PythonExecutorException.should_retry_code_generation(): Code regeneration logic
- INFRASTRUCTURE = 'infrastructure'
- CODE_RELATED = 'code_related'
- WORKFLOW = 'workflow'
- CONFIGURATION = 'configuration'
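The category-to-strategy mapping described above can be sketched as a small dispatch function. The local `ErrorCategory` mirrors the documented member values; the `recovery_strategy` function and its return labels are illustrative, not the service's actual retry logic:

```python
from enum import Enum


class ErrorCategory(Enum):
    # Member values mirror the documented enumeration
    INFRASTRUCTURE = 'infrastructure'
    CODE_RELATED = 'code_related'
    WORKFLOW = 'workflow'
    CONFIGURATION = 'configuration'


def recovery_strategy(category: ErrorCategory) -> str:
    """Map an error category to an appropriate recovery strategy."""
    if category is ErrorCategory.INFRASTRUCTURE:
        return "retry_execution"        # external issue; the code itself is likely fine
    if category is ErrorCategory.CODE_RELATED:
        return "regenerate_code"        # syntax/runtime/logic issue in generated code
    if category is ErrorCategory.WORKFLOW:
        return "abort"                  # timeout or retry limit reached
    return "require_user_intervention"  # configuration must be fixed by an operator
```

Dispatching on the category rather than on concrete exception types is what lets the retry logic avoid explicit error type checking, as noted above.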
Utility Functions#
- osprey.services.python_executor.models.get_execution_control_config_from_configurable(configurable)[source]#
Get execution control configuration from LangGraph configurable - raises exceptions on failure.
This provides a consistent way to access the control system's execution control settings from the configurable dictionary passed to the Python executor service, ensuring that security-critical settings such as control_system_writes_enabled are always read through a single, validated path.
- Parameters:
configurable (dict[str, Any]) – The LangGraph configurable dictionary
- Returns:
Execution control configuration
- Return type:
ExecutionControlConfig
- Raises:
ContainerConfigurationError – If configuration is missing or invalid
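A sketch of how such an accessor behaves, using local stand-ins: the `execution_control` key name and the `control_system_writes_enabled` field check below are assumptions for illustration, not the actual osprey schema, and `ContainerConfigurationError` is a local stand-in rather than the real class:

```python
from typing import Any


class ContainerConfigurationError(Exception):
    """Local stand-in for the documented configuration error."""


def get_execution_control_config(configurable: dict[str, Any]) -> dict[str, Any]:
    """Fetch and validate execution-control settings from a configurable dict.

    Raises instead of returning defaults, so a missing or malformed
    configuration can never silently enable control system writes.
    """
    config = configurable.get("execution_control")  # assumed key name
    if config is None:
        raise ContainerConfigurationError(
            "Missing 'execution_control' in LangGraph configurable"
        )
    if "control_system_writes_enabled" not in config:
        raise ContainerConfigurationError(
            "Invalid execution control config: "
            "'control_system_writes_enabled' is required"
        )
    return config
```

Failing loudly on missing configuration is the safer default for a setting that gates writes to a control system.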
Serialization Utilities#
- osprey.services.python_executor.services.make_json_serializable(obj)[source]#
Convert complex objects to JSON-serializable format using modern Python patterns.
This is a standalone function that can be imported and used by execution wrappers and other components that need robust JSON serialization.
- Parameters:
obj (Any) – Any Python object to make JSON-serializable
- Returns:
JSON-serializable representation of the object
- Return type:
Any
Examples
>>> import numpy as np
>>> import matplotlib.pyplot as plt
>>>
>>> # Handle numpy arrays
>>> arr = np.array([1, 2, 3])
>>> serializable = make_json_serializable(arr)
>>> print(serializable)  # [1, 2, 3]
>>>
>>> # Handle matplotlib figures
>>> fig, ax = plt.subplots()
>>> ax.plot([1, 2, 3])
>>> serializable = make_json_serializable(fig)
>>> print(serializable['_type'])  # 'matplotlib_figure'
- osprey.services.python_executor.services.serialize_results_to_file(results, file_path)[source]#
Serialize results and save to JSON file with comprehensive error handling.
This function is designed to be called from execution wrappers and provides robust serialization with detailed error reporting.
- Parameters:
results (Any) – The results object to serialize
file_path (str) – Path where the JSON file will be saved
- Returns:
Metadata about the serialization operation
- Return type:
dict
Examples
>>> # Called from execution wrapper
>>> metadata = serialize_results_to_file(results, 'results.json')
>>> if metadata['success']:
...     print(f"Results saved successfully to {metadata['file_path']}")
... else:
...     print(f"Serialization failed: {metadata['error']}")
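The success/error metadata contract shown above can be sketched as a self-contained function. This is a simplified illustration of the documented behavior (return metadata, never raise), not the actual osprey implementation, and the `default=str` fallback is an assumption:

```python
import json
from typing import Any


def serialize_results_to_file(results: Any, file_path: str) -> dict:
    """Serialize results to a JSON file, returning metadata instead of raising.

    Simplified sketch: objects json cannot handle fall back to str().
    """
    try:
        with open(file_path, "w", encoding="utf-8") as f:
            json.dump(results, f, default=str)
        return {"success": True, "file_path": file_path}
    except (OSError, TypeError, ValueError) as exc:
        return {"success": False, "file_path": file_path, "error": str(exc)}
```

Returning metadata rather than raising suits the execution-wrapper context: a serialization failure should be reported back to the service, not crash the user's already-completed computation.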
See also
- Python Execution
Complete implementation guide and examples
osprey.capabilities.python.PythonCapability
Capability interface that uses this service