Control System Connectors#

Pluggable connector abstraction for control systems and archivers with mock and production implementations. Enables development without hardware access and seamless migration to production by changing configuration.

Note

For implementation guides and examples, see Control System Integration.

Factory Classes#

class osprey.connectors.factory.ConnectorFactory[source]#

Bases: object

Factory for creating control system and archiver connectors.

Provides centralized management of available connectors. Connectors are registered automatically through the Osprey registry system during framework initialization.

Example

>>> # Using default config
>>> cs_connector = await ConnectorFactory.create_control_system_connector()
>>>
>>> # Using custom config
>>> config = {'type': 'mock', 'connector': {'mock': {...}}}
>>> cs_connector = await ConnectorFactory.create_control_system_connector(config)
classmethod register_control_system(name, connector_class)[source]#

Register a control system connector.

Parameters:
  • name (str) – Unique name for the connector (e.g., ‘epics’, ‘tango’, ‘mock’)

  • connector_class (type[ControlSystemConnector]) – Connector class implementing ControlSystemConnector

classmethod register_archiver(name, connector_class)[source]#

Register an archiver connector.

Parameters:
  • name (str) – Unique name for the connector (e.g., ‘epics_archiver’, ‘mock_archiver’)

  • connector_class (type[ArchiverConnector]) – Connector class implementing ArchiverConnector

async classmethod create_control_system_connector(config=None)[source]#

Create and configure a control system connector.

Parameters:

config (dict[str, Any]) – Control system configuration dict. If None, loads from global config. Keys:

  • type: Connector type (e.g., ‘epics’, ‘mock’)

  • connector: Dict with connector-specific configs

Returns:

Initialized and connected ControlSystemConnector

Raises:
  • ValueError – If connector type is unknown or config is invalid

  • ConnectionError – If connection fails

Return type:

ControlSystemConnector

Example

>>> config = {
...     'type': 'epics',
...     'connector': {
...         'epics': {
...             'timeout': 5.0,
...             'gateways': {'read_only': {...}}
...         }
...     }
... }
>>> connector = await ConnectorFactory.create_control_system_connector(config)
async classmethod create_archiver_connector(config=None)[source]#

Create and configure an archiver connector.

Parameters:

config (dict[str, Any]) – Archiver configuration dict. If None, loads from global config. Keys:

  • type: Connector type (e.g., ‘epics_archiver’, ‘mock_archiver’)

  • [type]: Dict with type-specific configs, stored under a key named after the connector type

Returns:

Initialized and connected ArchiverConnector

Raises:
  • ValueError – If connector type is unknown or config is invalid

  • ConnectionError – If connection fails

Return type:

ArchiverConnector

Example

>>> config = {
...     'type': 'epics_archiver',
...     'epics_archiver': {
...         'url': 'https://archiver.als.lbl.gov:8443',
...         'timeout': 60
...     }
... }
>>> connector = await ConnectorFactory.create_archiver_connector(config)
classmethod list_control_systems()[source]#

List available control system connector types.

Return type:

list

classmethod list_archivers()[source]#

List available archiver connector types.

Return type:

list

The factory provides centralized creation and configuration of connectors with plugin-style registration.
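
The plugin-style registration described here can be pictured as a dictionary from connector name to class. The following is a minimal sketch under that assumption; `SketchFactory` and `SketchConnector` are hypothetical stand-ins and do not reproduce the internals of `ConnectorFactory`.

```python
import asyncio
from typing import Any, Dict, List, Optional, Type


class SketchConnector:
    """Hypothetical connector standing in for a real implementation."""

    def __init__(self) -> None:
        self.config: Optional[Dict[str, Any]] = None

    async def connect(self, config: Dict[str, Any]) -> None:
        # A real connector would open network connections here.
        self.config = config


class SketchFactory:
    """Hypothetical name -> class registry, not Osprey's ConnectorFactory."""

    _control_systems: Dict[str, Type[SketchConnector]] = {}

    @classmethod
    def register_control_system(cls, name: str, connector_class: Type[SketchConnector]) -> None:
        cls._control_systems[name] = connector_class

    @classmethod
    def list_control_systems(cls) -> List[str]:
        return sorted(cls._control_systems)

    @classmethod
    async def create_control_system_connector(cls, config: Dict[str, Any]) -> SketchConnector:
        connector_type = config.get("type")
        if connector_type not in cls._control_systems:
            raise ValueError(f"Unknown connector type: {connector_type!r}")
        connector = cls._control_systems[connector_type]()
        # Hand the connector only the settings stored under its own type name.
        await connector.connect(config.get("connector", {}).get(connector_type, {}))
        return connector


SketchFactory.register_control_system("mock", SketchConnector)
connector = asyncio.run(
    SketchFactory.create_control_system_connector(
        {"type": "mock", "connector": {"mock": {"noise_level": 0.01}}}
    )
)
```

Because lookup happens by name at creation time, switching from a mock to a production connector only requires a different `type` value in the configuration.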

Registry Integration#

Connectors can be registered through the Osprey registry system for unified component management.

class osprey.registry.ConnectorRegistration(name, connector_type, module_path, class_name, description)[source]#

Bases: object

Registration metadata for control system and archiver connectors.

Defines the metadata required for lazy loading of connector classes. Connectors are registered with the ConnectorFactory during registry initialization, providing unified management of all framework components.

Parameters:
  • name (str) – Unique connector name (e.g., ‘epics’, ‘tango’, ‘mock’)

  • connector_type (str) – Type of connector (‘control_system’ or ‘archiver’)

  • module_path (str) – Python module path for lazy import

  • class_name (str) – Connector class name within the module

  • description (str) – Human-readable description

Examples

Control system connector registration:

>>> ConnectorRegistration(
...     name="labview",
...     connector_type="control_system",
...     module_path="my_app.connectors.labview_connector",
...     class_name="LabVIEWConnector",
...     description="LabVIEW Web Services connector for NI systems"
... )

Archiver connector registration:

>>> ConnectorRegistration(
...     name="custom_archiver",
...     connector_type="archiver",
...     module_path="my_app.connectors.custom_archiver",
...     class_name="CustomArchiverConnector",
...     description="Custom facility archiver connector"
... )

Note

The connector classes are registered with ConnectorFactory during registry initialization, enabling lazy loading while maintaining the factory pattern for runtime connector creation.

See also

  • osprey.connectors.factory.ConnectorFactory: Runtime connector factory

  • osprey.connectors.control_system.base.ControlSystemConnector: Base class for control system connectors

  • osprey.connectors.archiver.base.ArchiverConnector: Base class for archiver connectors

name: str#
connector_type: str#
module_path: str#
class_name: str#
description: str#
__init__(name, connector_type, module_path, class_name, description)#

Registration dataclass for control system and archiver connectors. Used to register connectors through the registry system, providing lazy loading and unified management alongside other framework components.

Usage Example:

from osprey.registry import ConnectorRegistration, extend_framework_registry

def get_registry_config(self):
    return extend_framework_registry(
        connectors=[
            ConnectorRegistration(
                name="labview",
                connector_type="control_system",
                module_path="my_app.connectors.labview_connector",
                class_name="LabVIEWConnector",
                description="LabVIEW Web Services connector for NI systems"
            ),
            ConnectorRegistration(
                name="tango",
                connector_type="control_system",
                module_path="my_app.connectors.tango_connector",
                class_name="TangoConnector",
                description="Tango control system connector"
            ),
            ConnectorRegistration(
                name="tango_archiver",
                connector_type="archiver",
                module_path="my_app.connectors.tango_archiver",
                class_name="TangoArchiverConnector",
                description="Tango archiver connector"
            )
        ],
        capabilities=[...],
        context_classes=[...]
    )
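
Lazy loading from `module_path` and `class_name` can be sketched with `importlib`. This is an illustration of the general technique, not the registry's actual code; `SketchRegistration` and `load_class` are hypothetical names, and a standard-library class is used so the example runs without Osprey installed.

```python
import importlib
from dataclasses import dataclass


@dataclass
class SketchRegistration:
    """Hypothetical stand-in carrying the two import-related fields."""

    module_path: str
    class_name: str


def load_class(registration: SketchRegistration) -> type:
    # The module is imported only when the class is first needed.
    module = importlib.import_module(registration.module_path)
    return getattr(module, registration.class_name)


# Demonstrated with a standard-library class so the sketch runs anywhere.
registration = SketchRegistration(module_path="collections", class_name="OrderedDict")
loaded = load_class(registration)
```

Deferring the import this way means optional dependencies of a connector are only required when that connector is actually selected.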

Control System Interfaces#

Base Classes#

class osprey.connectors.control_system.base.ControlSystemConnector[source]#

Bases: ABC

Abstract base class for control system connectors.

Implementations provide interfaces to different control systems (EPICS, LabVIEW, Tango, Mock, etc.) using a unified API.

Example

>>> connector = await ConnectorFactory.create_control_system_connector()
>>> try:
...     pv_value = await connector.read_pv('BEAM:CURRENT')
...     print(f"Beam current: {pv_value.value} {pv_value.metadata.units}")
... finally:
...     await connector.disconnect()
abstractmethod async connect(config)[source]#

Establish connection to control system.

Parameters:

config (dict[str, Any]) – Control system-specific configuration

Raises:

ConnectionError – If connection cannot be established

abstractmethod async disconnect()[source]#

Close connection to control system and cleanup resources.

abstractmethod async read_pv(pv_address, timeout=None)[source]#

Read current value of a process variable.

Parameters:
  • pv_address (str) – Address/name of the process variable

  • timeout (float | None) – Optional timeout in seconds

Returns:

PVValue with current value, timestamp, and metadata

Raises:
  • ConnectionError – If PV cannot be reached

  • TimeoutError – If operation times out

  • ValueError – If PV address is invalid

Return type:

PVValue

abstractmethod async write_pv(pv_address, value, timeout=None)[source]#

Write value to a process variable.

Parameters:
  • pv_address (str) – Address/name of the process variable

  • value (Any) – Value to write

  • timeout (float | None) – Optional timeout in seconds

Returns:

True if write was successful

Raises:
  • ConnectionError – If PV cannot be reached

  • TimeoutError – If operation times out

  • ValueError – If value is invalid for this PV

  • PermissionError – If write access is not allowed

Return type:

bool

abstractmethod async read_multiple_pvs(pv_addresses, timeout=None)[source]#

Read multiple PVs efficiently (can be optimized per control system).

Parameters:
  • pv_addresses (list[str]) – List of PV addresses to read

  • timeout (float | None) – Optional timeout in seconds

Returns:

Dictionary mapping PV address to PVValue (May exclude PVs that failed to read)

Return type:

dict[str, PVValue]

abstractmethod async subscribe(pv_address, callback)[source]#

Subscribe to PV changes.

Parameters:
  • pv_address (str) – Address/name of the process variable

  • callback (Callable[[PVValue], None]) – Function to call when PV value changes

Returns:

Subscription ID for later unsubscription

Raises:

ConnectionError – If PV cannot be reached

Return type:

str

abstractmethod async unsubscribe(subscription_id)[source]#

Unsubscribe from PV changes.

Parameters:

subscription_id (str) – Subscription ID returned by subscribe()

abstractmethod async get_metadata(pv_address)[source]#

Get metadata about a PV.

Parameters:

pv_address (str) – Address/name of the process variable

Returns:

PVMetadata with units, limits, description, etc.

Raises:

ConnectionError – If PV cannot be reached

Return type:

PVMetadata

abstractmethod async validate_pv(pv_address)[source]#

Check if PV exists and is accessible.

Parameters:

pv_address (str) – Address/name of the process variable

Returns:

True if PV is valid and accessible

Return type:

bool

Abstract base class defining the contract for all control system connectors (EPICS, LabVIEW, Tango, Mock, etc.).
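
The `read_multiple_pvs` contract allows the result to omit PVs that failed to read. One way a connector could satisfy it is to fan out single reads with `asyncio.gather` and drop the failures. The sketch below assumes that approach; `read_many` and `fake_read` are hypothetical helpers, not part of the Osprey API.

```python
import asyncio


async def read_many(read_one, pv_addresses, timeout=None):
    """Read PVs concurrently and drop the ones whose read raised."""

    async def guarded(pv):
        # timeout=None means wait without a deadline.
        return await asyncio.wait_for(read_one(pv), timeout)

    results = await asyncio.gather(
        *(guarded(pv) for pv in pv_addresses), return_exceptions=True
    )
    return {
        pv: result
        for pv, result in zip(pv_addresses, results)
        if not isinstance(result, BaseException)
    }


async def fake_read(pv):
    # Hypothetical reader: one address always fails.
    if pv == "BAD:PV":
        raise ConnectionError(f"cannot reach {pv}")
    return 42.0


values = asyncio.run(read_many(fake_read, ["BEAM:CURRENT", "BAD:PV"]))
```

Callers should therefore check for missing keys rather than assume every requested address is present in the returned dictionary.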

Data Models#

class osprey.connectors.control_system.base.PVValue(value, timestamp, metadata=<factory>)[source]#

Bases: object

Value of a process variable with metadata.

value: Any#
timestamp: datetime#
metadata: PVMetadata#
__init__(value, timestamp, metadata=<factory>)#

Result container for process variable reads with value, timestamp, and metadata.

class osprey.connectors.control_system.base.PVMetadata(units='', precision=None, alarm_status=None, timestamp=None, description=None, min_value=None, max_value=None, raw_metadata=<factory>)[source]#

Bases: object

Metadata about a process variable/attribute.

units: str = ''#
precision: int | None = None#
alarm_status: str | None = None#
timestamp: datetime | None = None#
description: str | None = None#
min_value: float | None = None#
max_value: float | None = None#
raw_metadata: dict[str, Any] | None#
__post_init__()[source]#

Ensure raw_metadata is a dict.

__init__(units='', precision=None, alarm_status=None, timestamp=None, description=None, min_value=None, max_value=None, raw_metadata=<factory>)#

Metadata about a process variable (units, precision, alarms, limits, etc.).
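
The `<factory>` defaults and the `__post_init__` hook shown in the signatures are standard dataclass features. The sketch below mirrors a subset of the documented fields in hypothetical stand-in classes (`SketchMetadata`, `SketchValue`) to show how they behave; it is not the source of `PVMetadata` or `PVValue`.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, Optional


@dataclass
class SketchMetadata:
    """Stand-in with a subset of the PVMetadata fields listed above."""

    units: str = ""
    precision: Optional[int] = None
    raw_metadata: Optional[Dict[str, Any]] = field(default_factory=dict)

    def __post_init__(self) -> None:
        # Normalize an explicit None to an empty dict.
        if self.raw_metadata is None:
            self.raw_metadata = {}


@dataclass
class SketchValue:
    """Stand-in for PVValue: value, timestamp, and defaulted metadata."""

    value: Any
    timestamp: datetime
    metadata: SketchMetadata = field(default_factory=SketchMetadata)


reading = SketchValue(value=401.2, timestamp=datetime.now(timezone.utc))
explicit_none = SketchMetadata(units="mA", raw_metadata=None)
```

Using `default_factory` gives every instance its own dictionary, so mutating one reading's `raw_metadata` cannot leak into another.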

Built-in Implementations#

class osprey.connectors.control_system.mock_connector.MockConnector[source]#

Bases: ControlSystemConnector

Mock control system connector for development and testing.

This connector simulates a control system without requiring real hardware. It generates realistic synthetic data for any PV name, making it ideal for R&D and development when you don’t have access to the control room.

Features:

  • Accepts any PV name

  • Generates realistic initial values based on PV naming conventions

  • Adds configurable noise to simulate real measurements

  • Maintains state between reads and writes

  • Simulates readback PVs (e.g., :SP -> :RB)

Example

>>> config = {
...     'response_delay_ms': 10,
...     'noise_level': 0.01,
...     'enable_writes': True
... }
>>> connector = MockConnector()
>>> await connector.connect(config)
>>> value = await connector.read_pv('BEAM:CURRENT')
>>> print(f"Beam current: {value.value} {value.metadata.units}")
__init__()[source]#
async read_pv(pv_address, timeout=None)[source]#

Read PV - generates realistic value if not cached.

Parameters:
  • pv_address (str) – Any PV name (mock accepts all names)

  • timeout (float | None) – Ignored for mock connector

Returns:

PVValue with synthetic data

Return type:

PVValue

async write_pv(pv_address, value, timeout=None)[source]#

Write PV - updates internal state.

Parameters:
  • pv_address (str) – Any PV name

  • value (Any) – Value to write

  • timeout (float | None) – Ignored for mock connector

Returns:

True if successful, False if writes disabled

Return type:

bool

async read_multiple_pvs(pv_addresses, timeout=None)[source]#

Read multiple PVs concurrently.

Return type:

dict[str, PVValue]

async subscribe(pv_address, callback)[source]#

Subscribe to PV changes.

Note: Mock connector only triggers callbacks on write_pv calls.

Return type:

str

async unsubscribe(subscription_id)[source]#

Unsubscribe from PV changes.

async get_metadata(pv_address)[source]#

Get PV metadata (synthetic for mock).

Return type:

PVMetadata

async validate_pv(pv_address)[source]#

All PV names are valid in mock mode.

Return type:

bool

Development connector that accepts any PV names and generates realistic simulated data. Ideal for R&D when you don’t have control room access.

Key Features:

  • Accepts any PV name (no real control system required)

  • Configurable response delays and noise levels

  • Realistic units, timestamps, and metadata

  • Optional write operation support
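
The behaviors listed for the mock connector (any PV name accepted, state kept between writes and reads, noise on reads, callbacks fired on writes) can be illustrated with a small in-memory class. `TinyMock` is a hypothetical, synchronous simplification written for this example; the default base value of 100.0 and the relative-noise formula are assumptions, not the behavior of `MockConnector`.

```python
import random
import uuid


class TinyMock:
    """Hypothetical in-memory mock, not Osprey's MockConnector."""

    def __init__(self, noise_level=0.01, enable_writes=True, seed=0):
        self.noise_level = noise_level
        self.enable_writes = enable_writes
        self._state = {}
        self._subscriptions = {}
        self._rng = random.Random(seed)

    def read_pv(self, pv_address):
        # Unknown PVs get a default base value, so any name is accepted.
        base = self._state.setdefault(pv_address, 100.0)
        noise = self._rng.uniform(-self.noise_level, self.noise_level)
        return base * (1.0 + noise)

    def write_pv(self, pv_address, value):
        if not self.enable_writes:
            return False
        self._state[pv_address] = value
        # Callbacks fire only on writes, as noted for the mock connector.
        for subscribed_pv, callback in self._subscriptions.values():
            if subscribed_pv == pv_address:
                callback(value)
        return True

    def subscribe(self, pv_address, callback):
        subscription_id = str(uuid.uuid4())
        self._subscriptions[subscription_id] = (pv_address, callback)
        return subscription_id

    def unsubscribe(self, subscription_id):
        self._subscriptions.pop(subscription_id, None)


mock = TinyMock(noise_level=0.01)
seen = []
sub_id = mock.subscribe("MAGNET:SP", seen.append)
mock.write_pv("MAGNET:SP", 50.0)
reading = mock.read_pv("MAGNET:SP")
mock.unsubscribe(sub_id)
mock.write_pv("MAGNET:SP", 60.0)  # no callback after unsubscribe
```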

class osprey.connectors.control_system.epics_connector.EPICSConnector[source]#

Bases: ControlSystemConnector

EPICS control system connector using pyepics.

Provides read/write access to EPICS Process Variables through Channel Access protocol. Supports gateway configuration for remote access and read-only/write-access gateways.

Example

Direct gateway connection:

>>> config = {
...     'timeout': 5.0,
...     'gateways': {
...         'read_only': {
...             'address': 'cagw-alsdmz.als.lbl.gov',
...             'port': 5064
...         }
...     }
... }
>>> connector = EPICSConnector()
>>> await connector.connect(config)
>>> value = await connector.read_pv('BEAM:CURRENT')
>>> print(f"Beam current: {value.value} {value.metadata.units}")

SSH tunnel connection:

>>> config = {
...     'timeout': 5.0,
...     'gateways': {
...         'read_only': {
...             'address': 'localhost',
...             'port': 5074,
...             'use_name_server': True
...         }
...     }
... }
>>> connector = EPICSConnector()
>>> await connector.connect(config)
>>> value = await connector.read_pv('BEAM:CURRENT')
>>> print(f"Beam current: {value.value} {value.metadata.units}")

__init__()[source]#
async read_pv(pv_address, timeout=None)[source]#

Read current value from EPICS PV.

Parameters:
  • pv_address (str) – EPICS PV address (e.g., ‘BEAM:CURRENT’)

  • timeout (float | None) – Timeout in seconds (uses default if None)

Returns:

PVValue with current value, timestamp, and metadata

Raises:
  • ConnectionError – If PV cannot be connected

  • TimeoutError – If operation times out

Return type:

PVValue

async write_pv(pv_address, value, timeout=None)[source]#

Write value to EPICS PV.

Parameters:
  • pv_address (str) – EPICS PV address

  • value (Any) – Value to write

  • timeout (float | None) – Timeout in seconds

Returns:

True if write was successful

Raises:
  • ConnectionError – If PV cannot be connected

  • TimeoutError – If operation times out

Return type:

bool

async read_multiple_pvs(pv_addresses, timeout=None)[source]#

Read multiple PVs concurrently.

Return type:

dict[str, PVValue]

async subscribe(pv_address, callback)[source]#

Subscribe to PV value changes.

Parameters:
  • pv_address (str) – EPICS PV address

  • callback (Callable[[PVValue], None]) – Function to call when value changes

Returns:

Subscription ID for later unsubscription

Return type:

str

async unsubscribe(subscription_id)[source]#

Unsubscribe from PV changes.

async get_metadata(pv_address)[source]#

Get metadata for a PV.

Return type:

PVMetadata

async validate_pv(pv_address)[source]#

Check if PV exists and is accessible.

Parameters:

pv_address (str) – EPICS PV address

Returns:

True if PV can be accessed

Return type:

bool

Production EPICS Channel Access connector using pyepics library. Supports gateway configuration for secure access.

Key Features:

  • EPICS Channel Access protocol

  • Read-only and read-write gateway support

  • Configurable timeouts and retry logic

  • Full metadata support (units, precision, alarms, limits)

Requirements:

  • pyepics library: pip install pyepics

  • Access to EPICS gateway or IOCs
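
How timeouts and retries combine is not spelled out here. A plausible reading of the `timeout`, `retry_count`, and `retry_delay` settings is sketched below; `with_retries` is a hypothetical helper, and treating `retry_count` as the total number of attempts is an assumption rather than documented behavior.

```python
import asyncio


async def with_retries(operation, retry_count=3, retry_delay=0.5, timeout=5.0):
    """Run an async operation, retrying on timeout or connection errors."""
    if retry_count < 1:
        raise ValueError("retry_count must be at least 1")
    last_error = None
    for attempt in range(retry_count):
        try:
            # Each attempt gets its own deadline.
            return await asyncio.wait_for(operation(), timeout)
        except (asyncio.TimeoutError, ConnectionError) as error:
            last_error = error
            if attempt < retry_count - 1:
                await asyncio.sleep(retry_delay)
    raise last_error


attempts = []


async def flaky_read():
    # Hypothetical operation that fails twice before succeeding.
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError("gateway not ready")
    return 401.2


value = asyncio.run(with_retries(flaky_read, retry_count=3, retry_delay=0.01))
```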

Archiver Interfaces#

Base Classes#

class osprey.connectors.archiver.base.ArchiverConnector[source]#

Bases: ABC

Abstract base class for archiver connectors.

Implementations provide interfaces to different archiver systems using a unified API that returns pandas DataFrames.

Example

>>> connector = await ConnectorFactory.create_archiver_connector()
>>> try:
...     df = await connector.get_data(
...         pv_list=['BEAM:CURRENT', 'BEAM:LIFETIME'],
...         start_date=datetime(2024, 1, 1),
...         end_date=datetime(2024, 1, 2)
...     )
...     print(df.head())
... finally:
...     await connector.disconnect()
abstractmethod async connect(config)[source]#

Establish connection to archiver.

Parameters:

config (dict[str, Any]) – Archiver-specific configuration

Raises:

ConnectionError – If connection cannot be established

abstractmethod async disconnect()[source]#

Close connection to archiver and cleanup resources.

abstractmethod async get_data(pv_list, start_date, end_date, precision_ms=1000, timeout=None)[source]#

Retrieve historical data for PVs.

Parameters:
  • pv_list (list[str]) – List of PV names to retrieve

  • start_date (datetime) – Start of time range

  • end_date (datetime) – End of time range

  • precision_ms (int) – Time precision in milliseconds (for downsampling)

  • timeout (int | None) – Optional timeout in seconds

Returns:

DataFrame with datetime index and PV columns. Each column contains the time series for one PV.

Raises:
  • ConnectionError – If archiver cannot be reached

  • TimeoutError – If operation times out

  • ValueError – If time range or PV names are invalid

Return type:

pandas.DataFrame

abstractmethod async get_metadata(pv_name)[source]#

Get archiving metadata for a PV.

Parameters:

pv_name (str) – Name of the process variable

Returns:

ArchiverMetadata with archiving information

Raises:
  • ConnectionError – If archiver cannot be reached

  • ValueError – If PV name is invalid

Return type:

ArchiverMetadata

abstractmethod async check_availability(pv_names)[source]#

Check which PVs are archived.

Parameters:

pv_names (list[str]) – List of PV names to check

Returns:

Dictionary mapping PV name to availability status

Return type:

dict[str, bool]

Abstract base class defining the contract for all archiver connectors.

Container for historical time series data with timestamps and values.

Built-in Implementations#

class osprey.connectors.archiver.mock_archiver_connector.MockArchiverConnector[source]#

Bases: ArchiverConnector

Mock archiver for development - generates synthetic time-series data.

This connector simulates an archiver system without requiring real archiver access. It generates realistic time-series data for any PV name.

Features:

  • Accepts any PV names

  • Generates realistic time series with trends and noise

  • Configurable sampling rate and noise level

  • Returns pandas DataFrames matching real archiver format

Example

>>> config = {
...     'sample_rate_hz': 1.0,
...     'noise_level': 0.01
... }
>>> connector = MockArchiverConnector()
>>> await connector.connect(config)
>>> df = await connector.get_data(
...     pv_list=['BEAM:CURRENT'],
...     start_date=datetime(2024, 1, 1),
...     end_date=datetime(2024, 1, 2)
... )
__init__()[source]#
async get_data(pv_list, start_date, end_date, precision_ms=1000, timeout=None)[source]#

Generate synthetic historical data.

Parameters:
  • pv_list (list[str]) – List of PV names (all accepted)

  • start_date (datetime) – Start of time range

  • end_date (datetime) – End of time range

  • precision_ms (int) – Time precision (affects downsampling)

  • timeout (int | None) – Ignored for mock archiver

Returns:

DataFrame with datetime index and columns for each PV

Return type:

pandas.DataFrame

async get_metadata(pv_name)[source]#

Get mock archiver metadata.

Return type:

ArchiverMetadata

async check_availability(pv_names)[source]#

All PVs are available in mock archiver.

Return type:

dict[str, bool]

Development archiver that generates synthetic historical data. Ideal for R&D when you don’t have archiver access.

Key Features:

  • Generates realistic time series with trends and noise

  • Configurable retention period and sample rates

  • Works with any PV names

  • Consistent with mock control system connector

class osprey.connectors.archiver.epics_archiver_connector.EPICSArchiverConnector[source]#

Bases: ArchiverConnector

EPICS Archiver Appliance connector using archivertools.

Provides access to historical PV data from EPICS Archiver Appliance via the archivertools Python library.

Example

>>> config = {
...     'url': 'https://archiver.als.lbl.gov:8443',
...     'timeout': 60
... }
>>> connector = EPICSArchiverConnector()
>>> await connector.connect(config)
>>> df = await connector.get_data(
...     pv_list=['BEAM:CURRENT'],
...     start_date=datetime(2024, 1, 1),
...     end_date=datetime(2024, 1, 2)
... )
__init__()[source]#
async get_data(pv_list, start_date, end_date, precision_ms=1000, timeout=None)[source]#

Retrieve historical data from EPICS archiver.

Parameters:
  • pv_list (list[str]) – List of PV names to retrieve

  • start_date (datetime) – Start of time range

  • end_date (datetime) – End of time range

  • precision_ms (int) – Time precision in milliseconds (for downsampling)

  • timeout (int | None) – Optional timeout in seconds

Returns:

DataFrame with datetime index and PV columns

Raises:
  • RuntimeError – If archiver not connected

  • TimeoutError – If operation times out

  • ConnectionError – If archiver cannot be reached

  • ValueError – If data format is unexpected

Return type:

pandas.DataFrame

async get_metadata(pv_name)[source]#

Get archiving metadata for a PV.

Note: archivertools doesn’t expose metadata API directly, so this returns basic information.

Parameters:

pv_name (str) – Name of the process variable

Returns:

ArchiverMetadata with basic archiving information

Return type:

ArchiverMetadata

async check_availability(pv_names)[source]#

Check which PVs are archived.

Note: Basic implementation that assumes all PVs are archived. Could be enhanced with actual archiver API calls.

Parameters:

pv_names (list[str]) – List of PV names to check

Returns:

Dictionary mapping PV name to availability status

Return type:

dict[str, bool]

Production connector for EPICS Archiver Appliance using archivertools library.

Key Features:

  • EPICS Archiver Appliance integration

  • Efficient bulk data retrieval

  • Configurable precision and time ranges

  • Connection pooling for performance

Requirements:

  • archivertools library: pip install archivertools

  • Access to EPICS Archiver Appliance URL

Pattern Detection#

Static code analysis for detecting control system operations in generated code. Used by the approval system to identify reads and writes.

Analyzes Python code using configurable regex patterns to detect control system operations. Returns detection results including operation types and matched patterns.

Example:

from osprey.services.python_executor.analysis.pattern_detection import detect_control_system_operations

code = """
current = epics.caget('BEAM:CURRENT')
if current < 400:
    epics.caput('ALARM:STATUS', 1)
"""

result = detect_control_system_operations(code)
# result['has_writes'] == True
# result['has_reads'] == True
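
To make the regex-based approach concrete, here is a minimal sketch of how such a detector could work. It is not the implementation of `detect_control_system_operations`; the function name `detect_operations` and the two patterns are hypothetical, chosen to mirror the `write`/`read` pattern lists in the configuration and the result keys used in the example.

```python
import re

# Hypothetical patterns, shaped like control_system.patterns.<type> in config.yml.
SKETCH_PATTERNS = {
    "write": [r"\bcaput\s*\("],
    "read": [r"\bcaget\s*\("],
}


def detect_operations(code, patterns=SKETCH_PATTERNS):
    """Report which configured patterns occur anywhere in the code string."""
    detected = {
        kind: [pattern for pattern in regexes if re.search(pattern, code)]
        for kind, regexes in patterns.items()
    }
    return {
        "has_writes": bool(detected["write"]),
        "has_reads": bool(detected["read"]),
        "detected_patterns": {
            "writes": detected["write"],
            "reads": detected["read"],
        },
    }


sample = """
current = epics.caget('BEAM:CURRENT')
if current < 400:
    epics.caput('ALARM:STATUS', 1)
"""

result = detect_operations(sample)
read_only = detect_operations("x = epics.caget('BEAM:CURRENT')")
```

A plain regex scan also matches text inside comments and string literals, which errs on the side of flagging code for approval.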

Configuration Schema#

Control System Configuration#

Control system connector configuration in config.yml:

control_system:
  type: mock | epics | labview | tango | custom

  # Pattern detection for approval system
  patterns:
    <control_system_type>:
      write:
        - '<regex_pattern_1>'
        - '<regex_pattern_2>'
      read:
        - '<regex_pattern_1>'
        - '<regex_pattern_2>'

  # Type-specific configurations
  connector:
    mock:
      response_delay_ms: 10
      noise_level: 0.01
      enable_writes: true

    epics:
      gateways:
        read_only:
          address: cagw.facility.edu
          port: 5064
          use_name_server: false    # Use EPICS_CA_NAME_SERVERS instead of EPICS_CA_ADDR_LIST (default: false)
        read_write:
          address: cagw-rw.facility.edu
          port: 5065
          use_name_server: false
      timeout: 5.0
      retry_count: 3
      retry_delay: 0.5
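
The `use_name_server` flag chooses between two standard EPICS Channel Access environment variables. The sketch below shows one plausible translation of a gateway entry into those variables; `gateway_environment` is a hypothetical helper, and whether the connector also sets `EPICS_CA_AUTO_ADDR_LIST` is an assumption.

```python
def gateway_environment(gateway):
    """Translate one gateway entry into EPICS client environment variables."""
    target = f"{gateway['address']}:{gateway['port']}"
    if gateway.get("use_name_server", False):
        # Name-server lookup, e.g. when reaching a gateway through an SSH tunnel.
        return {"EPICS_CA_NAME_SERVERS": target}
    # Explicit address list with automatic broadcast discovery turned off.
    return {"EPICS_CA_ADDR_LIST": target, "EPICS_CA_AUTO_ADDR_LIST": "NO"}


direct = gateway_environment({"address": "cagw.facility.edu", "port": 5064})
tunnel = gateway_environment(
    {"address": "localhost", "port": 5074, "use_name_server": True}
)
```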

Archiver Configuration#

Archiver connector configuration in config.yml:

archiver:
  type: mock_archiver | epics_archiver | custom_archiver

  # Mock archiver uses sensible defaults
  mock_archiver:
    sample_rate_hz: 1.0
    noise_level: 0.01

  epics_archiver:
    url: https://archiver.facility.edu:8443
    timeout: 60
    max_retries: 3
    verify_ssl: true
    pool_connections: 10
    pool_maxsize: 20

Usage Examples#

Basic Usage#

Create and use a connector from global configuration:

from osprey.connectors.factory import ConnectorFactory

# Create connector from config.yml
connector = await ConnectorFactory.create_control_system_connector()

try:
    # Read a PV
    result = await connector.read_pv('BEAM:CURRENT')
    print(f"Current: {result.value} {result.metadata.units}")

    # Read multiple PVs
    results = await connector.read_multiple_pvs([
        'BEAM:CURRENT',
        'BEAM:LIFETIME',
        'BEAM:ENERGY'
    ])

    # Get metadata
    metadata = await connector.get_metadata('BEAM:CURRENT')
    print(f"Units: {metadata.units}, Range: {metadata.min_value}-{metadata.max_value}")

finally:
    await connector.disconnect()
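
The try/finally pattern above can be wrapped in an async context manager so that `disconnect()` is never forgotten. This is an optional convenience sketched with the standard library; `connected`, `FakeConnector`, and `create_fake` are hypothetical names, and the fake connector exists only to make the example runnable without Osprey.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeConnector:
    """Hypothetical connector used only to make the sketch runnable."""

    def __init__(self):
        self.connected = True

    async def read_pv(self, pv_address):
        return 401.2

    async def disconnect(self):
        self.connected = False


@asynccontextmanager
async def connected(create_connector):
    """Create a connector and guarantee disconnect() on exit."""
    connector = await create_connector()
    try:
        yield connector
    finally:
        await connector.disconnect()


async def create_fake():
    return FakeConnector()


async def main():
    async with connected(create_fake) as connector:
        value = await connector.read_pv("BEAM:CURRENT")
    return value, connector.connected


value, still_connected = asyncio.run(main())
```

With the real factory, `ConnectorFactory.create_control_system_connector` could be passed as `create_connector`, since it is awaitable with no arguments.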

Custom Configuration#

Create connector with inline configuration:

# Mock connector with custom settings
config = {
    'type': 'mock',
    'connector': {
        'mock': {
            'response_delay_ms': 5,
            'noise_level': 0.02
        }
    }
}
connector = await ConnectorFactory.create_control_system_connector(config)

# EPICS connector with specific gateway
config = {
    'type': 'epics',
    'connector': {
        'epics': {
            'gateways': {
                'read_only': {
                    'address': 'cagw.als.lbl.gov',
                    'port': 5064
                }
            },
            'timeout': 3.0
        }
    }
}
connector = await ConnectorFactory.create_control_system_connector(config)
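
Control system and archiver configs nest their type-specific settings differently: control system settings sit under `connector.<type>`, while archiver settings sit directly under `<type>`. The sketch below makes that difference explicit; both helper functions are hypothetical and only illustrate the lookup, not the factory's code.

```python
def control_system_settings(config):
    """Return (type, settings) where settings live under config['connector'][type]."""
    connector_type = config.get("type")
    if not connector_type:
        raise ValueError("config must contain a 'type' key")
    return connector_type, config.get("connector", {}).get(connector_type, {})


def archiver_settings(config):
    """Return (type, settings) where settings live under config[type]."""
    connector_type = config.get("type")
    if not connector_type:
        raise ValueError("config must contain a 'type' key")
    return connector_type, config.get(connector_type, {})


cs_type, cs_settings = control_system_settings(
    {"type": "mock", "connector": {"mock": {"noise_level": 0.02}}}
)
ar_type, ar_settings = archiver_settings(
    {"type": "epics_archiver", "epics_archiver": {"timeout": 60}}
)
```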

Archiver Usage#

Retrieve historical data:

from osprey.connectors.factory import ConnectorFactory
from datetime import datetime, timedelta

# Create archiver connector
connector = await ConnectorFactory.create_archiver_connector()

try:
    # Define time range
    end_time = datetime.now()
    start_time = end_time - timedelta(hours=24)

    # Retrieve data for multiple PVs
    data = await connector.get_data(
        pv_list=['BEAM:CURRENT', 'BEAM:LIFETIME'],
        start_date=start_time,
        end_date=end_time,
        precision_ms=1000  # 1 second precision
    )

    # Process results: get_data returns a DataFrame with one column per PV
    for pv_name, pv_data in data.items():
        print(f"{pv_name}: {len(pv_data)} data points")

finally:
    await connector.disconnect()
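
Before requesting a long time range it can help to estimate how many rows to expect. The sketch below assumes `precision_ms` is the spacing between returned samples, which the reference does not state explicitly; `expected_rows` is a hypothetical helper.

```python
from datetime import datetime, timedelta


def expected_rows(start_date, end_date, precision_ms=1000):
    """Estimate rows per PV, assuming one sample every precision_ms."""
    if precision_ms <= 0:
        raise ValueError("precision_ms must be positive")
    if end_date < start_date:
        raise ValueError("end_date must not precede start_date")
    step = timedelta(milliseconds=precision_ms)
    # Both endpoints are counted, hence the +1.
    return (end_date - start_date) // step + 1


end_time = datetime(2024, 1, 2)
start_time = end_time - timedelta(hours=24)
rows_per_second_precision = expected_rows(start_time, end_time, precision_ms=1000)
rows_per_minute_precision = expected_rows(start_time, end_time, precision_ms=60_000)
```

Under this assumption, a 24-hour range at 1-second precision yields tens of thousands of rows per PV, so a coarser `precision_ms` is worth considering for overview plots.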

Pattern Detection Usage#

Detect control system operations in generated code:

from osprey.services.python_executor.analysis.pattern_detection import detect_control_system_operations

code = """
# Read beam current
current = epics.caget('BEAM:CURRENT')

# Adjust setpoint if needed
if current < 400:
    epics.caput('BEAM:SETPOINT', 420.0)
"""

# Detect operations
result = detect_control_system_operations(code)

if result['has_writes']:
    print("⚠️ Code performs write operations - requires approval")

if result['has_reads']:
    print("✓ Code performs read operations")

print(f"Control system: {result['control_system_type']}")
print(f"Write patterns detected: {result['detected_patterns']['writes']}")
print(f"Read patterns detected: {result['detected_patterns']['reads']}")

Custom Connector Registration#

Custom connectors are registered through the Osprey registry system:

# In your application's registry.py
from osprey.registry import (
    ConnectorRegistration,
    RegistryConfigProvider,
    extend_framework_registry,
)

class MyAppRegistryProvider(RegistryConfigProvider):
    def get_registry_config(self):
        return extend_framework_registry(
            connectors=[
                # Control system connectors
                ConnectorRegistration(
                    name="labview",
                    connector_type="control_system",
                    module_path="my_app.connectors.labview_connector",
                    class_name="LabVIEWConnector",
                    description="LabVIEW Web Services connector"
                ),
                ConnectorRegistration(
                    name="tango",
                    connector_type="control_system",
                    module_path="my_app.connectors.tango_connector",
                    class_name="TangoConnector",
                    description="Tango control system connector"
                ),
                # Archiver connectors
                ConnectorRegistration(
                    name="tango_archiver",
                    connector_type="archiver",
                    module_path="my_app.connectors.tango_archiver",
                    class_name="TangoArchiverConnector",
                    description="Tango archiver connector"
                ),
            ],
            capabilities=[...],
            context_classes=[...]
        )

After registration, connectors are available via configuration:

# config.yml
control_system:
  type: labview  # or tango, epics, mock
  connector:
    labview:
      base_url: "http://labview-server:8080"
      api_key: "your-api-key"
    tango:
      device_name: "tango://host:10000/sys/dev/1"

archiver:
  type: tango_archiver
  tango_archiver:
    url: "https://archiver.facility.edu"

See also

Control System Integration

Complete implementation guide with step-by-step examples

Part 1: Getting Started

See connectors in action in the Control Assistant tutorial

Human Approval

How pattern detection integrates with approval workflows

Python Execution

Pattern detection in secure Python code execution