# Source code for osprey.connectors.archiver.epics_archiver_connector
"""
EPICS Archiver Appliance connector using archivertools.
Provides interface to EPICS Archiver Appliance for historical data retrieval.
Refactored from existing archiver integration code.
Related to Issue #18 - Control System Abstraction (Layer 2 - EPICS Archiver Implementation)
"""
import asyncio
from datetime import datetime
from typing import Any
import pandas as pd
from osprey.connectors.archiver.base import ArchiverConnector, ArchiverMetadata
from osprey.utils.logger import get_logger
# Module-level logger used by EPICSArchiverConnector for its debug messages
# (connect, disconnect, and successful data retrieval).
logger = get_logger("epics_archiver_connector")
class EPICSArchiverConnector(ArchiverConnector):
    """
    EPICS Archiver Appliance connector using archivertools.

    Provides access to historical PV data from EPICS Archiver Appliance
    via the archivertools Python library.

    Example:
        >>> config = {
        ...     'url': 'https://archiver.als.lbl.gov:8443',
        ...     'timeout': 60
        ... }
        >>> connector = EPICSArchiverConnector()
        >>> await connector.connect(config)
        >>> df = await connector.get_data(
        ...     pv_list=['BEAM:CURRENT'],
        ...     start_date=datetime(2024, 1, 1),
        ...     end_date=datetime(2024, 1, 2)
        ... )
    """

    # Timeout (seconds) used when no per-call or configured timeout is given.
    _DEFAULT_TIMEOUT = 60

    def __init__(self):
        self._connected = False
        self._archiver_client = None
        # Initialised here (not only in connect()) so that calling get_data()
        # before connect() fails with the documented RuntimeError rather than
        # an AttributeError on a missing attribute.
        self._ArchiverClient = None
        self._timeout = self._DEFAULT_TIMEOUT

    async def connect(self, config: dict[str, Any]) -> None:
        """
        Initialize archiver client.

        Args:
            config: Configuration with keys:
                - url: Archiver URL (required)
                - timeout: Default timeout in seconds (default: 60)

        Raises:
            ImportError: If archivertools is not installed
            ValueError: If URL is not provided
            ConnectionError: If the ArchiverClient cannot be constructed
        """
        # Imported lazily so the module can be imported without archivertools.
        try:
            from archivertools import ArchiverClient
        except ImportError as e:
            raise ImportError(
                "archivertools is required for EPICS archiver. "
                "Install with: pip install archivertools"
            ) from e
        self._ArchiverClient = ArchiverClient

        archiver_url = config.get('url')
        if not archiver_url:
            raise ValueError("archiver URL is required for EPICS archiver")

        try:
            self._archiver_client = self._ArchiverClient(archiver_url=archiver_url)
        except Exception as e:
            raise ConnectionError(f"ArchiverClient initialization failed: {e}") from e

        self._timeout = config.get('timeout', self._DEFAULT_TIMEOUT)
        self._connected = True
        logger.debug(f"EPICS Archiver connector initialized: {archiver_url}")

    async def disconnect(self) -> None:
        """Cleanup archiver connection."""
        self._archiver_client = None
        self._connected = False
        logger.debug("EPICS Archiver connector disconnected")

    async def get_data(
        self,
        pv_list: list[str],
        start_date: datetime,
        end_date: datetime,
        precision_ms: int = 1000,
        timeout: int | None = None
    ) -> pd.DataFrame:
        """
        Retrieve historical data from EPICS archiver.

        Args:
            pv_list: List of PV names to retrieve
            start_date: Start of time range
            end_date: End of time range
            precision_ms: Time precision in milliseconds (for downsampling)
            timeout: Optional timeout in seconds; a falsy value (None or 0)
                falls back to the timeout configured in connect()

        Returns:
            DataFrame with datetime index and PV columns

        Raises:
            RuntimeError: If archiver not connected
            TypeError: If start_date or end_date is not a datetime
            TimeoutError: If operation times out
            ConnectionError: If archiver cannot be reached
            ValueError: If data format is unexpected
        """
        # Check the connection before touching any connect()-time state.
        client = self._archiver_client
        if not client:
            raise RuntimeError("Archiver not connected")

        timeout = timeout or self._timeout

        # Validate inputs
        if not isinstance(start_date, datetime):
            raise TypeError(f"start_date must be a datetime object, got {type(start_date)}")
        if not isinstance(end_date, datetime):
            raise TypeError(f"end_date must be a datetime object, got {type(end_date)}")

        def fetch_data():
            """Synchronous data fetch function (runs in a worker thread)."""
            # Uses the locally captured client so a concurrent disconnect()
            # (which sets self._archiver_client to None) cannot break the call.
            return client.match_data(
                pv_list=pv_list,
                precision=precision_ms,
                start=start_date,
                end=end_date,
                verbose=0
            )

        try:
            # The blocking archivertools call runs via asyncio.to_thread.
            # asyncio.wait_for enforces the timeout. NOTE: on timeout the
            # awaiting task is cancelled, but the worker thread itself is not
            # interrupted and may keep running until match_data returns.
            data = await asyncio.wait_for(
                asyncio.to_thread(fetch_data),
                timeout=timeout
            )
        except (TimeoutError, asyncio.TimeoutError) as e:
            # asyncio.TimeoutError is a distinct class before Python 3.11,
            # so both are caught to always surface the builtin TimeoutError.
            raise TimeoutError(f"Archiver request timed out after {timeout}s") from e
        except ConnectionRefusedError as e:
            raise ConnectionError(
                "Cannot connect to the archiver. "
                "Please check connectivity and SSH tunnels (if required)."
            ) from e
        except Exception as e:
            # Heuristic: translate errors whose message mentions a connection
            # problem into ConnectionError; re-raise everything else unchanged.
            if "connection" in str(e).lower():
                raise ConnectionError(
                    f"Network connectivity issue with archiver: {e}"
                ) from e
            raise

        # Result post-processing is kept outside the try so its errors are not
        # misclassified by the network-error heuristic above.
        if not isinstance(data, pd.DataFrame):
            raise ValueError(f"Unexpected data format: {type(data)}")

        # Ensure datetime index
        data.index = pd.to_datetime(data.index)
        logger.debug(
            f"Retrieved archiver data: {len(data)} points "
            f"for {len(pv_list)} PVs"
        )
        return data

    async def check_availability(self, pv_names: list[str]) -> dict[str, bool]:
        """
        Check which PVs are archived.

        Note: Basic implementation that assumes all PVs are archived.
        Could be enhanced with actual archiver API calls.

        Args:
            pv_names: List of PV names to check

        Returns:
            Dictionary mapping PV name to availability status
        """
        # Basic implementation - could be enhanced with archiver API calls
        return dict.fromkeys(pv_names, True)