Control System Connectors#
Pluggable connector abstraction for control systems and archivers with mock and production implementations. Enables development without hardware access and seamless migration to production by changing configuration.
Note
For implementation guides and examples, see Control System Integration.
Factory Classes#
- class osprey.connectors.factory.ConnectorFactory[source]#
Bases:
objectFactory for creating control system and archiver connectors.
Provides centralized management of available connectors. Connectors are registered automatically through the Osprey registry system during framework initialization.
Example
>>> # Using default config >>> cs_connector = await ConnectorFactory.create_control_system_connector() >>> >>> # Using custom config >>> config = {'type': 'mock', 'connector': {'mock': {...}}} >>> cs_connector = await ConnectorFactory.create_control_system_connector(config)
- classmethod register_control_system(name, connector_class)[source]#
Register a control system connector.
- Parameters:
name (str) – Unique name for the connector (e.g., ‘epics’, ‘tango’, ‘mock’)
connector_class (type[ControlSystemConnector]) – Connector class implementing ControlSystemConnector
- classmethod register_archiver(name, connector_class)[source]#
Register an archiver connector.
- Parameters:
name (str) – Unique name for the connector (e.g., ‘epics_archiver’, ‘mock_archiver’)
connector_class (type[ArchiverConnector]) – Connector class implementing ArchiverConnector
- async classmethod create_control_system_connector(config=None)[source]#
Create and configure a control system connector.
- Parameters:
config (dict[str, Any]) – Control system configuration dict with keys: - type: Connector type (e.g., ‘epics’, ‘mock’) - connector: Dict with connector-specific configs If None, loads from global config
- Returns:
Initialized and connected ControlSystemConnector
- Raises:
ValueError – If connector type is unknown or config is invalid
ConnectionError – If connection fails
- Return type:
Example
>>> config = { >>> 'type': 'epics', >>> 'connector': { >>> 'epics': { >>> 'timeout': 5.0, >>> 'gateways': {'read_only': {...}} >>> } >>> } >>> } >>> connector = await ConnectorFactory.create_control_system_connector(config)
- async classmethod create_archiver_connector(config=None)[source]#
Create and configure an archiver connector.
- Parameters:
config (dict[str, Any]) – Archiver configuration dict with keys: - type: Connector type (e.g., ‘epics_archiver’, ‘mock_archiver’) - [type]: Dict with type-specific configs If None, loads from global config
- Returns:
Initialized and connected ArchiverConnector
- Raises:
ValueError – If connector type is unknown or config is invalid
ConnectionError – If connection fails
- Return type:
Example
>>> config = { >>> 'type': 'epics_archiver', >>> 'epics_archiver': { >>> 'url': 'https://archiver.als.lbl.gov:8443', >>> 'timeout': 60 >>> } >>> } >>> connector = await ConnectorFactory.create_archiver_connector(config)
The factory provides centralized creation and configuration of connectors with plugin-style registration.
Registry Integration#
Connectors can be registered through the Osprey registry system for unified component management.
- class osprey.registry.ConnectorRegistration(name, connector_type, module_path, class_name, description)[source]#
Bases:
objectRegistration metadata for control system and archiver connectors.
Defines the metadata required for lazy loading of connector classes. Connectors are registered with the ConnectorFactory during registry initialization, providing unified management of all framework components.
- Parameters:
name (str) – Unique connector name (e.g., ‘epics’, ‘tango’, ‘mock’)
connector_type (str) – Type of connector (‘control_system’ or ‘archiver’)
module_path (str) – Python module path for lazy import
class_name (str) – Connector class name within the module
description (str) – Human-readable description
Examples
Control system connector registration:
>>> ConnectorRegistration( ... name="labview", ... connector_type="control_system", ... module_path="my_app.connectors.labview_connector", ... class_name="LabVIEWConnector", ... description="LabVIEW Web Services connector for NI systems" ... )
Archiver connector registration:
>>> ConnectorRegistration( ... name="custom_archiver", ... connector_type="archiver", ... module_path="my_app.connectors.custom_archiver", ... class_name="CustomArchiverConnector", ... description="Custom facility archiver connector" ... )
Note
The connector classes are registered with ConnectorFactory during registry initialization, enabling lazy loading while maintaining the factory pattern for runtime connector creation.
See also
osprey.connectors.factory.ConnectorFactory: Runtime connector factoryosprey.connectors.control_system.base.ControlSystemConnector: Base class for control system connectorsosprey.connectors.archiver.base.ArchiverConnector: Base class for archiver connectors- name: str#
- connector_type: str#
- module_path: str#
- class_name: str#
- description: str#
- __init__(name, connector_type, module_path, class_name, description)#
Registration dataclass for control system and archiver connectors. Used to register connectors through the registry system, providing lazy loading and unified management alongside other framework components.
Usage Example:
from osprey.registry import ConnectorRegistration, extend_framework_registry
def get_registry_config(self):
return extend_framework_registry(
connectors=[
ConnectorRegistration(
name="labview",
connector_type="control_system",
module_path="my_app.connectors.labview_connector",
class_name="LabVIEWConnector",
description="LabVIEW Web Services connector for NI systems"
),
ConnectorRegistration(
name="tango",
connector_type="control_system",
module_path="my_app.connectors.tango_connector",
class_name="TangoConnector",
description="Tango control system connector"
),
ConnectorRegistration(
name="tango_archiver",
connector_type="archiver",
module_path="my_app.connectors.tango_archiver",
class_name="TangoArchiverConnector",
description="Tango archiver connector"
)
],
capabilities=[...],
context_classes=[...]
)
Control System Interfaces#
Base Classes#
- class osprey.connectors.control_system.base.ControlSystemConnector[source]#
Bases:
ABCAbstract base class for control system connectors.
Implementations provide interfaces to different control systems (EPICS, LabVIEW, Tango, Mock, etc.) using a unified API.
Example
>>> connector = await ConnectorFactory.create_control_system_connector() >>> try: >>> pv_value = await connector.read_pv('BEAM:CURRENT') >>> print(f"Beam current: {pv_value.value} {pv_value.metadata.units}") >>> finally: >>> await connector.disconnect()
- abstractmethod async connect(config)[source]#
Establish connection to control system.
- Parameters:
config (dict[str, Any]) – Control system-specific configuration
- Raises:
ConnectionError – If connection cannot be established
- abstractmethod async disconnect()[source]#
Close connection to control system and cleanup resources.
- abstractmethod async read_pv(pv_address, timeout=None)[source]#
Read current value of a process variable.
- Parameters:
pv_address (str) – Address/name of the process variable
timeout (float | None) – Optional timeout in seconds
- Returns:
PVValue with current value, timestamp, and metadata
- Raises:
ConnectionError – If PV cannot be reached
TimeoutError – If operation times out
ValueError – If PV address is invalid
- Return type:
- abstractmethod async write_pv(pv_address, value, timeout=None)[source]#
Write value to a process variable.
- Parameters:
pv_address (str) – Address/name of the process variable
value (Any) – Value to write
timeout (float | None) – Optional timeout in seconds
- Returns:
True if write was successful
- Raises:
ConnectionError – If PV cannot be reached
TimeoutError – If operation times out
ValueError – If value is invalid for this PV
PermissionError – If write access is not allowed
- Return type:
bool
- abstractmethod async read_multiple_pvs(pv_addresses, timeout=None)[source]#
Read multiple PVs efficiently (can be optimized per control system).
- Parameters:
pv_addresses (list[str]) – List of PV addresses to read
timeout (float | None) – Optional timeout in seconds
- Returns:
Dictionary mapping PV address to PVValue (May exclude PVs that failed to read)
- Return type:
dict[str, PVValue]
- abstractmethod async subscribe(pv_address, callback)[source]#
Subscribe to PV changes.
- Parameters:
pv_address (str) – Address/name of the process variable
callback (Callable[[PVValue], None]) – Function to call when PV value changes
- Returns:
Subscription ID for later unsubscription
- Raises:
ConnectionError – If PV cannot be reached
- Return type:
str
- abstractmethod async unsubscribe(subscription_id)[source]#
Unsubscribe from PV changes.
- Parameters:
subscription_id (str) – Subscription ID returned by subscribe()
Abstract base class defining the contract for all control system connectors (EPICS, LabVIEW, Tango, Mock, etc.).
Data Models#
- class osprey.connectors.control_system.base.PVValue(value, timestamp, metadata=<factory>)[source]#
Bases:
objectValue of a process variable with metadata.
- value: Any#
- timestamp: datetime#
- metadata: PVMetadata#
- __init__(value, timestamp, metadata=<factory>)#
Result container for process variable reads with value, timestamp, and metadata.
- class osprey.connectors.control_system.base.PVMetadata(units='', precision=None, alarm_status=None, timestamp=None, description=None, min_value=None, max_value=None, raw_metadata=<factory>)[source]#
Bases:
objectMetadata about a process variable/attribute.
- units: str = ''#
- precision: int | None = None#
- alarm_status: str | None = None#
- timestamp: datetime | None = None#
- description: str | None = None#
- min_value: float | None = None#
- max_value: float | None = None#
- raw_metadata: dict[str, Any] | None#
- __init__(units='', precision=None, alarm_status=None, timestamp=None, description=None, min_value=None, max_value=None, raw_metadata=<factory>)#
Metadata about a process variable (units, precision, alarms, limits, etc.).
Built-in Implementations#
- class osprey.connectors.control_system.mock_connector.MockConnector[source]#
Bases:
ControlSystemConnectorMock control system connector for development and testing.
This connector simulates a control system without requiring real hardware. It generates realistic synthetic data for any PV name, making it ideal for R&D and development when you don’t have access to the control room.
Features: - Accepts any PV name - Generates realistic initial values based on PV naming conventions - Adds configurable noise to simulate real measurements - Maintains state between reads and writes - Simulates readback PVs (e.g., :SP -> :RB)
Example
>>> config = { >>> 'response_delay_ms': 10, >>> 'noise_level': 0.01, >>> 'enable_writes': True >>> } >>> connector = MockConnector() >>> await connector.connect(config) >>> value = await connector.read_pv('BEAM:CURRENT') >>> print(f"Beam current: {value.value} {value.metadata.units}")
- async read_pv(pv_address, timeout=None)[source]#
Read PV - generates realistic value if not cached.
- Parameters:
pv_address (str) – Any PV name (mock accepts all names)
timeout (float | None) – Ignored for mock connector
- Returns:
PVValue with synthetic data
- Return type:
- async write_pv(pv_address, value, timeout=None)[source]#
Write PV - updates internal state.
- Parameters:
pv_address (str) – Any PV name
value (Any) – Value to write
timeout (float | None) – Ignored for mock connector
- Returns:
True if successful, False if writes disabled
- Return type:
bool
- async read_multiple_pvs(pv_addresses, timeout=None)[source]#
Read multiple PVs concurrently.
- Return type:
dict[str, PVValue]
- async subscribe(pv_address, callback)[source]#
Subscribe to PV changes.
Note: Mock connector only triggers callbacks on write_pv calls.
- Return type:
str
Development connector that accepts any PV names and generates realistic simulated data. Ideal for R&D when you don’t have control room access.
Key Features:
Accepts any PV name (no real control system required)
Configurable response delays and noise levels
Realistic units, timestamps, and metadata
Optional write operation support
- class osprey.connectors.control_system.epics_connector.EPICSConnector[source]#
Bases:
ControlSystemConnectorEPICS control system connector using pyepics.
Provides read/write access to EPICS Process Variables through Channel Access protocol. Supports gateway configuration for remote access and read-only/write-access gateways.
Example
Direct gateway connection: >>> config = { >>> ‘timeout’: 5.0, >>> ‘gateways’: { >>> ‘read_only’: { >>> ‘address’: ‘cagw-alsdmz.als.lbl.gov’, >>> ‘port’: 5064 >>> } >>> } >>> } >>> connector = EPICSConnector() >>> await connector.connect(config) >>> value = await connector.read_pv(‘BEAM:CURRENT’) >>> print(f”Beam current: {value.value} {value.metadata.units}”)
SSH tunnel connection: >>> config = { >>> ‘timeout’: 5.0, >>> ‘gateways’: { >>> ‘read_only’: { >>> ‘address’: ‘localhost’, >>> ‘port’: 5074, >>> ‘use_name_server’: True >>> } >>> } >>> } >>> connector = EPICSConnector() >>> await connector.connect(config) >>> value = await connector.read_pv(‘BEAM:CURRENT’) >>> print(f”Beam current: {value.value} {value.metadata.units}”)
- async read_pv(pv_address, timeout=None)[source]#
Read current value from EPICS PV.
- Parameters:
pv_address (str) – EPICS PV address (e.g., ‘BEAM:CURRENT’)
timeout (float | None) – Timeout in seconds (uses default if None)
- Returns:
PVValue with current value, timestamp, and metadata
- Raises:
ConnectionError – If PV cannot be connected
TimeoutError – If operation times out
- Return type:
- async write_pv(pv_address, value, timeout=None)[source]#
Write value to EPICS PV.
- Parameters:
pv_address (str) – EPICS PV address
value (Any) – Value to write
timeout (float | None) – Timeout in seconds
- Returns:
True if write was successful
- Raises:
ConnectionError – If PV cannot be connected
TimeoutError – If operation times out
- Return type:
bool
- async read_multiple_pvs(pv_addresses, timeout=None)[source]#
Read multiple PVs concurrently.
- Return type:
dict[str, PVValue]
- async subscribe(pv_address, callback)[source]#
Subscribe to PV value changes.
- Parameters:
pv_address (str) – EPICS PV address
callback (Callable[[PVValue], None]) – Function to call when value changes
- Returns:
Subscription ID for later unsubscription
- Return type:
str
Production EPICS Channel Access connector using pyepics library. Supports gateway configuration for secure access.
Key Features:
EPICS Channel Access protocol
Read-only and read-write gateway support
Configurable timeouts and retry logic
Full metadata support (units, precision, alarms, limits)
Requirements:
pyepicslibrary:pip install pyepicsAccess to EPICS gateway or IOCs
Archiver Interfaces#
Base Classes#
- class osprey.connectors.archiver.base.ArchiverConnector[source]#
Bases:
ABCAbstract base class for archiver connectors.
Implementations provide interfaces to different archiver systems using a unified API that returns pandas DataFrames.
Example
>>> connector = await ConnectorFactory.create_archiver_connector() >>> try: >>> df = await connector.get_data( >>> pv_list=['BEAM:CURRENT', 'BEAM:LIFETIME'], >>> start_date=datetime(2024, 1, 1), >>> end_date=datetime(2024, 1, 2) >>> ) >>> print(df.head()) >>> finally: >>> await connector.disconnect()
- abstractmethod async connect(config)[source]#
Establish connection to archiver.
- Parameters:
config (dict[str, Any]) – Archiver-specific configuration
- Raises:
ConnectionError – If connection cannot be established
- abstractmethod async get_data(pv_list, start_date, end_date, precision_ms=1000, timeout=None)[source]#
Retrieve historical data for PVs.
- Parameters:
pv_list (list[str]) – List of PV names to retrieve
start_date (datetime) – Start of time range
end_date (datetime) – End of time range
precision_ms (int) – Time precision in milliseconds (for downsampling)
timeout (int | None) – Optional timeout in seconds
- Returns:
DataFrame with datetime index and PV columns Each column contains the time series for one PV
- Raises:
ConnectionError – If archiver cannot be reached
TimeoutError – If operation times out
ValueError – If time range or PV names are invalid
- Return type:
pandas.DataFrame
- abstractmethod async get_metadata(pv_name)[source]#
Get archiving metadata for a PV.
- Parameters:
pv_name (str) – Name of the process variable
- Returns:
ArchiverMetadata with archiving information
- Raises:
ConnectionError – If archiver cannot be reached
ValueError – If PV name is invalid
- Return type:
ArchiverMetadata
Abstract base class defining the contract for all archiver connectors.
Container for historical time series data with timestamps and values.
Built-in Implementations#
- class osprey.connectors.archiver.mock_archiver_connector.MockArchiverConnector[source]#
Bases:
ArchiverConnectorMock archiver for development - generates synthetic time-series data.
This connector simulates an archiver system without requiring real archiver access. It generates realistic time-series data for any PV name.
Features: - Accepts any PV names - Generates realistic time series with trends and noise - Configurable sampling rate and noise level - Returns pandas DataFrames matching real archiver format
Example
>>> config = { >>> 'sample_rate_hz': 1.0, >>> 'noise_level': 0.01 >>> } >>> connector = MockArchiverConnector() >>> await connector.connect(config) >>> df = await connector.get_data( >>> pv_list=['BEAM:CURRENT'], >>> start_date=datetime(2024, 1, 1), >>> end_date=datetime(2024, 1, 2) >>> )
- async get_data(pv_list, start_date, end_date, precision_ms=1000, timeout=None)[source]#
Generate synthetic historical data.
- Parameters:
pv_list (list[str]) – List of PV names (all accepted)
start_date (datetime) – Start of time range
end_date (datetime) – End of time range
precision_ms (int) – Time precision (affects downsampling)
timeout (int | None) – Ignored for mock archiver
- Returns:
DataFrame with datetime index and columns for each PV
- Return type:
pandas.DataFrame
Development archiver that generates synthetic historical data. Ideal for R&D when you don’t have archiver access.
Key Features:
Generates realistic time series with trends and noise
Configurable retention period and sample rates
Works with any PV names
Consistent with mock control system connector
- class osprey.connectors.archiver.epics_archiver_connector.EPICSArchiverConnector[source]#
Bases:
ArchiverConnectorEPICS Archiver Appliance connector using archivertools.
Provides access to historical PV data from EPICS Archiver Appliance via the archivertools Python library.
Example
>>> config = { >>> 'url': 'https://archiver.als.lbl.gov:8443', >>> 'timeout': 60 >>> } >>> connector = EPICSArchiverConnector() >>> await connector.connect(config) >>> df = await connector.get_data( >>> pv_list=['BEAM:CURRENT'], >>> start_date=datetime(2024, 1, 1), >>> end_date=datetime(2024, 1, 2) >>> )
- async get_data(pv_list, start_date, end_date, precision_ms=1000, timeout=None)[source]#
Retrieve historical data from EPICS archiver.
- Parameters:
pv_list (list[str]) – List of PV names to retrieve
start_date (datetime) – Start of time range
end_date (datetime) – End of time range
precision_ms (int) – Time precision in milliseconds (for downsampling)
timeout (int | None) – Optional timeout in seconds
- Returns:
DataFrame with datetime index and PV columns
- Raises:
RuntimeError – If archiver not connected
TimeoutError – If operation times out
ConnectionError – If archiver cannot be reached
ValueError – If data format is unexpected
- Return type:
pandas.DataFrame
- async get_metadata(pv_name)[source]#
Get archiving metadata for a PV.
Note: archivertools doesn’t expose metadata API directly, so this returns basic information.
- Parameters:
pv_name (str) – Name of the process variable
- Returns:
ArchiverMetadata with basic archiving information
- Return type:
ArchiverMetadata
- async check_availability(pv_names)[source]#
Check which PVs are archived.
Note: Basic implementation that assumes all PVs are archived. Could be enhanced with actual archiver API calls.
- Parameters:
pv_names (list[str]) – List of PV names to check
- Returns:
Dictionary mapping PV name to availability status
- Return type:
dict[str, bool]
Production connector for EPICS Archiver Appliance using archivertools library.
Key Features:
EPICS Archiver Appliance integration
Efficient bulk data retrieval
Configurable precision and time ranges
Connection pooling for performance
Requirements:
archivertoolslibrary:pip install archivertoolsAccess to EPICS Archiver Appliance URL
Pattern Detection#
Static code analysis for detecting control system operations in generated code. Used by approval system to identify reads and writes.
Analyzes Python code using configurable regex patterns to detect control system operations. Returns detection results including operation types and matched patterns.
Example:
from osprey.services.python_executor.analysis.pattern_detection import detect_control_system_operations
code = """
current = epics.caget('BEAM:CURRENT')
if current < 400:
epics.caput('ALARM:STATUS', 1)
"""
result = detect_control_system_operations(code)
# result['has_writes'] == True
# result['has_reads'] == True
Configuration Schema#
Control System Configuration#
Control system connector configuration in config.yml:
control_system:
type: mock | epics | labview | tango | custom
# Pattern detection for approval system
patterns:
<control_system_type>:
write:
- '<regex_pattern_1>'
- '<regex_pattern_2>'
read:
- '<regex_pattern_1>'
- '<regex_pattern_2>'
# Type-specific configurations
connector:
mock:
response_delay_ms: 10
noise_level: 0.01
enable_writes: true
epics:
gateways:
read_only:
address: cagw.facility.edu
port: 5064
use_name_server: false # Use EPICS_CA_NAME_SERVERS vs CA_ADDR_LIST (default: false)
read_write:
address: cagw-rw.facility.edu
port: 5065
use_name_server: false
timeout: 5.0
retry_count: 3
retry_delay: 0.5
Archiver Configuration#
Archiver connector configuration in config.yml:
archiver:
type: mock_archiver | epics_archiver | custom_archiver
# Mock archiver uses sensible defaults
mock_archiver:
sample_rate_hz: 1.0
noise_level: 0.01
epics_archiver:
url: https://archiver.facility.edu:8443
timeout: 60
max_retries: 3
verify_ssl: true
pool_connections: 10
pool_maxsize: 20
Usage Examples#
Basic Usage#
Create and use a connector from global configuration:
from osprey.connectors.factory import ConnectorFactory
# Create connector from config.yml
connector = await ConnectorFactory.create_control_system_connector()
try:
# Read a PV
result = await connector.read_pv('BEAM:CURRENT')
print(f"Current: {result.value} {result.metadata.units}")
# Read multiple PVs
results = await connector.read_multiple_pvs([
'BEAM:CURRENT',
'BEAM:LIFETIME',
'BEAM:ENERGY'
])
# Get metadata
metadata = await connector.get_metadata('BEAM:CURRENT')
print(f"Units: {metadata.units}, Range: {metadata.min_value}-{metadata.max_value}")
finally:
await connector.disconnect()
Custom Configuration#
Create connector with inline configuration:
# Mock connector with custom settings
config = {
'type': 'mock',
'connector': {
'mock': {
'response_delay_ms': 5,
'noise_level': 0.02
}
}
}
connector = await ConnectorFactory.create_control_system_connector(config)
# EPICS connector with specific gateway
config = {
'type': 'epics',
'connector': {
'epics': {
'gateways': {
'read_only': {
'address': 'cagw.als.lbl.gov',
'port': 5064
}
},
'timeout': 3.0
}
}
}
connector = await ConnectorFactory.create_control_system_connector(config)
Archiver Usage#
Retrieve historical data:
from osprey.connectors.factory import ConnectorFactory
from datetime import datetime, timedelta
# Create archiver connector
connector = await ConnectorFactory.create_archiver_connector()
try:
# Define time range
end_time = datetime.now()
start_time = end_time - timedelta(hours=24)
# Retrieve data for multiple PVs
data = await connector.get_data(
pv_list=['BEAM:CURRENT', 'BEAM:LIFETIME'],
start_date=start_time,
end_date=end_time,
precision_ms=1000 # 1 second precision
)
# Process results
for pv_name, pv_data in data.items():
print(f"{pv_name}: {len(pv_data.timestamps)} data points")
finally:
await connector.disconnect()
Pattern Detection Usage#
Detect control system operations in generated code:
from osprey.services.python_executor.analysis.pattern_detection import detect_control_system_operations
code = """
# Read beam current
current = epics.caget('BEAM:CURRENT')
# Adjust setpoint if needed
if current < 400:
epics.caput('BEAM:SETPOINT', 420.0)
"""
# Detect operations
result = detect_control_system_operations(code)
if result['has_writes']:
print("⚠️ Code performs write operations - requires approval")
if result['has_reads']:
print("✓ Code performs read operations")
print(f"Control system: {result['control_system_type']}")
print(f"Write patterns detected: {result['detected_patterns']['writes']}")
print(f"Read patterns detected: {result['detected_patterns']['reads']}")
Custom Connector Registration#
Custom connectors are registered through the Osprey registry system:
# In your application's registry.py
from osprey.registry import ConnectorRegistration, extend_framework_registry
class MyAppRegistryProvider(RegistryConfigProvider):
def get_registry_config(self):
return extend_framework_registry(
connectors=[
# Control system connectors
ConnectorRegistration(
name="labview",
connector_type="control_system",
module_path="my_app.connectors.labview_connector",
class_name="LabVIEWConnector",
description="LabVIEW Web Services connector"
),
ConnectorRegistration(
name="tango",
connector_type="control_system",
module_path="my_app.connectors.tango_connector",
class_name="TangoConnector",
description="Tango control system connector"
),
# Archiver connectors
ConnectorRegistration(
name="tango_archiver",
connector_type="archiver",
module_path="my_app.connectors.tango_archiver",
class_name="TangoArchiverConnector",
description="Tango archiver connector"
),
],
capabilities=[...],
context_classes=[...]
)
After registration, connectors are available via configuration:
# config.yml
control_system:
type: labview # or tango, epics, mock
connector:
labview:
base_url: "http://labview-server:8080"
api_key: "your-api-key"
tango:
device_name: "tango://host:10000/sys/dev/1"
archiver:
type: tango_archiver
tango_archiver:
url: "https://archiver.facility.edu"
See also
- Control System Integration
Complete implementation guide with step-by-step examples
- Part 1: Getting Started
See connectors in action in the Control Assistant tutorial
- Human Approval
How pattern detection integrates with approval workflows
- Python Execution
Pattern detection in secure Python code execution