Source code for osprey.connectors.archiver.mock_archiver_connector
"""
Mock archiver connector for development and testing.
Generates synthetic time-series data for any PV names.
Ideal for R&D and development without archiver access.
Related to Issue #18 - Control System Abstraction (Layer 2 - Mock Implementation)
"""
from datetime import datetime, timedelta
from typing import Any
import numpy as np
import pandas as pd
from osprey.connectors.archiver.base import ArchiverConnector, ArchiverMetadata
from osprey.utils.logger import get_logger
logger = get_logger("mock_archiver_connector")
[docs]
class MockArchiverConnector(ArchiverConnector):
"""
Mock archiver for development - generates synthetic time-series data.
This connector simulates an archiver system without requiring real
archiver access. It generates realistic time-series data for any PV name.
Features:
- Accepts any PV names
- Generates realistic time series with trends and noise
- Configurable sampling rate and noise level
- Returns pandas DataFrames matching real archiver format
Example:
>>> config = {
>>> 'sample_rate_hz': 1.0,
>>> 'noise_level': 0.01
>>> }
>>> connector = MockArchiverConnector()
>>> await connector.connect(config)
>>> df = await connector.get_data(
>>> pv_list=['BEAM:CURRENT'],
>>> start_date=datetime(2024, 1, 1),
>>> end_date=datetime(2024, 1, 2)
>>> )
"""
[docs]
def __init__(self):
self._connected = False
async def connect(self, config: dict[str, Any]) -> None:
"""
Initialize mock archiver.
Args:
config: Configuration with keys:
- sample_rate_hz: Sampling rate (default: 1.0)
- noise_level: Relative noise level (default: 0.1)
"""
self._sample_rate_hz = config.get('sample_rate_hz', 1.0)
self._noise_level = config.get('noise_level', 0.1)
self._connected = True
logger.debug("Mock archiver connector initialized")
async def disconnect(self) -> None:
"""Cleanup mock archiver."""
self._connected = False
logger.debug("Mock archiver connector disconnected")
[docs]
async def get_data(
self,
pv_list: list[str],
start_date: datetime,
end_date: datetime,
precision_ms: int = 1000,
timeout: int | None = None
) -> pd.DataFrame:
"""
Generate synthetic historical data.
Args:
pv_list: List of PV names (all accepted)
start_date: Start of time range
end_date: End of time range
precision_ms: Time precision (affects downsampling)
timeout: Ignored for mock archiver
Returns:
DataFrame with datetime index and columns for each PV
"""
duration = (end_date - start_date).total_seconds()
# Limit number of points for performance
# Use precision_ms to determine sampling
num_points = min(int(duration / (precision_ms / 1000.0)), 10000)
num_points = max(num_points, 10) # At least 10 points
# Generate timestamps
time_step = duration / (num_points - 1) if num_points > 1 else 0
timestamps = [
start_date + timedelta(seconds=i * time_step)
for i in range(num_points)
]
# Generate data for each PV
data = {}
for pv in pv_list:
data[pv] = self._generate_time_series(pv, num_points)
df = pd.DataFrame(data, index=pd.to_datetime(timestamps))
logger.debug(
f"Mock archiver generated {len(df)} points for "
f"{len(pv_list)} PVs from {start_date} to {end_date}"
)
return df
[docs]
async def check_availability(self, pv_names: list[str]) -> dict[str, bool]:
"""All PVs are available in mock archiver."""
return dict.fromkeys(pv_names, True)
def _generate_time_series(self, pv_name: str, num_points: int) -> np.ndarray:
"""
Generate synthetic time series with trends and noise.
Creates realistic-looking data with:
- Sinusoidal variations
- Linear trends
- Random noise
- PV-type-specific characteristics
- BPMs use random offsets with slow oscillations
"""
t = np.linspace(0, 1, num_points)
pv_lower = pv_name.lower()
# Check if this is a BPM - use new approach
if 'position' in pv_lower or 'pos' in pv_lower or 'bpm' in pv_lower:
# Use PV name as seed for reproducibility
rng = np.random.default_rng(seed=hash(pv_name) % (2**32))
# BPM positions in mm (±100 µm = ±0.1 mm range)
base = 0.0
offset_range = 0.1 # ±100 µm equilibrium position
perturbation_amp = 0.01 # ±10 µm oscillation
trend = np.ones(num_points) * base
# Random offset for this BPM (equilibrium position)
offset = rng.uniform(-offset_range, offset_range)
# Random phase and frequency for perturbation
phase = rng.uniform(0, 2 * np.pi)
# Very low frequencies: 0.01 to 0.5 cycles over time range
# Appears as slow drift / quasi-linear behavior
frequency = rng.uniform(0.01, 0.5)
# Sinusoidal perturbation with random phase
wave = perturbation_amp * np.sin(2 * np.pi * t * frequency + phase)
# Add random noise
noise_amplitude = perturbation_amp * self._noise_level
noise = rng.normal(0, noise_amplitude, num_points)
return trend + offset + wave + noise
# Original behavior for all other PV types
if ('beam' in pv_lower and 'current' in pv_lower) or 'dcct' in pv_lower:
# Beam current: slow decay with refills
base = 500.0
# Simulate refills 10 times over the time range
trend = np.ones(num_points) * base
for i in range(num_points):
# Decay between refills (5% loss per cycle)
decay_phase = i % (num_points // 10)
trend[i] = base * (1 - 0.05 * (decay_phase / (num_points // 10)))
wave = 5 * np.sin(2 * np.pi * t * 5)
elif 'current' in pv_lower:
base = 150.0
trend = base + 10 * t
wave = 10 * np.sin(2 * np.pi * t * 3)
elif 'voltage' in pv_lower:
base = 5000.0
trend = np.ones(num_points) * base
wave = 50 * np.sin(2 * np.pi * t * 2)
elif 'power' in pv_lower:
base = 50.0
trend = base + 5 * t
wave = 5 * np.sin(2 * np.pi * t * 4)
elif 'pressure' in pv_lower:
base = 1e-9
trend = base * (1 + 0.1 * t)
wave = base * 0.05 * np.sin(2 * np.pi * t * 10)
elif 'temp' in pv_lower:
base = 25.0
trend = base + 2 * t
wave = 0.5 * np.sin(2 * np.pi * t * 8)
elif 'lifetime' in pv_lower:
base = 10.0
trend = base - 2 * t # Lifetime decreases with current
wave = 1 * np.sin(2 * np.pi * t * 3)
else:
base = 100.0
trend = base + 20 * t
wave = 10 * np.sin(2 * np.pi * t * 2)
# Add random noise
noise_amplitude = abs(base) * self._noise_level
noise = np.random.normal(0, noise_amplitude, num_points)
return trend + wave + noise