Control System Connectors#
Pluggable connector abstraction for control systems and archivers with mock and production implementations. Enables development without hardware access and seamless migration to production by changing configuration.
Note
For implementation guides and examples, see Control System Integration.
Factory Classes#
- class osprey.connectors.factory.ConnectorFactory[source]#
Bases: object

Factory for creating control system and archiver connectors.
Provides centralized management of available connectors. Connectors are registered automatically through the Osprey registry system during framework initialization.
Example
>>> # Using default config
>>> cs_connector = await ConnectorFactory.create_control_system_connector()
>>>
>>> # Using custom config
>>> config = {'type': 'mock', 'connector': {'mock': {...}}}
>>> cs_connector = await ConnectorFactory.create_control_system_connector(config)
- classmethod register_control_system(name, connector_class)[source]#
Register a control system connector.
- Parameters:
name (str) – Unique name for the connector (e.g., ‘epics’, ‘tango’, ‘mock’)
connector_class (type[ControlSystemConnector]) – Connector class implementing ControlSystemConnector
- classmethod register_archiver(name, connector_class)[source]#
Register an archiver connector.
- Parameters:
name (str) – Unique name for the connector (e.g., ‘epics_archiver’, ‘mock_archiver’)
connector_class (type[ArchiverConnector]) – Connector class implementing ArchiverConnector
- async classmethod create_control_system_connector(config=None)[source]#
Create and configure a control system connector.
- Parameters:
config (dict[str, Any]) – Control system configuration dict with keys:
- type: Connector type (e.g., 'epics', 'mock')
- connector: Dict with connector-specific configs
If None, loads from global config
- Returns:
Initialized and connected ControlSystemConnector
- Raises:
ValueError – If connector type is unknown or config is invalid
ConnectionError – If connection fails
- Return type:
ControlSystemConnector
Example
>>> config = {
>>>     'type': 'epics',
>>>     'connector': {
>>>         'epics': {
>>>             'timeout': 5.0,
>>>             'gateways': {'read_only': {...}}
>>>         }
>>>     }
>>> }
>>> connector = await ConnectorFactory.create_control_system_connector(config)
- async classmethod create_archiver_connector(config=None)[source]#
Create and configure an archiver connector.
- Parameters:
config (dict[str, Any]) – Archiver configuration dict with keys:
- type: Connector type (e.g., 'epics_archiver', 'mock_archiver')
- [type]: Dict with type-specific configs
If None, loads from global config
- Returns:
Initialized and connected ArchiverConnector
- Raises:
ValueError – If connector type is unknown or config is invalid
ConnectionError – If connection fails
- Return type:
ArchiverConnector
Example
>>> config = {
>>>     'type': 'epics_archiver',
>>>     'epics_archiver': {
>>>         'url': 'https://archiver.als.lbl.gov:8443',
>>>         'timeout': 60
>>>     }
>>> }
>>> connector = await ConnectorFactory.create_archiver_connector(config)
The factory provides centralized creation and configuration of connectors with plugin-style registration.
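Registration is normally handled by the registry during framework initialization, but a connector can also be registered directly with the factory. The following is a minimal sketch, assuming a hypothetical LabVIEWConnector class in my_app (the module path mirrors the registry examples later on this page):

from osprey.connectors.factory import ConnectorFactory
from my_app.connectors.labview_connector import LabVIEWConnector  # hypothetical custom connector

# Manual registration (normally done automatically through the registry system)
ConnectorFactory.register_control_system("labview", LabVIEWConnector)

# Once registered, the connector is selectable by name in the factory config
config = {'type': 'labview', 'connector': {'labview': {'base_url': 'http://labview-server:8080'}}}
connector = await ConnectorFactory.create_control_system_connector(config)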
Registry Integration#
Connectors can be registered through the Osprey registry system for unified component management.
- class osprey.registry.ConnectorRegistration(name, connector_type, module_path, class_name, description)[source]#
Bases: object

Registration metadata for control system and archiver connectors.
Defines the metadata required for lazy loading of connector classes. Connectors are registered with the ConnectorFactory during registry initialization, providing unified management of all framework components.
- Parameters:
name (str) – Unique connector name (e.g., ‘epics’, ‘tango’, ‘mock’)
connector_type (str) – Type of connector (‘control_system’ or ‘archiver’)
module_path (str) – Python module path for lazy import
class_name (str) – Connector class name within the module
description (str) – Human-readable description
Examples
Control system connector registration:
>>> ConnectorRegistration(
...     name="labview",
...     connector_type="control_system",
...     module_path="my_app.connectors.labview_connector",
...     class_name="LabVIEWConnector",
...     description="LabVIEW Web Services connector for NI systems"
... )
Archiver connector registration:
>>> ConnectorRegistration(
...     name="custom_archiver",
...     connector_type="archiver",
...     module_path="my_app.connectors.custom_archiver",
...     class_name="CustomArchiverConnector",
...     description="Custom facility archiver connector"
... )
Note
The connector classes are registered with ConnectorFactory during registry initialization, enabling lazy loading while maintaining the factory pattern for runtime connector creation.
See also
osprey.connectors.factory.ConnectorFactory: Runtime connector factory
osprey.connectors.control_system.base.ControlSystemConnector: Base class for control system connectors
osprey.connectors.archiver.base.ArchiverConnector: Base class for archiver connectors
- name: str#
- connector_type: str#
- module_path: str#
- class_name: str#
- description: str#
- __init__(name, connector_type, module_path, class_name, description)#
Registration dataclass for control system and archiver connectors. Used to register connectors through the registry system, providing lazy loading and unified management alongside other framework components.
Usage Example:
from osprey.registry import ConnectorRegistration, extend_framework_registry
def get_registry_config(self):
    return extend_framework_registry(
        connectors=[
            ConnectorRegistration(
                name="labview",
                connector_type="control_system",
                module_path="my_app.connectors.labview_connector",
                class_name="LabVIEWConnector",
                description="LabVIEW Web Services connector for NI systems"
            ),
            ConnectorRegistration(
                name="tango",
                connector_type="control_system",
                module_path="my_app.connectors.tango_connector",
                class_name="TangoConnector",
                description="Tango control system connector"
            ),
            ConnectorRegistration(
                name="tango_archiver",
                connector_type="archiver",
                module_path="my_app.connectors.tango_archiver",
                class_name="TangoArchiverConnector",
                description="Tango archiver connector"
            )
        ],
        capabilities=[...],
        context_classes=[...]
    )
Control System Interfaces#
Base Classes#
- class osprey.connectors.control_system.base.ControlSystemConnector[source]#
Bases: ABC

Abstract base class for control system connectors.
Implementations provide interfaces to different control systems (EPICS, LabVIEW, Tango, Mock, etc.) using a unified API.
Example
>>> connector = await ConnectorFactory.create_control_system_connector()
>>> try:
>>>     channel_value = await connector.read_channel('BEAM:CURRENT')
>>>     print(f"Beam current: {channel_value.value} {channel_value.metadata.units}")
>>> finally:
>>>     await connector.disconnect()
- abstractmethod async connect(config)[source]#
Establish connection to control system.
- Parameters:
config (dict[str, Any]) – Control system-specific configuration
- Raises:
ConnectionError – If connection cannot be established
- abstractmethod async disconnect()[source]#
Close connection to control system and cleanup resources.
- abstractmethod async read_channel(channel_address, timeout=None)[source]#
Read current value of a channel.
- Parameters:
channel_address (str) – Address/name of the channel
timeout (float | None) – Optional timeout in seconds
- Returns:
ChannelValue with current value, timestamp, and metadata
- Raises:
ConnectionError – If channel cannot be reached
TimeoutError – If operation times out
ValueError – If channel address is invalid
- Return type:
ChannelValue
- abstractmethod async write_channel(channel_address, value, timeout=None, verification_level='callback', tolerance=None)[source]#
Write value to a channel with configurable verification.
- Parameters:
channel_address (str) – Address/name of the channel
value (Any) – Value to write
timeout (float | None) – Optional timeout in seconds
verification_level (str) – Verification strategy (“none”, “callback”, “readback”)
tolerance (float | None) – Absolute tolerance for readback verification (only used if verification_level=”readback”)
- Returns:
ChannelWriteResult with write status and verification details
- Raises:
ConnectionError – If channel cannot be reached
TimeoutError – If operation times out
ValueError – If value is invalid for this channel
PermissionError – If write access is not allowed
- Return type:
ChannelWriteResult
Note
The verification_level determines what confirmation is provided:
- "none": Fast write, no verification (success=True if command sent)
- "callback": Control system confirms processing (e.g., EPICS IOC callback)
- "readback": Full verification with readback value comparison
Different control systems may interpret these levels differently based on their native capabilities.
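As an illustrative sketch (assuming an initialized connector; the channel name is a placeholder), each level can be requested explicitly on a write:

# Fast write: success only means the command was sent
result = await connector.write_channel('MAGNET:SETPOINT', 5.0, verification_level='none')

# Control system confirms processing (e.g., EPICS IOC callback)
result = await connector.write_channel('MAGNET:SETPOINT', 5.0, verification_level='callback')

# Read the value back and compare within an absolute tolerance
result = await connector.write_channel(
    'MAGNET:SETPOINT', 5.0, verification_level='readback', tolerance=0.01
)
print(result.success, result.verification)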
- abstractmethod async read_multiple_channels(channel_addresses, timeout=None)[source]#
Read multiple channels efficiently (can be optimized per control system).
- Parameters:
channel_addresses (list[str]) – List of channel addresses to read
timeout (float | None) – Optional timeout in seconds
- Returns:
Dictionary mapping each channel address to its ChannelValue (channels that failed to read may be excluded)
- Return type:
dict[str, ChannelValue]
- abstractmethod async subscribe(channel_address, callback)[source]#
Subscribe to channel changes.
- Parameters:
channel_address (str) – Address/name of the channel
callback (Callable[[ChannelValue], None]) – Function called when value changes (receives ChannelValue)
- Returns:
Subscription ID for later unsubscribe
- Return type:
str
- abstractmethod async unsubscribe(subscription_id)[source]#
Cancel subscription to channel changes.
- Parameters:
subscription_id (str) – Subscription ID returned by subscribe()
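A minimal subscribe/unsubscribe sketch, assuming an initialized connector; the channel name is a placeholder:

from osprey.connectors.control_system.base import ChannelValue

def on_change(value: ChannelValue) -> None:
    # Called by the connector whenever the channel updates
    print(f"BEAM:CURRENT -> {value.value} at {value.timestamp}")

subscription_id = await connector.subscribe('BEAM:CURRENT', on_change)

# ... later, stop receiving updates
await connector.unsubscribe(subscription_id)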
- abstractmethod async get_metadata(channel_address)[source]#
Get metadata about a channel.
- Parameters:
channel_address (str) – Address/name of the channel
- Returns:
ChannelMetadata with units, limits, description, etc.
- Raises:
ConnectionError – If channel cannot be reached
- Return type:
ChannelMetadata
- abstractmethod async validate_channel(channel_address)[source]#
Check if channel exists and is accessible.
- Parameters:
channel_address (str) – Address/name of the channel
- Returns:
True if channel is valid and accessible
- Return type:
bool
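A short sketch combining validate_channel() and get_metadata(), assuming an initialized connector and a placeholder channel name:

if await connector.validate_channel('BEAM:CURRENT'):
    metadata = await connector.get_metadata('BEAM:CURRENT')
    print(f"{metadata.description}: {metadata.min_value}-{metadata.max_value} {metadata.units}")
else:
    print("Channel is not reachable")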
- async read_pv(pv_address, timeout=None)[source]#
Read current value of a PV/channel.
Deprecated since version 0.9.5: Use read_channel() instead. The term “PV” is EPICS-specific; “channel” is control-system agnostic.
- Return type:
ChannelValue
- async write_pv(pv_address, value, timeout=None, verification_level='callback', tolerance=None)[source]#
Write value to a PV/channel.
Deprecated since version 0.9.5: Use write_channel() instead. The term “PV” is EPICS-specific; “channel” is control-system agnostic.
- Return type:
ChannelWriteResult
- async read_multiple_pvs(pv_addresses, timeout=None)[source]#
Read multiple PVs/channels.
Deprecated since version 0.9.5: Use read_multiple_channels() instead.
- Return type:
dict[str, ChannelValue]
- async validate_pv(pv_address)[source]#
Check if PV/channel exists and is accessible.
Deprecated since version 0.9.5: Use validate_channel() instead.
- Return type:
bool
Abstract base class defining the contract for all control system connectors (EPICS, LabVIEW, Tango, Mock, etc.).
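To implement a new connector, subclass ControlSystemConnector and provide the abstract methods above. The skeleton below is a hypothetical sketch; the facility client calls are placeholders, not a working implementation:

from datetime import datetime
from typing import Any, Callable

from osprey.connectors.control_system.base import (
    ChannelMetadata,
    ChannelValue,
    ChannelWriteResult,
    ControlSystemConnector,
)

class MyFacilityConnector(ControlSystemConnector):
    """Hypothetical skeleton; replace the placeholder bodies with real client calls."""

    async def connect(self, config: dict[str, Any]) -> None:
        self._client = None  # open a session to the facility control system here

    async def disconnect(self) -> None:
        pass  # close the session and release resources

    async def read_channel(self, channel_address: str, timeout: float | None = None) -> ChannelValue:
        raw_value = 0.0  # fetch from the facility API
        return ChannelValue(value=raw_value, timestamp=datetime.now(),
                            metadata=ChannelMetadata(units=""))

    async def write_channel(self, channel_address: str, value: Any, timeout: float | None = None,
                            verification_level: str = "callback",
                            tolerance: float | None = None) -> ChannelWriteResult:
        # Send the write, then verify according to verification_level
        return ChannelWriteResult(channel_address=channel_address, value_written=value, success=True)

    async def read_multiple_channels(self, channel_addresses: list[str],
                                     timeout: float | None = None) -> dict[str, ChannelValue]:
        return {addr: await self.read_channel(addr, timeout) for addr in channel_addresses}

    async def subscribe(self, channel_address: str,
                        callback: Callable[[ChannelValue], None]) -> str:
        return "sub-0"  # return an ID that unsubscribe() can use later

    async def unsubscribe(self, subscription_id: str) -> None:
        pass

    async def get_metadata(self, channel_address: str) -> ChannelMetadata:
        return ChannelMetadata(units="", description=channel_address)

    async def validate_channel(self, channel_address: str) -> bool:
        return True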
Data Models#
Read Operation Models#
- class osprey.connectors.control_system.base.ChannelValue(value, timestamp, metadata=<factory>)[source]#
Bases: object

Value of a control system channel with metadata.
- value: Any#
- timestamp: datetime#
- metadata: ChannelMetadata#
- __init__(value, timestamp, metadata=<factory>)#
Result container for channel reads with value, timestamp, and metadata.
- class osprey.connectors.control_system.base.ChannelMetadata(units='', precision=None, alarm_status=None, timestamp=None, description=None, min_value=None, max_value=None, raw_metadata=<factory>)[source]#
Bases: object

Metadata about a control system channel.
- units: str = ''#
- precision: int | None = None#
- alarm_status: str | None = None#
- timestamp: datetime | None = None#
- description: str | None = None#
- min_value: float | None = None#
- max_value: float | None = None#
- raw_metadata: dict[str, Any] | None#
- __init__(units='', precision=None, alarm_status=None, timestamp=None, description=None, min_value=None, max_value=None, raw_metadata=<factory>)#
Metadata about a control system channel (units, precision, alarms, limits, etc.).
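A short sketch of inspecting these dataclasses after a read, assuming an initialized connector and a placeholder channel name:

channel_value = await connector.read_channel('BEAM:CURRENT')

print(channel_value.value)                  # current reading
print(channel_value.timestamp)              # datetime of the sample
print(channel_value.metadata.units)         # e.g., 'mA'
print(channel_value.metadata.min_value,
      channel_value.metadata.max_value)     # operating limits, if reported
print(channel_value.metadata.raw_metadata)  # connector-specific extras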
Backward Compatibility:
Deprecated since version 0.9.5: The classes PVValue and PVMetadata are deprecated and aliased to ChannelValue
and ChannelMetadata respectively. The “PV” terminology is EPICS-specific; “channel”
is control-system agnostic and supports any control system (EPICS, Tango, LabVIEW, etc.).
Write Operation Models#
- class osprey.connectors.control_system.base.ChannelWriteResult(channel_address, value_written, success, verification=None, error_message=None)[source]#
Bases: object

Result from a channel write operation with optional verification.
This is the control-system-agnostic result type returned by all connectors. Provides detailed information about write success and verification status.
- channel_address: str#
- value_written: Any#
- success: bool#
- verification: WriteVerification | None = None#
- error_message: str | None = None#
- __init__(channel_address, value_written, success, verification=None, error_message=None)#
Result container for channel write operations with success status, written value, and optional verification.
Fields:
- success (bool): Whether the write operation succeeded
- value_written (Any): The value that was written to the channel
- verification (Optional[WriteVerification]): Verification result if verification was requested
Example:
# Automatic verification (uses per-channel or global config)
result = await connector.write_channel('BEAM:SETPOINT', 450.0)
if result.success and result.verification and result.verification.verified:
    print(f"Write verified ({result.verification.level}): {result.value_written}")
elif result.success:
    print("Write succeeded but not verified")
else:
    print("Write failed")
# Manual override (optional)
result = await connector.write_channel(
    'BEAM:SETPOINT',
    450.0,
    verification_level='readback',  # Override auto-config
    tolerance=0.01
)
- class osprey.connectors.control_system.base.WriteVerification(level, verified, readback_value=None, tolerance_used=None, notes=None)[source]#
Bases: object

Verification result from a channel write operation.
Different control systems provide different levels of verification:
- "none": No verification performed (fast write)
- "callback": Control system confirmed request processing (e.g., EPICS IOC callback)
- "readback": Full verification with readback comparison
- level: str#
- verified: bool#
- readback_value: float | None = None#
- tolerance_used: float | None = None#
- notes: str | None = None#
- __init__(level, verified, readback_value=None, tolerance_used=None, notes=None)#
Verification result for channel write operations.
Fields:
- level (str): Level of verification performed ('none', 'callback', 'readback')
- verified (bool): Whether the write was successfully verified
- readback_value (Optional[float]): The value read back from the channel (readback mode only)
- tolerance_used (Optional[float]): Absolute tolerance applied to the readback comparison (readback mode only)
- notes (Optional[str]): Additional notes from the connector
Verification Levels:
- 'none': No verification performed (verified=False)
- 'callback': Uses Channel Access callback to confirm write (EPICS only)
- 'readback': Reads back the value and compares with tolerance
Example:
# Automatic verification (connector determines level from config)
result = await connector.write_channel('BEAM:CURRENT', 400.0)
if result.verification and result.verification.verified:
    print(f"Verification: {result.verification.level}")
    if result.verification.readback_value is not None:
        print(f"Readback: {result.verification.readback_value}")
else:
    print("Verification failed")
Built-in Implementations#
- class osprey.connectors.control_system.mock_connector.MockConnector[source]#
Bases: ControlSystemConnector

Mock control system connector for development and testing.
This connector simulates a control system without requiring real hardware. It generates realistic synthetic data for any PV name, making it ideal for R&D and development when you don’t have access to the control room.
Features:
- Accepts any PV name
- Generates realistic initial values based on PV naming conventions
- Adds configurable noise to simulate real measurements
- Maintains state between reads and writes
- Simulates readback PVs (e.g., :SP -> :RB)
Example
>>> config = {
>>>     'response_delay_ms': 10,
>>>     'noise_level': 0.01,
>>>     'enable_writes': True
>>> }
>>> connector = MockConnector()
>>> await connector.connect(config)
>>> value = await connector.read_pv('BEAM:CURRENT')
>>> print(f"Beam current: {value.value} {value.metadata.units}")
- async read_channel(channel_address, timeout=None)[source]#
Read channel - generates realistic value if not cached.
- Parameters:
channel_address (str) – Any channel name (mock accepts all names)
timeout (float | None) – Ignored for mock connector
- Returns:
ChannelValue with synthetic data
- Return type:
ChannelValue
- async write_channel(channel_address, value, timeout=None, verification_level=None, tolerance=None)[source]#
Write channel with automatic limits validation and verification.
The connector automatically:
1. Validates limits (min/max/step/writable) if limits checking is enabled
2. Determines verification level from per-channel or global config
3. Executes write with appropriate verification
- Parameters:
channel_address (str) – Any channel name
value (Any) – Value to write
timeout (float | None) – Ignored for mock connector
verification_level (str | None) – Optional override for verification level (auto-determined if None)
tolerance (float | None) – Optional override for tolerance (auto-calculated if None)
- Returns:
ChannelWriteResult with write status and verification details
- Raises:
ChannelLimitsViolationError – If limits validation fails (when enabled)
- Return type:
ChannelWriteResult
- async read_multiple_channels(channel_addresses, timeout=None)[source]#
Read multiple channels concurrently.
- Return type:
dict[str, ChannelValue]
- async subscribe(channel_address, callback)[source]#
Subscribe to channel changes.
Note: Mock connector only triggers callbacks on write_channel calls.
- Return type:
str
Development connector that accepts any PV names and generates realistic simulated data. Ideal for R&D when you don’t have control room access.
Key Features:
Accepts any PV name (no real control system required)
Configurable response delays and noise levels
Realistic units, timestamps, and metadata
Optional write operation support
- class osprey.connectors.control_system.epics_connector.EPICSConnector[source]#
Bases: ControlSystemConnector

EPICS control system connector using pyepics.
Provides read/write access to EPICS Process Variables through Channel Access protocol. Supports gateway configuration for remote access and read-only/write-access gateways.
Example
Direct gateway connection:

>>> config = {
>>>     'timeout': 5.0,
>>>     'gateways': {
>>>         'read_only': {
>>>             'address': 'cagw-alsdmz.als.lbl.gov',
>>>             'port': 5064
>>>         }
>>>     }
>>> }
>>> connector = EPICSConnector()
>>> await connector.connect(config)
>>> value = await connector.read_pv('BEAM:CURRENT')
>>> print(f"Beam current: {value.value} {value.metadata.units}")
SSH tunnel connection:

>>> config = {
>>>     'timeout': 5.0,
>>>     'gateways': {
>>>         'read_only': {
>>>             'address': 'localhost',
>>>             'port': 5074,
>>>             'use_name_server': True
>>>         }
>>>     }
>>> }
>>> connector = EPICSConnector()
>>> await connector.connect(config)
>>> value = await connector.read_pv('BEAM:CURRENT')
>>> print(f"Beam current: {value.value} {value.metadata.units}")
- async read_channel(channel_address, timeout=None)[source]#
Read current value from EPICS channel.
- Parameters:
channel_address (str) – EPICS channel address (e.g., ‘BEAM:CURRENT’)
timeout (float | None) – Timeout in seconds (uses default if None)
- Returns:
ChannelValue with current value, timestamp, and metadata
- Raises:
ConnectionError – If channel cannot be connected
TimeoutError – If operation times out
- Return type:
ChannelValue
- async write_channel(channel_address, value, timeout=None, verification_level=None, tolerance=None)[source]#
Write value to EPICS channel with automatic limits validation and verification.
The connector automatically:
1. Validates limits (min/max/step/writable) if limits checking is enabled
2. Determines verification level from per-channel or global config
3. Executes write with appropriate verification
- Parameters:
channel_address (str) – EPICS channel address
value (Any) – Value to write
timeout (float | None) – Timeout in seconds
verification_level (str | None) – Optional override for verification level (auto-determined if None)
tolerance (float | None) – Optional override for tolerance (auto-calculated if None)
- Returns:
ChannelWriteResult with write status and verification details
- Raises:
ConnectionError – If channel cannot be connected
TimeoutError – If operation times out
ChannelLimitsViolationError – If limits validation fails (when enabled)
- Return type:
ChannelWriteResult
- async read_multiple_channels(channel_addresses, timeout=None)[source]#
Read multiple channels concurrently.
- Return type:
dict[str, ChannelValue]
- async subscribe(channel_address, callback)[source]#
Subscribe to channel value changes.
- Parameters:
channel_address (str) – EPICS channel address
callback (Callable[[ChannelValue], None]) – Function to call when value changes
- Returns:
Subscription ID for later unsubscription
- Return type:
str
Production EPICS Channel Access connector using pyepics library. Supports gateway configuration for secure access.
Key Features:
EPICS Channel Access protocol
Read-only and read-write gateway support
Configurable timeouts and retry logic
Full metadata support (units, precision, alarms, limits)
Requirements:
pyepics library: pip install pyepics
Access to EPICS gateway or IOCs
Archiver Interfaces#
Base Classes#
- class osprey.connectors.archiver.base.ArchiverConnector[source]#
Bases: ABC

Abstract base class for archiver connectors.
Implementations provide interfaces to different archiver systems using a unified API that returns pandas DataFrames.
Example
>>> connector = await ConnectorFactory.create_archiver_connector()
>>> try:
>>>     df = await connector.get_data(
>>>         pv_list=['BEAM:CURRENT', 'BEAM:LIFETIME'],
>>>         start_date=datetime(2024, 1, 1),
>>>         end_date=datetime(2024, 1, 2)
>>>     )
>>>     print(df.head())
>>> finally:
>>>     await connector.disconnect()
- abstractmethod async connect(config)[source]#
Establish connection to archiver.
- Parameters:
config (dict[str, Any]) – Archiver-specific configuration
- Raises:
ConnectionError – If connection cannot be established
- abstractmethod async get_data(pv_list, start_date, end_date, precision_ms=1000, timeout=None)[source]#
Retrieve historical data for PVs.
- Parameters:
pv_list (list[str]) – List of PV names to retrieve
start_date (datetime) – Start of time range
end_date (datetime) – End of time range
precision_ms (int) – Time precision in milliseconds (for downsampling)
timeout (int | None) – Optional timeout in seconds
- Returns:
DataFrame with datetime index and PV columns. Each column contains the time series for one PV.
- Raises:
ConnectionError – If archiver cannot be reached
TimeoutError – If operation times out
ValueError – If time range or PV names are invalid
- Return type:
pandas.DataFrame
- abstractmethod async get_metadata(pv_name)[source]#
Get archiving metadata for a PV.
- Parameters:
pv_name (str) – Name of the process variable
- Returns:
ArchiverMetadata with archiving information
- Raises:
ConnectionError – If archiver cannot be reached
ValueError – If PV name is invalid
- Return type:
ArchiverMetadata
Abstract base class defining the contract for all archiver connectors.
Container for historical time series data with timestamps and values.
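To integrate a facility-specific archiver, subclass ArchiverConnector and implement the abstract methods above. The following is a hypothetical sketch; the archiver API calls are placeholders:

from datetime import datetime
from typing import Any

import pandas as pd

from osprey.connectors.archiver.base import ArchiverConnector

class MyFacilityArchiverConnector(ArchiverConnector):
    """Hypothetical skeleton; replace the placeholder bodies with real archiver calls."""

    async def connect(self, config: dict[str, Any]) -> None:
        self._base_url = config.get("url")  # facility archiver endpoint

    async def disconnect(self) -> None:
        pass  # close any open sessions

    async def get_data(self, pv_list: list[str], start_date: datetime, end_date: datetime,
                       precision_ms: int = 1000, timeout: int | None = None) -> pd.DataFrame:
        # Fetch each PV's history and assemble a DataFrame with a datetime index
        index = pd.date_range(start_date, end_date, freq=f"{precision_ms}ms")
        return pd.DataFrame({pv: 0.0 for pv in pv_list}, index=index)

    async def get_metadata(self, pv_name: str):
        # Return an ArchiverMetadata instance describing how pv_name is archived
        raise NotImplementedError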
Built-in Implementations#
- class osprey.connectors.archiver.mock_archiver_connector.MockArchiverConnector[source]#
Bases: ArchiverConnector

Mock archiver for development - generates synthetic time-series data.
This connector simulates an archiver system without requiring real archiver access. It generates realistic time-series data for any PV name.
Features:
- Accepts any PV names
- Generates realistic time series with trends and noise
- Configurable sampling rate and noise level
- Returns pandas DataFrames matching real archiver format
Example
>>> config = {
>>>     'sample_rate_hz': 1.0,
>>>     'noise_level': 0.01
>>> }
>>> connector = MockArchiverConnector()
>>> await connector.connect(config)
>>> df = await connector.get_data(
>>>     pv_list=['BEAM:CURRENT'],
>>>     start_date=datetime(2024, 1, 1),
>>>     end_date=datetime(2024, 1, 2)
>>> )
- async get_data(pv_list, start_date, end_date, precision_ms=1000, timeout=None)[source]#
Generate synthetic historical data.
- Parameters:
pv_list (list[str]) – List of PV names (all accepted)
start_date (datetime) – Start of time range
end_date (datetime) – End of time range
precision_ms (int) – Time precision (affects downsampling)
timeout (int | None) – Ignored for mock archiver
- Returns:
DataFrame with datetime index and columns for each PV
- Return type:
pandas.DataFrame
Development archiver that generates synthetic historical data. Ideal for R&D when you don’t have archiver access.
Key Features:
Generates realistic time series with trends and noise
Configurable retention period and sample rates
Works with any PV names
Consistent with mock control system connector
- class osprey.connectors.archiver.epics_archiver_connector.EPICSArchiverConnector[source]#
Bases: ArchiverConnector

EPICS Archiver Appliance connector using archivertools.
Provides access to historical PV data from EPICS Archiver Appliance via the archivertools Python library.
Example
>>> config = {
>>>     'url': 'https://archiver.als.lbl.gov:8443',
>>>     'timeout': 60
>>> }
>>> connector = EPICSArchiverConnector()
>>> await connector.connect(config)
>>> df = await connector.get_data(
>>>     pv_list=['BEAM:CURRENT'],
>>>     start_date=datetime(2024, 1, 1),
>>>     end_date=datetime(2024, 1, 2)
>>> )
- async get_data(pv_list, start_date, end_date, precision_ms=1000, timeout=None)[source]#
Retrieve historical data from EPICS archiver.
- Parameters:
pv_list (list[str]) – List of PV names to retrieve
start_date (datetime) – Start of time range
end_date (datetime) – End of time range
precision_ms (int) – Time precision in milliseconds (for downsampling)
timeout (int | None) – Optional timeout in seconds
- Returns:
DataFrame with datetime index and PV columns
- Raises:
RuntimeError – If archiver not connected
TimeoutError – If operation times out
ConnectionError – If archiver cannot be reached
ValueError – If data format is unexpected
- Return type:
pandas.DataFrame
- async get_metadata(pv_name)[source]#
Get archiving metadata for a PV.
Note: archivertools doesn’t expose metadata API directly, so this returns basic information.
- Parameters:
pv_name (str) – Name of the process variable
- Returns:
ArchiverMetadata with basic archiving information
- Return type:
ArchiverMetadata
- async check_availability(pv_names)[source]#
Check which PVs are archived.
Note: Basic implementation that assumes all PVs are archived. Could be enhanced with actual archiver API calls.
- Parameters:
pv_names (list[str]) – List of PV names to check
- Returns:
Dictionary mapping PV name to availability status
- Return type:
dict[str, bool]
Production connector for EPICS Archiver Appliance using archivertools library.
Key Features:
EPICS Archiver Appliance integration
Efficient bulk data retrieval
Configurable precision and time ranges
Connection pooling for performance
Requirements:
archivertools library: pip install archivertools
Access to EPICS Archiver Appliance URL
Pattern Detection#
Static code analysis for detecting control system operations in generated code. Critical security layer that catches both approved API usage and circumvention attempts.
Analyzes Python code using framework-standard or custom patterns to detect control system operations. The framework provides comprehensive security-focused patterns by default - no configuration needed.
Security Purpose: Detects both approved osprey.runtime API usage AND direct control system library
calls that would bypass connector safety features (limits checking, verification, approval workflows).
Example:
from osprey.services.python_executor.analysis.pattern_detection import detect_control_system_operations
# Detects approved API
code_approved = "write_channel('BEAM:CURRENT', 500)"
result = detect_control_system_operations(code_approved)
# result['has_writes'] == True
# Also detects circumvention attempts
code_circumvent = "epics.caput('BEAM:CURRENT', 500)"
result = detect_control_system_operations(code_circumvent)
# result['has_writes'] == True # Caught by security layer!
Returns framework-standard security-focused patterns. These patterns detect:
✅ Approved osprey.runtime API (with all safety features)
🔒 EPICS direct calls (epics.caput, PV().put - bypasses safety)
🔒 Tango direct calls (DeviceProxy().write_attribute - bypasses safety)
🔒 LabVIEW integration patterns (bypasses safety)
🔒 Direct connector access (advanced use)
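For example, a direct Tango write that bypasses the connector layer is flagged the same way as the EPICS case above; a small sketch (the device and attribute names are placeholders):

from osprey.services.python_executor.analysis.pattern_detection import detect_control_system_operations

code = "DeviceProxy('sys/tg_test/1').write_attribute('ampli', 5.0)"
result = detect_control_system_operations(code)
# result['has_writes'] == True  # Caught by security layer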
Configuration Schema#
Control System Configuration#
Control system connector configuration in config.yml:
control_system:
  type: mock | epics | labview | tango | custom

  # Pattern detection is automatic - framework provides defaults
  # Optional: Override for custom workflows (rarely needed)
  # patterns:
  #   write:
  #     - 'custom_write_function\('
  #   read:
  #     - 'custom_read_function\('

  # Type-specific connector configurations
  connector:
    mock:
      response_delay_ms: 10
      noise_level: 0.01
      enable_writes: true

    epics:
      gateways:
        read_only:
          address: cagw.facility.edu
          port: 5064
          use_name_server: false  # Use EPICS_CA_NAME_SERVERS vs CA_ADDR_LIST (default: false)
        read_write:
          address: cagw-rw.facility.edu
          port: 5065
          use_name_server: false
      timeout: 5.0
      retry_count: 3
      retry_delay: 0.5
Archiver Configuration#
Archiver connector configuration in config.yml:
archiver:
  type: mock_archiver | epics_archiver | custom_archiver

  # Mock archiver uses sensible defaults
  mock_archiver:
    sample_rate_hz: 1.0
    noise_level: 0.01

  epics_archiver:
    url: https://archiver.facility.edu:8443
    timeout: 60
    max_retries: 3
    verify_ssl: true
    pool_connections: 10
    pool_maxsize: 20
Usage Examples#
Basic Usage#
Create and use a connector from global configuration:
from osprey.connectors.factory import ConnectorFactory
# Create connector from config.yml
connector = await ConnectorFactory.create_control_system_connector()
try:
    # Read a channel
    result = await connector.read_channel('BEAM:CURRENT')
    print(f"Current: {result.value} {result.metadata.units}")

    # Read multiple channels
    results = await connector.read_multiple_channels([
        'BEAM:CURRENT',
        'BEAM:LIFETIME',
        'BEAM:ENERGY'
    ])

    # Get metadata
    metadata = await connector.get_metadata('BEAM:CURRENT')
    print(f"Units: {metadata.units}, Range: {metadata.min_value}-{metadata.max_value}")
finally:
    await connector.disconnect()
Usage in Generated Python Code#
When the Python execution service generates code that needs to interact with control systems, it uses the osprey.runtime module instead of direct connector imports. This provides a simple, synchronous API that works with any configured control system:
# In generated Python code
from osprey.runtime import write_channel, read_channel
# Read from control system (synchronous, like EPICS caget)
current = read_channel("BEAM:CURRENT")
print(f"Current: {current} mA")
# Write to control system (synchronous, like EPICS caput)
write_channel("MAGNET:SETPOINT", 5.0)
Key Benefits:
Control-System Agnostic: Same code works with EPICS, Mock, LabVIEW, or any registered connector
Automatic Configuration: Uses control system settings from execution context (reproducible notebooks)
Safety Integration: All boundary checking, limits validation, and approval workflows happen automatically
Simple API: Synchronous functions (async handled internally)
The runtime module is automatically configured by the execution wrapper and uses the same connector configuration shown above.
See also
- Part 3: Integration & Deployment
Complete tutorial on how generated code interacts with control systems
Custom Configuration#
Create connector with inline configuration:
# Mock connector with custom settings
config = {
    'type': 'mock',
    'connector': {
        'mock': {
            'response_delay_ms': 5,
            'noise_level': 0.02
        }
    }
}
connector = await ConnectorFactory.create_control_system_connector(config)
# EPICS connector with specific gateway
config = {
    'type': 'epics',
    'connector': {
        'epics': {
            'gateways': {
                'read_only': {
                    'address': 'cagw.als.lbl.gov',
                    'port': 5064
                }
            },
            'timeout': 3.0
        }
    }
}
connector = await ConnectorFactory.create_control_system_connector(config)
Archiver Usage#
Retrieve historical data:
from osprey.connectors.factory import ConnectorFactory
from datetime import datetime, timedelta
# Create archiver connector
connector = await ConnectorFactory.create_archiver_connector()
try:
    # Define time range
    end_time = datetime.now()
    start_time = end_time - timedelta(hours=24)

    # Retrieve data for multiple PVs
    data = await connector.get_data(
        pv_list=['BEAM:CURRENT', 'BEAM:LIFETIME'],
        start_date=start_time,
        end_date=end_time,
        precision_ms=1000  # 1 second precision
    )

    # Process results (DataFrame with datetime index, one column per PV)
    for pv_name in data.columns:
        print(f"{pv_name}: {data[pv_name].count()} data points")
finally:
    await connector.disconnect()
Pattern Detection Usage#
Detect control system operations in generated code:
from osprey.services.python_executor.analysis.pattern_detection import detect_control_system_operations
code = """
# Read beam current
current = epics.caget('BEAM:CURRENT')
# Adjust setpoint if needed
if current < 400:
    epics.caput('BEAM:SETPOINT', 420.0)
"""
# Detect operations
result = detect_control_system_operations(code)
if result['has_writes']:
    print("⚠️ Code performs write operations - requires approval")
if result['has_reads']:
    print("✓ Code performs read operations")
print(f"Control system: {result['control_system_type']}")
print(f"Write patterns detected: {result['detected_patterns']['writes']}")
print(f"Read patterns detected: {result['detected_patterns']['reads']}")
Custom Connector Registration#
Custom connectors are registered through the Osprey registry system:
# In your application's registry.py
from osprey.registry import ConnectorRegistration, extend_framework_registry
class MyAppRegistryProvider(RegistryConfigProvider):
    def get_registry_config(self):
        return extend_framework_registry(
            connectors=[
                # Control system connectors
                ConnectorRegistration(
                    name="labview",
                    connector_type="control_system",
                    module_path="my_app.connectors.labview_connector",
                    class_name="LabVIEWConnector",
                    description="LabVIEW Web Services connector"
                ),
                ConnectorRegistration(
                    name="tango",
                    connector_type="control_system",
                    module_path="my_app.connectors.tango_connector",
                    class_name="TangoConnector",
                    description="Tango control system connector"
                ),
                # Archiver connectors
                ConnectorRegistration(
                    name="tango_archiver",
                    connector_type="archiver",
                    module_path="my_app.connectors.tango_archiver",
                    class_name="TangoArchiverConnector",
                    description="Tango archiver connector"
                ),
            ],
            capabilities=[...],
            context_classes=[...]
        )
After registration, connectors are available via configuration:
# config.yml
control_system:
  type: labview  # or tango, epics, mock

  connector:
    labview:
      base_url: "http://labview-server:8080"
      api_key: "your-api-key"

    tango:
      device_name: "tango://host:10000/sys/dev/1"

archiver:
  type: tango_archiver
  tango_archiver:
    url: "https://archiver.facility.edu"
See also
- Control System Integration
Complete implementation guide with step-by-step examples
- Part 1: Getting Started
See connectors in action in the Control Assistant tutorial
- Human Approval
How pattern detection integrates with approval workflows
- Python Execution
Pattern detection in secure Python code execution