Skip to content

Data Helpers

The pacsys.exp module provides high-level utilities for common accelerator physics workflows: monitoring channels, scanning parameters, logging data to files, and waiting for conditions.

from pacsys.exp import Monitor, read_fresh, watch, scan, DataLogger
from pacsys.exp import CsvWriter, ParquetWriter

All functions accept DRF strings or Device objects and use the global default backend unless one is explicitly provided.


Monitor

Subscribe to one or more channels and collect readings into ring buffers. Supports both blocking (collect) and non-blocking (start/stop) modes.

Blocking Collection

from pacsys.exp import Monitor

mon = Monitor(["M:OUTTMP@p,1000", "G:AMANDA@e,8f"])
result = mon.collect(duration=10.0)

print(result.mean("M:OUTTMP@p,1000"))    # Mean value
print(result.std("M:OUTTMP@p,1000"))     # Standard deviation
print(result.counts)                      # {drf: count} per channel
print(result.rate())                      # Readings/sec per channel

Alternatively, collect until each channel has a minimum number of readings:

result = mon.collect(count=100, timeout=30.0)

Non-Blocking (Start/Stop)

mon = Monitor(["M:OUTTMP@p,1000"])
mon.start()

# Peek without consuming data
snap = mon.snapshot()

# Swap out buffers and return old data
result = mon.flush()

mon.stop()

Or use the context manager:

with Monitor(["M:OUTTMP@p,1000"]) as mon:
    snap = mon.snapshot()

Waiting for New Data

Use await_next() to block until a new reading arrives on a specific channel:

with Monitor(["M:OUTTMP@p,1000"]) as mon:
    reading = mon.await_next("M:OUTTMP@p,1000", timeout=5.0)
    print(reading.value)

Use wait_until() to block until an arbitrary predicate is satisfied:

with Monitor(["M:OUTTMP@p,1000"]) as mon:
    result = mon.wait_until(
        lambda snap: snap.mean("M:OUTTMP@p,1000") > 72.0,
        timeout=30.0,
    )

Change Detection

Use tags and has_new() for cheap polling without copying buffers:

with Monitor(["M:OUTTMP@p,1000"]) as mon:
    old_tags = mon.tags
    # ... do other work ...
    if mon.has_new(old_tags):
        snap = mon.snapshot()

Constructor Options

Parameter Type Default Description
devices list[DeviceSpec] required DRF strings or Device objects
buffer_size int 10_000 Max readings per channel (ring buffer)
backend Backend None Backend to use (global default if None)

MonitorResult

Returned by Monitor.collect(), snapshot(), and flush(). Provides statistics, slicing, and export methods.

Statistics

All stat methods accept an optional drf argument. If given, returns a single value; if omitted, returns a dict keyed by DRF.

Method Description
mean(drf?) Arithmetic mean
std(drf?) Standard deviation (population)
median(drf?) Median value
min(drf?) Minimum value
max(drf?) Maximum value
rate(drf?) Readings per second
last(n, drf?) Last n values

Properties

Property Type Description
channels dict[str, ChannelData] Per-channel data
counts dict[str, int] Reading count per channel
elapsed timedelta \| None Duration between start and stop
started datetime \| None When collection started
stopped datetime \| None When collection stopped

Accessing Raw Data

values = result.values("M:OUTTMP@p,1000")          # list[Value]
timestamps = result.timestamps("M:OUTTMP@p,1000")  # list[datetime]

Time Slicing

from datetime import datetime, timezone

start = datetime(2026, 3, 5, 12, 0, tzinfo=timezone.utc)
end = datetime(2026, 3, 5, 12, 5, tzinfo=timezone.utc)
sliced = result.slice("M:OUTTMP@p,1000", start=start, end=end)
# Returns a ChannelData with only readings in [start, end]

Export to NumPy

timestamps, values = result.to_numpy("M:OUTTMP@p,1000")
# timestamps: float64 epoch seconds (UTC)
# values: float64

Export to pandas

# Single channel — indexed by timestamp
df = result.to_dataframe("M:OUTTMP@p,1000")

# All channels — flat table with drf column
df = result.to_dataframe()

# Relative timestamps (seconds since collection started)
df = result.to_dataframe("M:OUTTMP@p,1000", relative=True)

read_fresh

Wait for one or more fresh readings per channel via a temporary subscription. Consider it a pacsys.read() but with a lot more options and ability to collect multiple readings.

from pacsys.exp import read_fresh

# Single reading per channel (default)
results = read_fresh(["M:OUTTMP", "G:AMANDA"], default_event="p,1000")
for r in results:
    print(f"{r.drf}: {r.value}")

# Collect 10 readings and average
results = read_fresh(["M:OUTTMP@p,1000"], count=10, timeout=5.0)
print(results[0].mean())     # mean of last 10
print(results[0].std())      # standard deviation
print(results[0].median())   # median
print(results[0].values)     # all collected values

Parameters

Parameter Type Default Description
devices list[DeviceSpec] required DRF strings or Device objects
count int 1 Readings to collect per channel
default_event str \| None None Event to apply if DRF has none
timeout float 5.0 Max seconds to wait
backend Backend \| None None Backend to use

FreshResult

Each element in the returned list is a FreshResult:

Property / Method Returns Description
value Value Last reading's value
values list[Value] All ok values
reading Reading Last Reading object
timestamp datetime Last reading's timestamp
timestamps list[datetime] All timestamps
length int Number of ok values
len(r) int Total readings collected
requested_count int How many were requested

Windowed Statistics

Stats methods accept an optional n parameter:

Call Window
r.mean() Last requested_count values
r.mean(-1) All values
r.mean(8) Last 8 values (raises if < 8)

Available: mean, median, std, min, max.


watch

Block until a condition is met on a streaming channel.

from pacsys.exp import watch

# Wait for temperature to exceed 80
reading = watch(
    "M:OUTTMP@p,1000",
    condition=lambda r: r.ok and r.value > 80.0,
    timeout=60.0,
)
print(f"Crossed threshold at {reading.timestamp}: {reading.value}")
Parameter Type Default Description
device DeviceSpec required DRF string or Device (must include a streaming event)
condition Callable[[Reading], bool] required Predicate; returns True to stop
timeout float 30.0 Max seconds to wait
backend Backend \| None None Backend to use

Returns the Reading that satisfied the condition. Raises TimeoutError if the condition is not met within the timeout.


scan

Ramp a device through a series of values while reading other devices at each step. Automatically restores the original setting value on completion or error.

from pacsys.exp import scan

result = scan(
    write_device="Z:ACLTST",
    read_devices=["M:OUTTMP", "G:AMANDA"],
    start=0.0,
    stop=10.0,
    steps=11,
    settle=0.5,
)

# Inspect results
for sv, step in zip(result.set_values, result.readings):
    print(f"Set {sv}: read {step['M:OUTTMP'].value}")

# Export to DataFrame
df = result.to_dataframe()

Explicit Values

result = scan(
    write_device="Z:ACLTST",
    read_devices=["M:OUTTMP"],
    values=[0.0, 2.5, 5.0, 7.5, 10.0],
)

Averaging and Abort

result = scan(
    write_device="Z:ACLTST",
    read_devices=["M:OUTTMP"],
    start=0.0, stop=10.0, steps=11,
    readings_per_step=5,  # Average 5 readings per step
    abort_if=lambda step: step["M:OUTTMP"].value > 100.0,
)
print(result.aborted)   # True if abort_if triggered
print(result.restored)  # True if original value was restored

Parameters

Parameter Type Default Description
write_device DeviceSpec required Device to ramp
read_devices list[DeviceSpec] required Devices to read at each step
values list[float] None Explicit scan values
start / stop / steps float / float / int None Linear range (alternative to values)
settle float 0.5 Seconds to wait after each write
readings_per_step int 1 Readings to average per step
verify bool \| Verify None Verify writes (see Writing)
restore bool True Restore original setting on completion
abort_if Callable None Abort predicate receiving step readings
timeout float \| None None Per-operation timeout
backend Backend \| None None Backend to use

ScanResult

Property Type Description
write_device str DRF of the ramped device
read_devices list[str] DRFs of the read devices
set_values list[float] Values that were written
readings list[dict[str, Reading]] Per-step readings keyed by DRF
write_results list[WriteResult] Per-step write results
aborted bool Whether abort_if triggered
restored bool Whether original value was restored
to_dataframe() DataFrame Export with set_value + read columns

DataLogger

Subscribe to channels and log readings to a file via a pluggable writer. Runs in the background with periodic flushing.

from pacsys.exp import DataLogger, CsvWriter

with DataLogger(["M:OUTTMP@p,1000"], writer=CsvWriter("log.csv")) as dl:
    time.sleep(60)  # Log for 60 seconds

Manual Control

dl = DataLogger(
    ["M:OUTTMP@p,1000", "G:AMANDA@e,8f"],
    writer=ParquetWriter("data.parquet"),
    flush_interval=10.0,
)
dl.start()
# ... do work ...
dl.stop()  # Flushes remaining data and closes the writer

Constructor Options

Parameter Type Default Description
devices list[DeviceSpec] required Channels to subscribe to
writer LogWriter required Writer implementation
flush_interval float 5.0 Seconds between flushes
backend Backend \| None None Backend to use

Properties

Property Type Description
running bool Whether the logger is actively collecting
last_error Exception \| None Last write error, if any

Failed writes are retried up to 3 times before the batch is dropped. Errors are logged and available via last_error.


Writers

Writers implement the LogWriter protocol:

class LogWriter(Protocol):
    def write_readings(self, readings: list[Reading]) -> None: ...
    def close(self) -> None: ...

CsvWriter

Simple CSV output with columns: timestamp, drf, value, units.

from pacsys.exp import CsvWriter

writer = CsvWriter("output.csv")

ParquetWriter

Typed columnar output using Apache Parquet (requires pyarrow). Handles scalars, arrays, text, raw bytes, alarms, and status values in separate typed columns.

from pacsys.exp import ParquetWriter

writer = ParquetWriter("output.parquet")

Schema:

Column Type Content
timestamp timestamp[us, UTC] Reading timestamp
drf string DRF string
value_type string Value type name
value float64 Scalar values
value_array list<float64> Array values
value_text string Text, JSON-encoded alarms/status, base64-encoded raw bytes
error_code int16 ACNET error code
units string Engineering units
cycle int64 Cycle number

You can also implement your own writer by conforming to the LogWriter protocol.


See Also