Orchestration#
Infrastructure node that creates execution plans from active capabilities and task requirements, with native LangGraph interrupt support for human-in-the-loop approval.
OrchestrationNode#
Core Models#
Orchestration uses models defined in the core framework:
See also
ExecutionPlan: Structured execution plan model
PlannedStep: Individual step within execution plans
BaseInfrastructureNode: Base class for infrastructure components
Approval System Integration#
- osprey.approval.approval_system.create_plan_approval_interrupt(execution_plan, plan_file_path=None, pending_plans_dir=None)
Create structured interrupt data for execution plan approval with file-based storage support.
Generates LangGraph-compatible interrupt data that presents execution plans to users for approval. The interrupt includes formatted step details, clear approval instructions, and structured payload data for seamless resume operations after user approval.
The function supports file-based execution plan storage for enhanced human-in-the-loop workflows, particularly for Open WebUI integration.
The generated user message provides a comprehensive view of planned operations with step-by-step breakdown, making it easy for users to understand and evaluate the proposed execution plan.
- Parameters:
execution_plan (ExecutionPlan) – Execution plan object containing steps and configuration
plan_file_path (str, optional) – File path where the execution plan was saved
pending_plans_dir (str, optional) – Directory path for pending plan files
- Returns:
Dictionary containing user_message and resume_payload for LangGraph
- Return type:
Dict[str, Any]
Examples
Basic plan approval:
>>> from osprey.base.planning import ExecutionPlan
>>> from osprey.approval.approval_system import create_plan_approval_interrupt
>>> plan = ExecutionPlan(steps=[
...     {'task_objective': 'Load data', 'capability': 'data_loader'},
...     {'task_objective': 'Analyze trends', 'capability': 'data_analysis'}
... ])
>>> interrupt_data = create_plan_approval_interrupt(plan)
>>> print(interrupt_data['user_message'])  # Contains formatted approval request
Note
The interrupt data follows LangGraph’s standard structure with user_message for display and resume_payload for execution continuation.
See also
osprey.base.planning.ExecutionPlan: Input structure for this function
create_approval_type(): Approval type generation used by this function
get_approval_resume_data(): Function that processes the resume payload
clear_approval_state(): State cleanup after approval processing
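The shape of the returned dictionary described above can be illustrated with a small stand-in. This is a minimal sketch, not the osprey implementation: the function name `build_plan_interrupt_sketch`, the wording of the message, and the keys inside `resume_payload` are all assumptions made for illustration. Only the two top-level keys, `user_message` and `resume_payload`, come from the documentation above.

```python
from typing import Any, Dict, List, Optional


def build_plan_interrupt_sketch(
    steps: List[Dict[str, str]],
    plan_file_path: Optional[str] = None,
) -> Dict[str, Any]:
    """Illustrative stand-in for the documented return shape (hypothetical)."""
    # One numbered line per planned step, for display to the user.
    step_lines = [
        f"{i}. {step['task_objective']} (capability: {step['capability']})"
        for i, step in enumerate(steps, start=1)
    ]
    user_message = "\n".join(
        [
            "Execution plan awaiting approval:",
            *step_lines,
            "Reply 'yes' to approve or 'no' to reject.",
        ]
    )
    # Hypothetical payload keys; the real resume_payload layout may differ.
    resume_payload: Dict[str, Any] = {"steps": steps}
    if plan_file_path is not None:
        resume_payload["plan_file_path"] = plan_file_path
    return {"user_message": user_message, "resume_payload": resume_payload}


interrupt_data = build_plan_interrupt_sketch(
    [
        {"task_objective": "Load data", "capability": "data_loader"},
        {"task_objective": "Analyze trends", "capability": "data_analysis"},
    ]
)
print(interrupt_data["user_message"])
```

Keeping the display text and the resume data in separate keys means the user-facing message can change without affecting what the resumed node reads back.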
- osprey.approval.approval_system.clear_approval_state()
Clear approval state to prevent contamination between operations.
Provides centralized cleanup of approval state fields to maintain clean state hygiene between operations. This prevents approval data from previous interrupts from interfering with subsequent operations, ensuring each approval request is handled independently.
This function is typically called after processing approval results or when initializing new operations that should not inherit approval state.
- Returns:
Dictionary containing approval state fields reset to None
- Return type:
Dict[str, Any]
Examples
Clean state after processing approval:
>>> # After handling approved operation
>>> state_updates = clear_approval_state()
>>> # Apply to current state
>>> current_state.update(state_updates)
Initialize clean operation:
>>> # Before starting new capability that might need approval
>>> clean_state = clear_approval_state()
>>> new_state = {**current_state, **clean_state}
Note
This function only returns the state updates - callers must apply them to the actual state object.
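The "returns updates, caller applies them" contract in the note can be shown with a self-contained stand-in. This is a sketch under stated assumptions: the field names `approval_approved` and `approved_payload` and the helper `clear_approval_state_sketch` are hypothetical, since this page does not list the actual approval state fields.

```python
from typing import Any, Dict, Tuple

# Hypothetical approval field names, chosen only for illustration.
APPROVAL_FIELDS: Tuple[str, ...] = ("approval_approved", "approved_payload")


def clear_approval_state_sketch() -> Dict[str, Any]:
    """Return reset values for the approval fields without touching any state."""
    return {field: None for field in APPROVAL_FIELDS}


current_state: Dict[str, Any] = {
    "task": "Analyze trends",
    "approval_approved": True,
    "approved_payload": {"step_count": 2},
}

# Calling the function alone changes nothing in current_state.
updates = clear_approval_state_sketch()

# The caller merges the updates; unrelated keys such as "task" are preserved.
new_state = {**current_state, **updates}
print(new_state)
```

Merging into a new dictionary rather than mutating `current_state` leaves the previous state available, which can help when updates are returned from a graph node instead of applied in place.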
- osprey.approval.approval_system.create_approval_type(capability_name, operation_type=None)
Generate dynamic approval type identifier from capability and operation.
Creates unique approval type identifiers that replace the hard-coded ApprovalType enum with a flexible system. This enables any capability to request approval without requiring framework modifications, while maintaining clear identification and supporting operation-level granularity within capabilities.
The generated identifiers follow a consistent naming pattern that ensures uniqueness and readability for logging, debugging, and user interfaces.
- Parameters:
capability_name (str) – Name of the capability requesting approval
operation_type (str, optional) – Optional specific operation type for granular control
- Returns:
Unique string identifier for the approval type
- Return type:
str
Examples
Basic capability approval:
>>> approval_type = create_approval_type("python")
>>> print(approval_type)
python
Operation-specific approval:
>>> approval_type = create_approval_type("memory", "save")
>>> print(approval_type)
memory_save
Complex capability with operation:
>>> approval_type = create_approval_type("data_analysis", "execute_query")
>>> print(approval_type)
data_analysis_execute_query
See also
create_code_approval_interrupt(): Uses this function for approval type creation
create_memory_approval_interrupt(): Uses this function for approval type creation
create_plan_approval_interrupt(): Uses this function for approval type creation
get_approval_resume_data(): Uses approval types for state management
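The naming pattern seen in the examples above (capability name alone, or capability and operation joined by an underscore) can be reproduced with a few lines. This is a sketch inferred from those three examples only; `approval_type_sketch` is a hypothetical name, and the real function may validate or normalize its inputs in ways not shown here.

```python
from typing import Optional


def approval_type_sketch(
    capability_name: str, operation_type: Optional[str] = None
) -> str:
    """Join capability and optional operation into one identifier (illustrative)."""
    if operation_type:
        # Operation-level granularity: "<capability>_<operation>"
        return f"{capability_name}_{operation_type}"
    # No operation given: the capability name is the identifier.
    return capability_name


for args in [("python",), ("memory", "save"), ("data_analysis", "execute_query")]:
    print(approval_type_sketch(*args))
```

Because the identifier is a plain string rather than an enum member, a new capability can request approval without any change to a central type definition.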
Registration#
Automatically registered as:
NodeRegistration(
    name="orchestrator",
    module_path="osprey.infrastructure.orchestration_node",
    function_name="OrchestrationNode",
    description="Execution planning and orchestration"
)
See also
- Prompt System
Prompt customization system
- Human Approval
Complete approval system architecture
- Orchestrator Planning
Implementation details and usage patterns