API Reference

Complete reference for pacsys functions, classes, and types.


Simple API

These functions use a global backend that is automatically initialized on first use.

pacsys.read(device, timeout=None)

Read a single device value using the global DPM backend.

Parameters:

Name Type Description Default
device DeviceSpec

DRF string or Device object

required
timeout Optional[float]

Total timeout for entire operation in seconds (default: 5.0)

None

Returns:

Type Description
Value

The device value (float, numpy array, string, etc.)

Raises:

Type Description
ValueError

If DRF syntax is invalid

DeviceError

If the read fails (error_code < 0)

Note

Even if the DRF specifies a periodic event (e.g. @p,1000), only the first reading is returned. Use pacsys.subscribe() for continuous data.

Thread Safety

Safe to call from multiple threads. Each call borrows a connection from the shared pool for the duration of the operation.

pacsys.get(device, timeout=None)

Read a single device with full metadata using the global DPM backend.

Parameters:

Name Type Description Default
device DeviceSpec

DRF string or Device object

required
timeout Optional[float]

Total timeout for operation in seconds (default: 5.0)

None

Returns:

Type Description
Reading

Reading object with value, status, timestamp, and metadata. Check reading.is_error for error status.

Raises:

Type Description
ValueError

If DRF syntax is invalid

Thread Safety

Safe to call from multiple threads.

pacsys.get_many(devices, timeout=None)

Read multiple devices in a single batch using the global DPM backend.

Parameters:

Name Type Description Default
devices list[DeviceSpec]

List of DRF strings or Device objects (can mix)

required
timeout Optional[float]

Total timeout for entire batch in seconds (not per-device)

None

Returns:

Type Description
list[Reading]

List of Reading objects in same order as input.

Raises:

Type Description
ReadError

On transport failure (timeout, connection drop). Partial results are available via exc.readings.

ValueError

If any DRF syntax is invalid (before network I/O)

Thread Safety

Safe to call from multiple threads.
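
Because ReadError carries the full results list in exc.readings, a caller can salvage the usable entries after a transport failure. The following is a minimal sketch of that triage, not part of pacsys: split_readings is a hypothetical helper, and the SimpleNamespace objects only stand in for Reading objects (using the drf, value, message and is_error attributes documented under Types).

```python
from types import SimpleNamespace


def split_readings(readings):
    """Partition readings into usable values and error messages, keyed by DRF."""
    usable = {r.drf: r.value for r in readings if not r.is_error}
    failed = {r.drf: r.message for r in readings if r.is_error}
    return usable, failed


# Stand-ins for Reading objects. With pacsys the list would come from:
#     try:
#         readings = pacsys.get_many(drfs, timeout=2.0)
#     except pacsys.errors.ReadError as exc:
#         readings = exc.readings  # partial results, same order as drfs
readings = [
    SimpleNamespace(drf="M:OUTTMP", value=72.5, is_error=False, message=None),
    SimpleNamespace(drf="G:AMANDA", value=None, is_error=True, message="timed out"),
]

usable, failed = split_readings(readings)
print(usable)
print(failed)
```

Keying both dictionaries by DRF keeps the result independent of input order, which is convenient when only some devices need a retry.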

pacsys.write(device, value, timeout=None)

Write a single device value using the global backend.

Parameters:

Name Type Description Default
device DeviceSpec

DRF string or Device object

required
value Value

Value to write

required
timeout Optional[float]

Total timeout in seconds

None

Returns:

Type Description
WriteResult

WriteResult with status

Raises:

Type Description
AuthenticationError

If no auth configured (use configure(auth=...))

DeviceError

If the write fails

Thread Safety

Safe to call from multiple threads.

pacsys.write_many(settings, timeout=None)

Write multiple device values in a single batch using the global backend.

Parameters:

Name Type Description Default
settings WriteSettings

List of (device, value) tuples, or a dict mapping device -> value

required
timeout Optional[float]

Total timeout for entire batch in seconds

None

Returns:

Type Description
list[WriteResult]

List of WriteResult objects in same order as input.

Raises:

Type Description
AuthenticationError

If no auth configured (use configure(auth=...))

Thread Safety

Safe to call from multiple threads.

pacsys.subscribe(drfs, callback=None, on_error=None)

Subscribe to devices for streaming using the global DPM backend.

Creates subscriptions that immediately start receiving data. The handle can be used as a context manager for automatic cleanup.

Parameters:

Name Type Description Default
drfs list[DeviceSpec]

List of device request strings or Device objects (with events, e.g. "M:OUTTMP@p,1000")

required
callback Optional[ReadingCallback]

Optional function called for each reading, receives (reading, handle). If provided, readings are pushed to the callback on the receiver thread. If None, use handle.readings() to iterate over readings.

None
on_error Optional[ErrorCallback]

Optional function called when a connection error occurs, receives (exception, handle). If not provided, errors are raised during iteration or logged in callback mode.

None

Returns:

Type Description
SubscriptionHandle

SubscriptionHandle for managing this subscription

Example (callback mode):

import time
import pacsys

def on_reading(reading, handle):
    print(f"{reading.name}: {reading.value}")
    if reading.value > 100:
        handle.stop()

handle = pacsys.subscribe(["M:OUTTMP@p,1000"], callback=on_reading)
time.sleep(10)
handle.stop()
pacsys.shutdown()

Example (iterator mode):

with pacsys.subscribe(["M:OUTTMP@p,1000"]) as sub:
    for reading, handle in sub.readings(timeout=10):
        print(f"{reading.name}: {reading.value}")
        if reading.value > 10:
            sub.stop()
pacsys.shutdown()

Example (with error handler):

def on_error(exc, handle):
    print(f"Connection error: {exc}")

handle = pacsys.subscribe(
    ["M:OUTTMP@p,1000"],
    callback=on_reading,
    on_error=on_error,
)

pacsys.configure(*, dpm_host=_UNSET, dpm_port=_UNSET, pool_size=_UNSET, default_timeout=_UNSET, devdb_host=_UNSET, devdb_port=_UNSET, backend=_UNSET, auth=_UNSET, role=_UNSET)

Configure pacsys global settings.

Can be called at any time. If a backend is already initialized, it will be automatically shut down before applying the new settings. Pass None to clear a previously set value (falls back to environment variable or default).

Parameters:

Name Type Description Default
dpm_host Optional[str] | _Unset

DPM proxy hostname (default: from PACSYS_DPM_HOST or acsys-proxy.fnal.gov)

_UNSET
dpm_port Optional[int] | _Unset

DPM proxy port (default: from PACSYS_DPM_PORT or 6802)

_UNSET
pool_size Optional[int] | _Unset

Connection pool size (default: from PACSYS_POOL_SIZE or 4)

_UNSET
default_timeout Optional[float] | _Unset

Default operation timeout in seconds (default: from PACSYS_TIMEOUT or 5.0)

_UNSET
devdb_host Optional[str] | _Unset

DevDB gRPC hostname (default: from PACSYS_DEVDB_HOST or ad-services.fnal.gov/services.devdb)

_UNSET
devdb_port Optional[int] | _Unset

DevDB gRPC port (default: from PACSYS_DEVDB_PORT or 6802)

_UNSET
backend Optional[str] | _Unset

Backend type - one of "dpm", "grpc", "dmq", "acl" (default: "dpm")

_UNSET
auth Optional[Auth] | str | _Unset

Authentication object (KerberosAuth or JWTAuth) for writes, or "krb" as shortcut for KerberosAuth()

_UNSET
role Optional[str] | _Unset

Role for authenticated operations (e.g., "testing")

_UNSET

Raises:

Type Description
ValueError

If backend is not a valid backend type
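
configure() distinguishes three cases per argument: omitted (keep the stored value), None (clear it and fall back to the environment variable or default), and an explicit value. One common way to get that behaviour is a sentinel object. The sketch below illustrates the pattern only and is not pacsys's implementation; resolve_setting and its environ parameter are hypothetical, while PACSYS_DPM_PORT and the default 6802 are taken from the table above.

```python
import os

_UNSET = object()  # stand-in sentinel meaning "argument was not passed"


def resolve_setting(stored, new, env_var, default, cast=str, environ=None):
    """Return (stored, effective) for one setting.

    new is _UNSET -> keep the previously stored value
    new is None   -> clear the stored value
    anything else -> store the new value
    The effective value falls back to the environment, then to the default.
    """
    environ = os.environ if environ is None else environ
    if new is not _UNSET:
        stored = new
    if stored is not None:
        return stored, stored
    if env_var in environ:
        return None, cast(environ[env_var])
    return None, default


# Explicit value, then an omitted argument, then an explicit None.
stored, port_set = resolve_setting(None, 7000, "PACSYS_DPM_PORT", 6802, int, environ={})
stored, port_kept = resolve_setting(stored, _UNSET, "PACSYS_DPM_PORT", 6802, int, environ={})
stored, port_default = resolve_setting(stored, None, "PACSYS_DPM_PORT", 6802, int, environ={})

# With nothing stored, the environment variable wins over the default.
_, port_env = resolve_setting(
    stored, _UNSET, "PACSYS_DPM_PORT", 6802, int, environ={"PACSYS_DPM_PORT": "6900"}
)
print(port_set, port_kept, port_default, port_env)
```

A dedicated sentinel is needed here because None is itself a meaningful argument, so it cannot double as the "not passed" marker.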

pacsys.shutdown()

Close and release the global lazy-initialized backend and DevDB client.

The global backend is automatically closed on interpreter exit via atexit, so explicit shutdown() is only needed to reset state mid-process (e.g., between tests or before re-configuring).

After shutdown(), the next read/get call will re-initialize the backend using existing configuration from configure(). Configuration is preserved across shutdown/re-init cycles -- use configure() to change settings.

Safe to call multiple times or when no backend is initialized.


Backend Factories

Create explicit backend instances for more control.

pacsys.dpm(host=None, port=None, pool_size=None, timeout=None, auth=None, role=None, dispatch_mode=DispatchMode.WORKER)

Create a DPM backend instance with its own connection pool.

Alias for dpm_http(). Each subscribe() call creates its own TCP connection, allowing independent subscriptions.

Parameters:

Name Type Description Default
host Optional[str]

DPM proxy hostname (default: acsys-proxy.fnal.gov)

None
port Optional[int]

DPM proxy port (default: 6802)

None
pool_size Optional[int]

Connection pool size (default: 4)

None
timeout Optional[float]

Default operation timeout in seconds (default: 5.0)

None
auth Optional[Auth]

Authentication object (KerberosAuth for writes)

None
role Optional[str]

Role for authenticated operations (e.g., "testing"). Required for write operations.

None
dispatch_mode DispatchMode

How streaming callbacks are dispatched (default: WORKER)

WORKER

Returns:

Type Description
DPMHTTPBackend

DPMHTTPBackend instance (use as context manager or call close() when done)

Example (read-only):

with pacsys.dpm() as backend:
    temp = backend.read("M:OUTTMP")

Example (authenticated writes):

auth = KerberosAuth()
with pacsys.dpm(auth=auth, role="testing") as backend:
    print(f"Authenticated as: {backend.principal}")
    result = backend.write("M:OUTTMP", 72.5)

pacsys.grpc(host=None, port=None, auth=None, timeout=None, dispatch_mode=DispatchMode.WORKER)

Create a gRPC backend instance.

Uses the DAQ gRPC service for reads and writes. Writes require JWT authentication.

Parameters:

Name Type Description Default
host Optional[str]

gRPC server hostname (env: PACSYS_GRPC_HOST, default: dce08.fnal.gov)

None
port Optional[int]

gRPC server port (env: PACSYS_GRPC_PORT, default: 50051)

None
auth Optional[Auth]

Authentication object (JWTAuth for writes). If None, tries PACSYS_JWT_TOKEN env.

None
timeout Optional[float]

Default operation timeout in seconds (default: 5.0)

None

Returns:

Type Description
GRPCBackend

GRPCBackend instance (use as context manager or call close() when done)

Raises:

Type Description
ImportError

If grpc package is not installed

Example (read-only):

with pacsys.grpc() as backend:
    temp = backend.read("M:OUTTMP")

Example (with JWT):

auth = JWTAuth(token="eyJ...")
with pacsys.grpc(auth=auth) as backend:
    print(f"Authenticated as: {backend.principal}")
    result = backend.write("M:OUTTMP", 72.5)

Example (token from environment):

# export PACSYS_JWT_TOKEN="eyJ..."
with pacsys.grpc() as backend:
    if backend.authenticated:
        print(f"Authenticated as: {backend.principal}")

pacsys.dmq(host=None, port=None, timeout=None, auth=None, write_session_ttl=None, dispatch_mode=DispatchMode.WORKER)

Create a DMQ backend instance (RabbitMQ/AMQP).

Uses RabbitMQ to communicate with ACNET via the DMQ server. Requires Kerberos authentication for ALL operations (including reads).

Parameters:

Name Type Description Default
host Optional[str]

RabbitMQ broker hostname (default: from PACSYS_DMQ_HOST or appsrv2.fnal.gov)

None
port Optional[int]

RabbitMQ broker port (default: from PACSYS_DMQ_PORT or 5672)

None
timeout Optional[float]

Default operation timeout in seconds (default: 10.0)

None
auth Optional[Auth]

KerberosAuth required for all DMQ operations

None
write_session_ttl Optional[float]

Idle timeout for write sessions in seconds (default: 600)

None

Returns:

Type Description
DMQBackend

DMQBackend instance (use as context manager or call close() when done)

Raises:

Type Description
AuthenticationError

If auth is not provided or not KerberosAuth

ImportError

If pika or gssapi packages are not installed

Example

auth = KerberosAuth()
with pacsys.dmq(auth=auth) as backend:
    temp = backend.read("M:OUTTMP")
    result = backend.write("Z:ACLTST", 45.0)

pacsys.acl(base_url=None, timeout=None)

Create an ACL backend instance (read-only, no streaming, no auth).

Parameters:

Name Type Description Default
base_url Optional[str]

ACL CGI base URL (default: https://www-bd.fnal.gov/cgi-bin/acl.pl)

None
timeout Optional[float]

Default operation timeout in seconds

None

Returns:

Type Description
ACLBackend

ACLBackend instance

Example

with pacsys.acl() as backend:
    temp = backend.read("M:OUTTMP")
    reading = backend.get("M:OUTTMP")
    readings = backend.get_many(["M:OUTTMP", "G:AMANDA"])

pacsys.dpm_http(host=None, port=None, pool_size=None, timeout=None, auth=None, role=None, dispatch_mode=DispatchMode.WORKER)

Create a DPM HTTP backend with independent streaming subscriptions.

This backend uses TCP/HTTP protocol to communicate with DPM. Each subscribe() call creates its own TCP connection, allowing truly independent subscriptions that can be started/stopped individually.

Parameters:

Name Type Description Default
host Optional[str]

DPM proxy hostname (default: acsys-proxy.fnal.gov)

None
port Optional[int]

DPM proxy port (default: 6802)

None
pool_size Optional[int]

Connection pool size for reads (default: 4)

None
timeout Optional[float]

Default operation timeout in seconds (default: 5.0)

None
auth Optional[Auth]

Authentication object (KerberosAuth for writes)

None
role Optional[str]

Role for authenticated operations (e.g., "testing")

None

Returns:

Type Description
DPMHTTPBackend

DPMHTTPBackend instance

Example (multiple independent subscriptions):

with pacsys.dpm_http() as backend:
    sub1 = backend.subscribe(["M:OUTTMP@p,1000"])
    sub2 = backend.subscribe(["G:AMANDA@p,500"])

    # Stopping sub1 doesn't affect sub2
    sub1.stop()

    for reading, _ in sub2.readings(timeout=10):
        print(f"{reading.name}: {reading.value}")

pacsys.devdb

DevDB gRPC client for querying device metadata from the master PostgreSQL database.

DevDB is a metadata service (not a data acquisition backend). It provides device information like scaling parameters, control commands, and status bit definitions.

Usage

import pacsys

with pacsys.devdb() as db:
    info = db.get_device_info(["Z:ACLTST", "M:OUTTMP"])
    print(info["Z:ACLTST"].description)

AlarmBlockInfo dataclass

Alarm block information from DevDB.

AlarmInfo dataclass

Alarm information for a device from DevDB.

AlarmText dataclass

Alarm text entry from DevDB.

ControlCommandDef dataclass

A control command definition from DevDB.

DevDBClient

Synchronous gRPC client for DevDB device metadata queries.

Parameters:

Name Type Description Default
host str | None

DevDB gRPC server hostname (default: from PACSYS_DEVDB_HOST or ad-services.fnal.gov/services.devdb)

None
port int | None

DevDB gRPC server port (default: from PACSYS_DEVDB_PORT or 6802)

None
timeout float | None

RPC timeout in seconds (default: 5.0)

None
cache_ttl float

TTL for cached results in seconds (default: 3600.0)

3600.0

Raises:

Type Description
ImportError

If grpc package is not available

clear_cache(device=None)

Clear cached results.

Parameters:

Name Type Description Default
device str | None

If given, clear only this device's cache. Otherwise clear all.

None

close()

Close the gRPC channel.

get_alarm_info(names)

Query alarm information for devices.

Parameters:

Name Type Description Default
names list[str]

Device names

required

Returns:

Type Description
list[AlarmInfo]

List of AlarmInfo entries.

Raises:

Type Description
RpcError

On gRPC transport failure.

get_alarm_text(ids)

Query alarm text entries by ID.

Parameters:

Name Type Description Default
ids list[int]

Alarm text IDs

required

Returns:

Type Description
list[AlarmText]

List of AlarmText entries.

Raises:

Type Description
RpcError

On gRPC transport failure.

get_device_info(names, timeout=None)

Query device metadata for one or more devices.

Parameters:

Name Type Description Default
names list[str]

Device names (e.g., ["Z:ACLTST", "M:OUTTMP"])

required
timeout float | None

gRPC timeout in seconds (default: client's configured timeout).

None

Returns:

Type Description
dict[str, DeviceInfoResult]

Dict mapping device name to DeviceInfoResult.

Raises:

Type Description
DeviceError

If DevDB returns an error for a device.

RpcError

On gRPC transport failure.

DeviceInfoResult dataclass

Complete device metadata from DevDB.

ExtStatusBitDef dataclass

Extended status bit definition from DevDB.

PropertyInfo dataclass

Scaling and metadata for a device property (reading or setting).

StatusBitDef dataclass

One status bit definition from DevDB (not a runtime value).

Uses mask/match/invert for proper bit evaluation: is_active = ((raw_value & mask) == match) ^ invert
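
The evaluation rule above is small enough to write out directly. The sketch below restates the documented formula as a standalone function; is_bit_active and the example bit patterns are hypothetical and chosen only for illustration.

```python
def is_bit_active(raw_value: int, mask: int, match: int, invert: bool = False) -> bool:
    """Apply the documented rule: ((raw_value & mask) == match) ^ invert."""
    return ((raw_value & mask) == match) ^ invert


# Single-bit field: bit 2 is set in 0b0101.
single = is_bit_active(0b0101, mask=0b0100, match=0b0100)

# The same field with invert=True reports the opposite state.
single_inverted = is_bit_active(0b0101, mask=0b0100, match=0b0100, invert=True)

# Multi-bit field: bits 1-2 must equal 0b01 (so the masked value is 0b0010).
multi = is_bit_active(0b0011, mask=0b0110, match=0b0010)

# Bit 2 is clear in 0b0001, so the field does not match.
cleared = is_bit_active(0b0001, mask=0b0100, match=0b0100)

print(single, single_inverted, multi, cleared)
```

Comparing against match rather than testing for non-zero is what lets one definition describe multi-bit fields as well as single bits.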

pacsys.supervised

Supervised mode: gRPC proxy server with logging and policy enforcement.

Wraps any Backend instance and exposes it as a gRPC DAQ service, forwarding requests while enforcing access policies and logging all traffic.

Example

import pacsys
from pacsys.testing import FakeBackend
from pacsys.supervised import SupervisedServer, ReadOnlyPolicy

fb = FakeBackend()
fb.set_reading("M:OUTTMP", 72.5)

with SupervisedServer(fb, port=50099, policies=[ReadOnlyPolicy()]) as srv:
    with pacsys.grpc(host="localhost", port=50099) as client:
        print(client.read("M:OUTTMP"))

AuditLog

Structured audit log with optional raw protobuf capture.

Parameters:

Name Type Description Default
path str

JSON lines file path.

required
proto_path Optional[str]

Binary protobuf file path (optional).

None
log_responses bool

Log outgoing responses too (default: False).

False
flush_interval int

Flush files every N writes (default: 1).

1

close()

Flush and close both files.

log_request(ctx, decision)

Log incoming request. Returns sequence number for correlation.

log_response(seq, peer, method, response_proto)

Log outgoing response. No-op when log_responses is False.

DeviceAccessPolicy

Bases: Policy

Allow or deny access based on device name patterns.

Parameters:

Name Type Description Default
patterns list[str]

List of patterns (e.g. ["M:*", "G:AMANDA"])

required
mode str

"allow" = approve matching devices, "deny" = block matching devices

'allow'
action str

"all" (both RPCs), "read" (Read only), "set" (Set only)

'all'
syntax str

"glob" (fnmatch, default) or "regex" (full-match, case-insensitive)

'glob'
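
The interaction of mode and syntax can be illustrated with the standard library. This is a sketch under stated assumptions, not the DeviceAccessPolicy source: matches_any and is_allowed are hypothetical helpers, and the glob branch uses fnmatch.fnmatchcase (case-sensitive), which is an assumption since the table only says "fnmatch".

```python
import fnmatch
import re


def matches_any(device, patterns, syntax="glob"):
    """Return True if the device name matches at least one pattern."""
    if syntax == "glob":
        # Assumption: case-sensitive glob matching.
        return any(fnmatch.fnmatchcase(device, p) for p in patterns)
    if syntax == "regex":
        # Full-match and case-insensitive, as described in the table.
        return any(re.fullmatch(p, device, re.IGNORECASE) for p in patterns)
    raise ValueError(f"unknown syntax: {syntax!r}")


def is_allowed(device, patterns, mode="allow", syntax="glob"):
    """mode='allow' approves matching devices; mode='deny' blocks them."""
    hit = matches_any(device, patterns, syntax)
    return hit if mode == "allow" else not hit


patterns = ["M:*", "G:AMANDA"]
allow_hit = is_allowed("M:OUTTMP", patterns)               # matches "M:*"
allow_miss = is_allowed("Z:ACLTST", patterns)              # matches nothing
deny_hit = is_allowed("M:OUTTMP", patterns, mode="deny")   # matched, so blocked
regex_hit = is_allowed("m:outtmp", [r"M:OUT.*"], syntax="regex")
regex_partial = is_allowed("M:OUTTMP", [r"M:OUT"], syntax="regex")
print(allow_hit, allow_miss, deny_hit, regex_hit, regex_partial)
```

The last case shows why full-match matters: a regex that only covers a prefix of the device name does not grant access.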

Policy

Bases: ABC

Abstract base for policy checks. Implement check() to allow or deny requests.

allows_writes property

Whether this policy explicitly gates write access.

PolicyDecision dataclass

Result of a policy check.

On deny: reason is required, ctx is ignored. On allow without modification: ctx is None. On allow with modification: ctx is a new RequestContext.

RateLimitPolicy

Bases: Policy

Sliding window rate limit per peer.

Parameters:

Name Type Description Default
max_requests int

Maximum requests per window

required
window_seconds float

Window size in seconds (default: 60)

60.0
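
A sliding window limiter keeps the timestamps of recent requests per peer and discards those older than the window. The sketch below shows the general technique, not the RateLimitPolicy source; SlidingWindowLimiter is a hypothetical class, the clock is passed in explicitly to keep the example deterministic, and it assumes denied requests do not count toward the limit.

```python
from collections import defaultdict, deque


class SlidingWindowLimiter:
    """Allow at most max_requests per peer within any window_seconds span."""

    def __init__(self, max_requests, window_seconds=60.0):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self._hits = defaultdict(deque)  # peer -> timestamps of allowed requests

    def allow(self, peer, now):
        hits = self._hits[peer]
        # Drop timestamps that have slid out of the window.
        while hits and now - hits[0] >= self.window_seconds:
            hits.popleft()
        if len(hits) >= self.max_requests:
            return False
        hits.append(now)
        return True


limiter = SlidingWindowLimiter(max_requests=2, window_seconds=60.0)
# Two requests fit, the third is denied, and the oldest has expired by t=60.
results = [limiter.allow("peer-a", t) for t in (0.0, 1.0, 2.0, 60.0)]
# Limits are tracked per peer, so another peer is unaffected.
other_peer = limiter.allow("peer-b", 2.0)
print(results, other_peer)
```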

ReadOnlyPolicy

Bases: Policy

Denies Set RPCs, allows everything else.

RequestContext dataclass

Context for a single RPC request, passed to policy checks.

SlewLimit dataclass

Constraints for a single device pattern in SlewRatePolicy.

At least one of max_step or max_rate must be set.

Attributes:

Name Type Description
max_step Optional[float]

Maximum absolute change per write (units).

max_rate Optional[float]

Maximum rate of change (units/second).

SlewRatePolicy

Bases: Policy

Deny writes that change too fast or by too much.

Parameters:

Name Type Description Default
limits dict[str, SlewLimit]

Mapping of device name glob pattern to SlewLimit.

required

First write to any device is always allowed (no history). History is updated on allow (accepts that failed backend writes will leave stale history).
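
The two constraints and the history rule can be sketched as follows. This illustrates the described behaviour under assumptions and is not the SlewRatePolicy source: Limit and SlewChecker are hypothetical names, a single limit is applied to every device instead of a pattern mapping, and timestamps are passed in explicitly.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Limit:
    max_step: Optional[float] = None  # maximum absolute change per write
    max_rate: Optional[float] = None  # maximum change per second


class SlewChecker:
    def __init__(self, limit):
        self.limit = limit
        self._last = {}  # device -> (value, time) of the last allowed write

    def allow(self, device, value, now):
        previous = self._last.get(device)
        if previous is not None:  # the first write has no history and passes
            last_value, last_time = previous
            step = abs(value - last_value)
            if self.limit.max_step is not None and step > self.limit.max_step:
                return False
            elapsed = max(now - last_time, 0.0)
            if self.limit.max_rate is not None and step > self.limit.max_rate * elapsed:
                return False
        self._last[device] = (value, now)  # history is updated only on allow
        return True


checker = SlewChecker(Limit(max_step=5.0, max_rate=1.0))
decisions = [
    checker.allow("Z:ACLTST", 100.0, now=0.0),   # first write, no history
    checker.allow("Z:ACLTST", 103.0, now=10.0),  # step 3 in 10 s
    checker.allow("Z:ACLTST", 110.0, now=20.0),  # step 7 exceeds max_step
    checker.allow("Z:ACLTST", 107.0, now=12.0),  # step 4 in 2 s exceeds max_rate
    checker.allow("Z:ACLTST", 107.0, now=20.0),  # step 4 in 10 s
]
print(decisions)
```

Note that the denied writes leave the history at 103.0, so later requests are still measured against the last allowed value.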

SupervisedServer

gRPC proxy server with logging and policy enforcement.

Wraps any Backend and exposes the DAQ gRPC service, forwarding requests while enforcing policies and logging all traffic.

Parameters:

Name Type Description Default
backend Backend | AsyncBackend

Backend instance to proxy requests to

required
port int

Port to listen on (default: 50051)

50051
host str

Host to bind (default: "[::]" for all interfaces)

'[::]'
policies Optional[list[Policy]]

Optional list of Policy instances for access control

None
token Optional[str]

Optional bearer token for write authentication. When set, clients must send JWTAuth(token=...) with this value or write (Set) RPCs are rejected with UNAUTHENTICATED. Reads are always open.

None

Example

from pacsys.testing import FakeBackend
from pacsys.supervised import SupervisedServer, ReadOnlyPolicy

fb = FakeBackend()
fb.set_reading("M:OUTTMP", 72.5)

with SupervisedServer(fb, port=50099, policies=[ReadOnlyPolicy()]) as srv:
    # Clients can now connect to localhost:50099
    srv.wait()

run()

Start the server and block until interrupted (SIGINT/SIGTERM).

Must be called from the main thread (signal handlers require it).

start()

Start the server in a background daemon thread.

stop()

Stop the server.

wait(timeout=None)

Block until the server stops or timeout.

ValueRangePolicy

Bases: Policy

Deny writes where numeric values fall outside allowed ranges.

Parameters:

Name Type Description Default
limits dict[str, tuple[float, float]]

Mapping of device name glob pattern to (min, max) bounds.

required
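
A range check keyed by glob pattern might look like the sketch below. It is an illustration, not the ValueRangePolicy source: check_range is a hypothetical helper, and it assumes that the first matching pattern decides, that devices matching no pattern are allowed, and that non-numeric values are not range-checked.

```python
import fnmatch


def check_range(device, value, limits):
    """Return True if the write is allowed under the (min, max) limits."""
    if not isinstance(value, (int, float)):
        return True  # assumption: only numeric values are range-checked
    for pattern, (low, high) in limits.items():
        if fnmatch.fnmatchcase(device, pattern):
            return low <= value <= high  # assumption: first match decides
    return True  # assumption: unmatched devices are not restricted


limits = {"M:OUT*": (0.0, 100.0), "Z:*": (-10.0, 10.0)}

in_range = check_range("M:OUTTMP", 72.5, limits)
too_high = check_range("M:OUTTMP", 150.0, limits)
out_of_bounds = check_range("Z:ACLTST", 45.0, limits)
unmatched = check_range("G:AMANDA", 1e6, limits)
non_numeric = check_range("M:OUTTMP", "text", limits)
print(in_range, too_high, out_of_bounds, unmatched, non_numeric)
```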

evaluate_policies(policies, ctx)

Evaluate a chain of policies. First denial short-circuits.

Each policy sees the (potentially modified) context from the previous policy. The final decision always carries ctx set to the final context.
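
The chaining rules (short-circuit on the first denial, pass a modified context forward, return the final context on allow) can be sketched with plain callables. This is a simplified illustration, not the pacsys implementation: Decision and evaluate are hypothetical stand-ins for PolicyDecision and evaluate_policies, and the context is a plain dict.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class Decision:
    allowed: bool
    reason: Optional[str] = None
    ctx: Any = None  # set only when an allowing policy modifies the context


def evaluate(policies, ctx):
    for check in policies:
        decision = check(ctx)
        if not decision.allowed:
            return decision  # first denial short-circuits the chain
        if decision.ctx is not None:
            ctx = decision.ctx  # later policies see the modified context
    return Decision(allowed=True, ctx=ctx)


calls = []


def uppercase_device(ctx):
    calls.append("uppercase")
    return Decision(True, ctx={**ctx, "device": ctx["device"].upper()})


def read_only(ctx):
    calls.append("read_only")
    if ctx["rpc"] == "Set":
        return Decision(False, reason=f"writes to {ctx['device']} are not permitted")
    return Decision(True)


def never_reached(ctx):
    calls.append("never_reached")
    return Decision(True)


denied = evaluate([uppercase_device, read_only, never_reached],
                  {"rpc": "Set", "device": "m:outtmp"})
allowed = evaluate([uppercase_device, read_only],
                   {"rpc": "Read", "device": "m:outtmp"})
print(denied.reason, allowed.ctx, calls)
```

The denial reason already contains the uppercased device name, which shows that the second policy received the context produced by the first.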


Types

Reading

pacsys.types.Reading dataclass

A device reading with status and optional data.

Status semantics (matches gRPC Status message):
  • facility_code: ACNET facility identifier (0=success, 1=ACNET, 16=DBM, 17=DPM)
  • error_code: 0=success, >0=warning, <0=error

value = None class-attribute instance-attribute

value_type = None class-attribute instance-attribute

name property

Device name extracted from DRF or metadata.

drf instance-attribute

units property

Engineering units from metadata, or None if unavailable.

timestamp = None class-attribute instance-attribute

cycle = None class-attribute instance-attribute

facility_code = 0 class-attribute instance-attribute

error_code = 0 class-attribute instance-attribute

message = None class-attribute instance-attribute

is_success property

True if status indicates success (error_code == 0).

is_error property

True if status indicates error (error_code < 0).

is_warning property

True if status indicates warning (error_code > 0).

ok property

True if data is usable (success or warning with data).
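
The sign convention for error_code can be turned into a small triage helper. The sketch below only restates the documented rules; classify, describe and FACILITY_NAMES are hypothetical names that are not part of pacsys, and the numeric codes in the examples are arbitrary.

```python
# Facility identifiers listed in the status semantics above.
FACILITY_NAMES = {0: "success", 1: "ACNET", 16: "DBM", 17: "DPM"}


def classify(error_code: int) -> str:
    """Map an error_code to 'success', 'warning' or 'error' by its sign."""
    if error_code < 0:
        return "error"
    if error_code > 0:
        return "warning"
    return "success"


def describe(facility_code: int, error_code: int) -> str:
    """Build a short human-readable status string."""
    facility = FACILITY_NAMES.get(facility_code, f"facility {facility_code}")
    return f"{classify(error_code)} ({facility}, code {error_code})"


print(describe(1, -6))
print(describe(42, 1))
```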

WriteResult

pacsys.types.WriteResult dataclass

Result of a write operation, optionally with verification info.

ok property

True if write succeeded (error_code == 0).

success property

Alias for ok.

from_dict(d) classmethod

Deserialize from a dict produced by to_dict().

to_dict()

Serialize to a JSON-safe dict. Round-trippable via WriteResult.from_dict().

SubscriptionHandle

pacsys.types.SubscriptionHandle

Handle for a streaming subscription.

Provides access to subscription state and allows stopping the subscription. Each handle has its own queue for readings. Use readings() to iterate over readings from THIS subscription only.

Usage

with backend.subscribe(["M:OUTTMP@p,1000"]) as sub:
    for reading, handle in sub.readings(timeout=10):
        print(reading.value)
        if reading.value > 10:
            sub.stop()

Manual control

sub = backend.subscribe(["M:OUTTMP@p,1000"])
for reading, handle in sub.readings(timeout=10):
    print(reading.value)
sub.stop()

stopped property

True if this subscription has been stopped.

readings(timeout=None)

Yield (reading, handle) pairs for THIS subscription.

Parameters:

Name Type Description Default
timeout Optional[float]

Seconds to wait for the next reading. None blocks forever (until stop() is called); 0 is non-blocking (drains buffered readings only).

None

Yields:

Type Description
tuple[Reading, SubscriptionHandle]

(reading, handle) pairs
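
The three timeout modes map naturally onto a queue-backed generator. The sketch below demonstrates that idea with the standard library only; it is not the SubscriptionHandle implementation. iter_queue and the _STOP marker are hypothetical, and it assumes that a timeout simply ends the iteration.

```python
import queue

_STOP = object()  # hypothetical marker enqueued when the producer stops


def iter_queue(q, timeout=None):
    """Yield items from q until _STOP arrives or the wait times out.

    timeout=None blocks until an item arrives, timeout=0 only drains what
    is already buffered, and a positive timeout waits that long per item.
    """
    while True:
        try:
            item = q.get_nowait() if timeout == 0 else q.get(timeout=timeout)
        except queue.Empty:
            return  # assumption: running out of time ends the iteration
        if item is _STOP:
            return
        yield item


q = queue.Queue()
for value in (1, 2, 3):
    q.put(value)
drained = list(iter_queue(q, timeout=0))  # non-blocking drain

q.put(4)
q.put(_STOP)
until_stop = list(iter_queue(q))  # blocking mode ends at the stop marker

timed_out = list(iter_queue(q, timeout=0.01))  # nothing arrives in time
print(drained, until_stop, timed_out)
```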

stop()

Stop this subscription.

ValueType

pacsys.types.ValueType

Bases: Enum

Type of value returned from a device read.


Authentication

KerberosAuth

pacsys.auth.KerberosAuth dataclass

Bases: Auth

Kerberos authentication using system credential cache.

Requires
  • Valid Kerberos ticket (run kinit first)
  • gssapi library (pip install gssapi)

The credentials are obtained from the system credential cache, so the same KerberosAuth instance can be used across multiple backends and services.

Parameters:

Name Type Description Default
name Optional[str]

Kerberos principal to use (e.g., "user2@FNAL.GOV"). If None, uses the default principal from the credential cache.

None

Example

# Default principal
auth = KerberosAuth()

# Specific principal (e.g., multiple tickets in cache)
auth = KerberosAuth(name="operator@FNAL.GOV")

Note

Credentials are validated at construction time (fail fast).

principal property

Get principal name from credential cache.

__post_init__()

Validate credentials at construction time (fail fast).

JWTAuth

pacsys.auth.JWTAuth dataclass

Bases: Auth

JWT token authentication.

Can be used with gRPC backend and other JWT-accepting services. Token is stored as-is; validation is done server-side.

Example

# Explicit token
auth = JWTAuth(token="eyJ...")

# From environment variable
auth = JWTAuth.from_env()  # reads PACSYS_JWT_TOKEN

# Use with gRPC backend
backend = GRPCBackend(auth=auth)

Note

Token is excluded from repr to prevent credential leaks in logs.

principal property

Extract subject claim from JWT token.

Note: This decodes without verification. Server validates the token.

Returns:

Type Description
str

Subject ('sub') claim from token

Raises:

Type Description
ValueError

If token format is invalid
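
Reading a claim without verifying the signature amounts to base64url-decoding the middle segment of the token. The sketch below shows that general technique with the standard library; it is not the JWTAuth source. subject_of and the helper functions are hypothetical, and the demo token is unsigned and built locally purely for illustration.

```python
import base64
import json


def _b64url_decode(segment: str) -> bytes:
    # JWT segments omit base64 padding, so restore it before decoding.
    padding = "=" * (-len(segment) % 4)
    return base64.urlsafe_b64decode(segment + padding)


def _b64url_encode(data: dict) -> str:
    raw = json.dumps(data).encode()
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode()


def subject_of(token: str) -> str:
    """Return the 'sub' claim WITHOUT verifying the signature."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("expected three dot-separated JWT segments")
    try:
        payload = json.loads(_b64url_decode(parts[1]))
    except ValueError as exc:  # covers bad base64, bad UTF-8 and bad JSON
        raise ValueError("JWT payload is not valid base64url JSON") from exc
    if not isinstance(payload, dict) or "sub" not in payload:
        raise ValueError("JWT payload has no 'sub' claim")
    return payload["sub"]


# Unsigned demo token; a real token is issued and validated by the server.
demo_token = ".".join([
    _b64url_encode({"alg": "none"}),
    _b64url_encode({"sub": "operator@FNAL.GOV"}),
    "signature",
])
subject = subject_of(demo_token)
print(subject)
```

Since nothing is verified locally, the extracted subject is suitable for display or logging only, never for access decisions.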

from_env(var='PACSYS_JWT_TOKEN') classmethod

Create JWTAuth from environment variable.

Parameters:

Name Type Description Default
var str

Environment variable name (default: PACSYS_JWT_TOKEN)

'PACSYS_JWT_TOKEN'

Returns:

Type Description
Optional[JWTAuth]

JWTAuth if environment variable is set, None otherwise


Errors

DeviceError

pacsys.errors.DeviceError

Bases: Exception

Raised when a device operation (read or write) fails.

Attributes:

Name Type Description
drf

Device request string that failed

facility_code

ACNET facility identifier (1=ACNET, 16=DBM, 17=DPM)

error_code

Error code (negative indicates error, positive is warning)

message

Human-readable error description

AuthenticationError

pacsys.errors.AuthenticationError

Bases: Exception

Raised when authentication fails or is required.

This exception is raised when:
  • A write is attempted without authentication
  • The Kerberos ticket is expired or missing
  • The JWT token is invalid or expired

ReadError

pacsys.errors.ReadError

Bases: Exception

Batch read failure.

Raised in two scenarios:
  • get_many(): transport error (timeout, connection drop, auth failure). The underlying network exception is chained via __cause__.
  • read_many(): any device in the batch returned an unusable reading (ACNET error or missing value).

Attributes:

Name Type Description
readings

Complete results list (same length as input drfs). Includes both successful readings and error-backfilled entries.

message

Human-readable description of the failure.

ACLError

pacsys.errors.ACLError

Bases: Exception

Raised when an ACL command fails.

This exception is raised when:
  • A one-shot ACL command exits with non-zero status
  • An ACL session receives an error response
  • The ACL prompt times out
  • The ACL process exits unexpectedly


Device Classes

Object-oriented interface for device access.

Device

pacsys.device.Device

Bases: _DeviceBase

Device wrapper with DRF3 validation at construction.

Devices are immutable - modification methods return NEW Device instances.

read(*, field=None, timeout=None)

Read READING property. Raises DeviceError on failure.

get(*, prop=None, field=None, timeout=None)

Read device with full metadata (timestamp, cycle, meta).

setting(*, field=None, timeout=None)

Read SETTING property.

status(*, field=None, timeout=None)

Read STATUS property.

analog_alarm(*, field=None, timeout=None)

Read ANALOG alarm property.

digital_alarm(*, field=None, timeout=None)

Read DIGITAL alarm property.

description(*, field=None, timeout=None)

Read DESCRIPTION property.

digital_status(timeout=None)

Fetch full digital status (BIT_VALUE + BIT_NAMES + BIT_VALUES).

info(timeout=None)

Fetch device metadata from DevDB (cached).

write(value, *, field=None, verify=None, timeout=None)

Write to SETTING property.

control(command, *, verify=None, timeout=None)

Write CONTROL command.

on(*, verify=None, timeout=None)

off(*, verify=None, timeout=None)

reset(*, verify=None, timeout=None)

set_analog_alarm(settings, *, timeout=None)

Write ANALOG alarm property.

set_digital_alarm(settings, *, timeout=None)

Write DIGITAL alarm property.

subscribe(callback=None, on_error=None, *, prop=None, field=None, event=None)

Subscribe to device for streaming data.

Parameters:

Name Type Description Default
prop str | None

Property to subscribe to (default: READING)

None
field str | None

Sub-field (requires prop)

None
event str | None

Event string (e.g. "p,1000"). Uses device's event if None.

None
callback ReadingCallback | None

Optional callback for push-mode

None
on_error ErrorCallback | None

Optional error handler

None

Returns:

Type Description
SubscriptionHandle

SubscriptionHandle (usable as context manager)

Raises:

Type Description
ValueError

If no event available, or field given without prop

with_backend(backend)

Return new Device bound to a specific backend.

ScalarDevice

pacsys.device.ScalarDevice

Bases: Device

Device that returns scalar values (float).

read(*, field=None, timeout=None)

Read scalar value. Raises TypeError if not scalar.

ArrayDevice

pacsys.device.ArrayDevice

Bases: Device

Device that returns array values.

read(*, field=None, timeout=None)

Read array value. Raises TypeError if not array.

TextDevice

pacsys.device.TextDevice

Bases: Device

Device that returns text/string values.

read(*, field=None, timeout=None)

Read text value. Raises TypeError if not string.


SSH Utility

SSHClient

pacsys.ssh.SSHClient

SSH client supporting multi-hop connections, command execution, tunneling, and SFTP.

Connection is lazy - established on first operation. Uses paramiko Transport with GSSAPI (Kerberos), key, or password authentication per hop.

Parameters:

Name Type Description Default
hops Union[HopSpec, list[HopSpec]]

Target host(s). Accepts a hostname string, SSHHop, or list of either. Multiple hops create a chain (jump hosts).

required
auth Optional[object]

Optional KerberosAuth for GSSAPI hops. If None and any hop uses gssapi auth, credentials are validated at init (fail fast).

None
connect_timeout float

TCP connection timeout in seconds (default 10.0).

10.0

Example

with SSHClient("target.fnal.gov") as ssh:
    result = ssh.exec("hostname")
    print(result.stdout)

exec(command, timeout=None, input=None)

Execute a command on the remote host.

Parameters:

Name Type Description Default
command str

Shell command to execute

required
timeout Optional[float]

Command timeout in seconds (None = no timeout)

None
input Optional[str]

Optional stdin data to send

None

Returns:

Type Description
CommandResult

CommandResult with exit_code, stdout, stderr

Raises:

Type Description
SSHTimeoutError

If timeout is exceeded

SSHConnectionError

If transport is not active

exec_stream(command, timeout=None)

Execute a command and yield stdout lines as they arrive.

Parameters:

Name Type Description Default
command str

Shell command to execute

required
timeout Optional[float]

Command timeout in seconds

None

Yields:

Type Description
str

Lines of stdout output

Raises:

Type Description
SSHCommandError

If command exits with non-zero status (after all output)

SSHTimeoutError

If timeout is exceeded

exec_many(commands, timeout=None)

Execute multiple commands sequentially.

Parameters:

Name Type Description Default
commands list[str]

List of shell commands

required
timeout Optional[float]

Per-command timeout in seconds

None

Returns:

Type Description
list[CommandResult]

List of CommandResult in same order as input

forward(local_port, remote_host, remote_port)

Create a local port forward through the SSH connection.

Parameters:

Name Type Description Default
local_port int

Local port to listen on (0 for OS-assigned)

required
remote_host str

Remote host to forward to

required
remote_port int

Remote port to forward to

required

Returns:

Type Description
Tunnel

Tunnel object (use as context manager or call stop())

sftp()

Open an SFTP session on the remote host.

Returns:

Type Description
SFTPSession

SFTPSession (use as context manager or call close())

open_channel(command, timeout=None)

Open an SSH channel executing a command, with stdin kept open for interactive use.

Unlike exec(), the channel's stdin is NOT closed after opening, allowing interactive input. The caller is responsible for closing the channel.

Parameters:

Name Type Description Default
command str

Command to execute on the remote host

required
timeout float | None

Channel timeout in seconds (None = no timeout)

None

Returns:

Type Description
Channel

paramiko.Channel with the command running and stdin open
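
As an illustration of the open stdin, the sketch below feeds a small payload to the command, signals end of input, and drains stdout using standard paramiko.Channel methods (sendall, shutdown_write, recv, recv_exit_status, close). `pipe_through` is a hypothetical helper, and `ssh` is assumed to be a connected SSHClient:

```python
def pipe_through(ssh, command, payload, timeout=10.0):
    """Send `payload` to the command's stdin; return (stdout_bytes, exit_status)."""
    channel = ssh.open_channel(command, timeout=timeout)
    try:
        channel.sendall(payload)   # stdin is still open, so this is accepted
        channel.shutdown_write()   # signal EOF so the command can finish
        chunks = []
        while True:
            chunk = channel.recv(4096)
            if not chunk:          # empty bytes means the stream is closed
                break
            chunks.append(chunk)
        return b"".join(chunks), channel.recv_exit_status()
    finally:
        channel.close()            # the caller is responsible for closing
```

This send-then-read order is only suitable for small payloads; with large inputs, reading and writing would need to be interleaved to avoid stalling on a full channel window.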

remote_process(command, *, timeout=30.0)

Open a persistent interactive process over SSH.

Parameters:

Name Type Description Default
command str

Command to execute on the remote host

required
timeout float

Default timeout for read operations in seconds

30.0

Returns:

Type Description
RemoteProcess

RemoteProcess (use as context manager or call close())

acl_session(*, timeout=30.0)

Open a persistent ACL interpreter session.

The session keeps an acl process alive over an SSH channel, avoiding process startup overhead. Each send() is a separate script execution - state does NOT persist between calls.

Parameters:

Name Type Description Default
timeout float

Default timeout for prompt detection in seconds

30.0

Returns:

Type Description
ACLSession

ACLSession (use as context manager or call close())

acl(command, timeout=None)

Execute ACL command(s) and return output text.

Commands are written to a temp script file on the remote host and executed as acl /tmp/pacsys_acl_XXXX.acl.

Parameters:

Name Type Description Default
command str | list[str]

A single ACL command string, or a list of commands. A string containing semicolons is treated as one line.

required
timeout float | None

Command timeout in seconds (default 30.0)

None

Returns:

Type Description
str

Command output with ACL prompts stripped

Raises:

Type Description
ACLError

If the ACL process exits with non-zero status

ValueError

If command list is empty
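
A short sketch of passing a list of commands, assuming `ssh` is a connected SSHClient. `read_devices` is a hypothetical helper; the `read <device>` command form is taken from the ACLSession usage example in this reference:

```python
def read_devices(ssh, devices, timeout=None):
    """Read several devices in one ACL script and return the raw output text."""
    if not devices:
        # ssh.acl() raises ValueError for an empty command list, so skip the call.
        return ""
    commands = [f"read {name}" for name in devices]
    return ssh.acl(commands, timeout=timeout)
```

Guarding the empty case up front keeps the caller from having to handle ValueError for what is usually a harmless no-op.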

close()

Close the SSH connection and all tunnels.

RemoteProcess

pacsys.ssh.RemoteProcess

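Persistent interactive process over SSH. It acts as a plain bidirectional pipe and does not interpret the data it carries.

Not thread-safe. Usable as a context manager. Does NOT own the SSHClient, so closing the process leaves the client connected.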

Parameters:

Name Type Description Default
ssh SSHClient

Connected SSHClient instance

required
command str

Command to execute on the remote host

required
timeout float

Default timeout for read operations in seconds

30.0

alive property

True while the process is still running.

send_line(line)

Send a line followed by a newline.

send_bytes(data)

Send raw bytes.

read_until(marker, timeout=None)

Read until the marker is found in the stream and return the bytes that precede it.

The marker itself is consumed from the buffer.

Raises:

Type Description
SSHTimeoutError

If timeout expires before marker found

SSHError

If channel closes before marker found

read_for(seconds)

Read all data arriving within the given number of seconds. Returns on idle.

close()

Close channel (idempotent).
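
A minimal request/response sketch built on send_line() and read_until(), assuming `ssh` is a connected SSHClient, that send_line() accepts a str, and that the marker is passed as bytes (read_until() is documented to return bytes). `exchange` is a hypothetical helper:

```python
def exchange(ssh, command, lines, marker=b"\n"):
    """Send each line and collect the bytes read up to `marker` after it."""
    replies = []
    # The context manager closes the channel; the SSHClient stays open.
    with ssh.remote_process(command) as proc:
        for line in lines:
            if not proc.alive:
                break  # the remote process exited early
            proc.send_line(line)
            replies.append(proc.read_until(marker))
    return replies
```

This pattern fits line-oriented tools that answer each input with exactly one marker-terminated reply; other protocols would need a different marker or read_for().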

SSHHop

pacsys.ssh.SSHHop dataclass

Configuration for a single SSH hop.

Parameters:

Name Type Description Default
hostname str

SSH server hostname (required, non-empty)

required
port int

SSH port (default 22)

22
username Optional[str]

SSH username (default: current OS user)

None
auth_method str

"gssapi", "key", or "password"

'gssapi'
key_filename Optional[str]

Path to private key (required when auth_method="key")

None
password Optional[str]

Password (required when auth_method="password", excluded from repr)

None
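
The constraints in the table above can be checked before a hop is built. The sketch below mirrors the documented rules only; `hop_kwargs` is a hypothetical helper, and whether SSHHop performs the same checks itself is not stated here:

```python
def hop_kwargs(hostname, auth_method="gssapi", *, port=22, username=None,
               key_filename=None, password=None):
    """Validate hop settings against the documented rules; return them as a dict."""
    if not hostname:
        raise ValueError("hostname must be non-empty")
    if auth_method not in ("gssapi", "key", "password"):
        raise ValueError(f"unsupported auth_method: {auth_method!r}")
    if auth_method == "key" and not key_filename:
        raise ValueError('key_filename is required when auth_method="key"')
    if auth_method == "password" and not password:
        raise ValueError('password is required when auth_method="password"')
    return {
        "hostname": hostname,
        "port": port,
        "username": username,  # None means the current OS user
        "auth_method": auth_method,
        "key_filename": key_filename,
        "password": password,
    }
```

The returned dict uses the documented field names, so it could be passed on as SSHHop(**hop_kwargs("gateway.example.org")), where the hostname is a placeholder.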

CommandResult

pacsys.ssh.CommandResult dataclass

Result of a remote command execution, with exit_code, stdout, and stderr.

ACLSession

pacsys.acl_session.ACLSession

Persistent ACL interpreter session over SSH.

Opens an acl process on a remote host via SSH and keeps it alive, avoiding process startup overhead. Each send() is a separate script execution - state (variables, symbols) does NOT persist between calls. Combine dependent commands with semicolons in a single send().

Multiple sessions can coexist on the same SSHClient (paramiko multiplexes channels on a single transport).

Not thread-safe - do not share a single session across threads. Use separate sessions per thread instead.

Parameters:

Name Type Description Default
ssh SSHClient

Connected SSHClient instance

required
timeout float

Default timeout for prompt detection in seconds

30.0

Usage

    with ssh.acl_session() as acl:
        acl.send("read M:OUTTMP")

Or explicitly:

    acl = ACLSession(ssh_client)
    acl.send("read M:OUTTMP")
    acl.close()
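
Because a session must not be shared across threads, parallel reads need one session per thread. The sketch below opens the sessions on the calling thread and hands each worker its own, assuming `ssh` is a connected SSHClient; `read_in_parallel` and `device_groups` are hypothetical names:

```python
from concurrent.futures import ThreadPoolExecutor


def read_in_parallel(ssh, device_groups):
    """Read each group of devices on its own thread with its own ACLSession."""
    def worker(session, devices):
        # Only this thread ever touches `session`.
        return {dev: session.send(f"read {dev}") for dev in devices}

    # One session per group; they share the SSHClient's single transport.
    sessions = [ssh.acl_session() for _ in device_groups]
    try:
        with ThreadPoolExecutor(max_workers=max(1, len(sessions))) as pool:
            partials = list(pool.map(worker, sessions, device_groups))
    finally:
        for session in sessions:
            session.close()  # closes the channels, not the SSHClient

    merged = {}
    for part in partials:
        merged.update(part)
    return merged
```

Opening the sessions up front keeps each acl process alive for its whole group, which preserves the startup-overhead saving that persistent sessions are meant to provide.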

send(command, timeout=None)

Send a command to the ACL interpreter and return the output.

Parameters:

Name Type Description Default
command str

ACL command string

required
timeout float | None

Override default timeout for this command

None

Returns:

Type Description
str

Command output with prompts and echoed command stripped

Raises:

Type Description
ACLError

If the session is closed, the process exits, or prompt times out
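
Since state does not persist between send() calls, dependent commands have to travel in one call, joined by semicolons. A minimal sketch, assuming `acl` is an open ACLSession; `send_script` is a hypothetical helper, and joining with a bare `;` (no surrounding spaces) is an assumption about what the interpreter accepts:

```python
def send_script(acl, commands, timeout=None):
    """Send dependent ACL commands in a single send() so they share state."""
    # Drop blank entries and stray trailing semicolons before joining.
    cleaned = (c.strip().rstrip(";").strip() for c in commands)
    parts = [p for p in cleaned if p]
    if not parts:
        raise ValueError("no commands to send")
    return acl.send(";".join(parts), timeout=timeout)
```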

close()

Close the ACL session (closes the SSH channel, not the SSHClient).

SSH Exceptions

pacsys.ssh.SSHError

Bases: Exception

Base exception for SSH operations.

pacsys.ssh.SSHConnectionError

Bases: SSHError

Connection or authentication failure.

pacsys.ssh.SSHCommandError

Bases: SSHError

Command exited with non-zero status.

pacsys.ssh.SSHTimeoutError

Bases: SSHError

Operation timed out.
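
Because every SSH exception derives from SSHError, callers can catch one specific subclass or the whole family. The sketch below retries only on the exception types it is given. `retry` is a hypothetical helper that takes the exception class as a parameter, so in real use one would pass pacsys.ssh.SSHTimeoutError (or SSHError to cover all of them):

```python
import time


def retry(fn, retry_on, attempts=3, delay=0.0):
    """Call fn() until it succeeds, retrying only on `retry_on` exceptions."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: let the last error propagate
            time.sleep(delay)
```

Passing the narrow timeout class keeps connection and authentication failures from being retried, since those rarely succeed on a second attempt without intervention.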