API Reference¶
Complete reference for pacsys functions, classes, and types.
Simple API¶
These functions use a global backend that is automatically initialized on first use.
pacsys.read(device, timeout=None)
¶
Read a single device value using the global DPM backend.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
device
|
DeviceSpec
|
DRF string or Device object |
required |
timeout
|
Optional[float]
|
Total timeout for entire operation in seconds (default: 5.0) |
None
|
Returns:
| Type | Description |
|---|---|
Value
|
The device value (float, numpy array, string, etc.) |
Raises:
| Type | Description |
|---|---|
ValueError
|
If DRF syntax is invalid |
DeviceError
|
If the read fails (status_code < 0) |
Note
Even if DRF specifies periodic event (@p,1000), only FIRST reading is returned. Use Session for continuous data.
Thread Safety
Safe to call from multiple threads. Each call borrows a connection from the shared pool for the duration of the operation.
pacsys.get(device, timeout=None)
¶
Read a single device with full metadata using the global DPM backend.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
device
|
DeviceSpec
|
DRF string or Device object |
required |
timeout
|
Optional[float]
|
Total timeout for operation in seconds (default: 5.0) |
None
|
Returns:
| Type | Description |
|---|---|
Reading
|
Reading object with value, status, timestamp, and metadata. |
Reading
|
Check reading.is_error for error status. |
Raises:
| Type | Description |
|---|---|
ValueError
|
If DRF syntax is invalid |
Thread Safety
Safe to call from multiple threads.
pacsys.get_many(devices, timeout=None)
¶
Read multiple devices in a single batch using the global DPM backend.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
devices
|
list[DeviceSpec]
|
List of DRF strings or Device objects (can mix) |
required |
timeout
|
Optional[float]
|
Total timeout for entire batch in seconds (not per-device) |
None
|
Returns:
| Type | Description |
|---|---|
list[Reading]
|
List of Reading objects in same order as input. |
Raises:
| Type | Description |
|---|---|
ReadError
|
On transport failure (timeout, connection drop).
Partial results are available via |
ValueError
|
If any DRF syntax is invalid (before network I/O) |
Thread Safety
Safe to call from multiple threads.
pacsys.write(device, value, timeout=None)
¶
Write a single device value using the global backend.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
device
|
DeviceSpec
|
DRF string or Device object |
required |
value
|
Value
|
Value to write |
required |
timeout
|
Optional[float]
|
Total timeout in seconds |
None
|
Returns:
| Type | Description |
|---|---|
WriteResult
|
WriteResult with status |
Raises:
| Type | Description |
|---|---|
AuthenticationError
|
If no auth configured (use configure(auth=...)) |
DeviceError
|
If the write fails |
Thread Safety
Safe to call from multiple threads.
pacsys.write_many(settings, timeout=None)
¶
Write multiple device values in a single batch using the global backend.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
settings
|
WriteSettings
|
List of (device, value) tuples, or a dict mapping device -> value |
required |
timeout
|
Optional[float]
|
Total timeout for entire batch in seconds |
None
|
Returns:
| Type | Description |
|---|---|
list[WriteResult]
|
List of WriteResult objects in same order as input. |
Raises:
| Type | Description |
|---|---|
AuthenticationError
|
If no auth configured (use configure(auth=...)) |
Thread Safety
Safe to call from multiple threads.
pacsys.subscribe(drfs, callback=None, on_error=None)
¶
Subscribe to devices for streaming using the global DPM backend.
Creates subscriptions that immediately start receiving data. The handle can be used as a context manager for automatic cleanup.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
drfs
|
list[DeviceSpec]
|
List of device request strings or Device objects (with events, e.g. "M:OUTTMP@p,1000") |
required |
callback
|
Optional[ReadingCallback]
|
Optional function called for each reading, receives (reading, handle). If provided, readings are pushed to the callback on the receiver thread. If None, use handle.readings() to iterate over readings. |
None
|
on_error
|
Optional[ErrorCallback]
|
Optional function called when a connection error occurs, receives (exception, handle). If not provided, errors are raised during iteration or logged in callback mode. |
None
|
Returns:
| Type | Description |
|---|---|
SubscriptionHandle
|
SubscriptionHandle for managing this subscription |
Example (callback mode): def on_reading(reading, handle): print(f"{reading.name}: {reading.value}") if reading.value > 100: handle.stop()
handle = pacsys.subscribe(["M:OUTTMP@p,1000"], callback=on_reading)
time.sleep(10)
handle.stop()
pacsys.shutdown()
Example (iterator mode): with pacsys.subscribe(["M:OUTTMP@p,1000"]) as sub: for reading, handle in sub.readings(timeout=10): print(f"{reading.name}: {reading.value}") if reading.value > 10: sub.stop() pacsys.shutdown()
Example (with error handler): def on_error(exc, handle): print(f"Connection error: {exc}")
handle = pacsys.subscribe(
["M:OUTTMP@p,1000"],
callback=on_reading,
on_error=on_error,
)
pacsys.configure(*, dpm_host=_UNSET, dpm_port=_UNSET, pool_size=_UNSET, default_timeout=_UNSET, devdb_host=_UNSET, devdb_port=_UNSET, backend=_UNSET, auth=_UNSET, role=_UNSET)
¶
Configure pacsys global settings.
Can be called at any time. If a backend is already initialized, it will be automatically shut down before applying the new settings. Pass None to clear a previously set value (falls back to environment variable or default).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
dpm_host
|
Optional[str] | _Unset
|
DPM proxy hostname (default: from PACSYS_DPM_HOST or acsys-proxy.fnal.gov) |
_UNSET
|
dpm_port
|
Optional[int] | _Unset
|
DPM proxy port (default: from PACSYS_DPM_PORT or 6802) |
_UNSET
|
pool_size
|
Optional[int] | _Unset
|
Connection pool size (default: from PACSYS_POOL_SIZE or 4) |
_UNSET
|
default_timeout
|
Optional[float] | _Unset
|
Default operation timeout in seconds (default: from PACSYS_TIMEOUT or 5.0) |
_UNSET
|
devdb_host
|
Optional[str] | _Unset
|
DevDB gRPC hostname (default: from PACSYS_DEVDB_HOST or ad-services.fnal.gov/services.devdb) |
_UNSET
|
devdb_port
|
Optional[int] | _Unset
|
DevDB gRPC port (default: from PACSYS_DEVDB_PORT or 6802) |
_UNSET
|
backend
|
Optional[str] | _Unset
|
Backend type - one of "dpm", "grpc", "dmq", "acl" (default: "dpm") |
_UNSET
|
auth
|
Optional[Auth] | str | _Unset
|
Authentication object (KerberosAuth or JWTAuth) for writes, or "krb" as shortcut for KerberosAuth() |
_UNSET
|
role
|
Optional[str] | _Unset
|
Role for authenticated operations (e.g., "testing") |
_UNSET
|
Raises:
| Type | Description |
|---|---|
ValueError
|
If backend is not a valid backend type |
pacsys.shutdown()
¶
Close and release the global lazy-initialized backend and DevDB client.
The global backend is automatically closed on interpreter exit via atexit, so explicit shutdown() is only needed to reset state mid-process (e.g., between tests or before re-configuring).
After shutdown(), the next read/get call will re-initialize the backend using existing configuration from configure(). Configuration is preserved across shutdown/re-init cycles -- use configure() to change settings.
Safe to call multiple times or when no backend is initialized.
Backend Factories¶
Create explicit backend instances for more control.
pacsys.dpm(host=None, port=None, pool_size=None, timeout=None, auth=None, role=None, dispatch_mode=DispatchMode.WORKER)
¶
Create a DPM backend instance with its own connection pool.
Alias for dpm_http(). Each subscribe() call creates its own TCP connection, allowing independent subscriptions.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
host
|
Optional[str]
|
DPM proxy hostname (default: acsys-proxy.fnal.gov) |
None
|
port
|
Optional[int]
|
DPM proxy port (default: 6802) |
None
|
pool_size
|
Optional[int]
|
Connection pool size (default: 4) |
None
|
timeout
|
Optional[float]
|
Default operation timeout in seconds (default: 5.0) |
None
|
auth
|
Optional[Auth]
|
Authentication object (KerberosAuth for writes) |
None
|
role
|
Optional[str]
|
Role for authenticated operations (e.g., "testing") Required for write operations. |
None
|
dispatch_mode
|
DispatchMode
|
How streaming callbacks are dispatched (default: WORKER) |
WORKER
|
Returns:
| Type | Description |
|---|---|
DPMHTTPBackend
|
DPMHTTPBackend instance (use as context manager or call close() when done) |
Example (read-only): with pacsys.dpm() as backend: temp = backend.read("M:OUTTMP")
Example (authenticated writes): auth = KerberosAuth() with pacsys.dpm(auth=auth, role="testing") as backend: print(f"Authenticated as: {backend.principal}") result = backend.write("M:OUTTMP", 72.5)
pacsys.grpc(host=None, port=None, auth=None, timeout=None, dispatch_mode=DispatchMode.WORKER)
¶
Create a gRPC backend instance.
Uses the DAQ gRPC service for reads and writes. Writes require JWT authentication.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
host
|
Optional[str]
|
gRPC server hostname (env: PACSYS_GRPC_HOST, default: dce08.fnal.gov) |
None
|
port
|
Optional[int]
|
gRPC server port (env: PACSYS_GRPC_PORT, default: 50051) |
None
|
auth
|
Optional[Auth]
|
Authentication object (JWTAuth for writes). If None, tries PACSYS_JWT_TOKEN env. |
None
|
timeout
|
Optional[float]
|
Default operation timeout in seconds (default: 5.0) |
None
|
Returns:
| Type | Description |
|---|---|
GRPCBackend
|
GRPCBackend instance (use as context manager or call close() when done) |
Raises:
| Type | Description |
|---|---|
ImportError
|
If grpc package is not installed |
Example (read-only): with pacsys.grpc() as backend: temp = backend.read("M:OUTTMP")
Example (with JWT): auth = JWTAuth(token="eyJ...") with pacsys.grpc(auth=auth) as backend: print(f"Authenticated as: {backend.principal}") result = backend.write("M:OUTTMP", 72.5)
Example (token from environment): # export PACSYS_JWT_TOKEN="eyJ..." with pacsys.grpc() as backend: if backend.authenticated: print(f"Authenticated as: {backend.principal}")
pacsys.dmq(host=None, port=None, timeout=None, auth=None, write_session_ttl=None, dispatch_mode=DispatchMode.WORKER)
¶
Create a DMQ backend instance (RabbitMQ/AMQP).
Uses RabbitMQ to communicate with ACNET via the DMQ server. Requires Kerberos authentication for ALL operations (including reads).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
host
|
Optional[str]
|
RabbitMQ broker hostname (default: from PACSYS_DMQ_HOST or appsrv2.fnal.gov) |
None
|
port
|
Optional[int]
|
RabbitMQ broker port (default: from PACSYS_DMQ_PORT or 5672) |
None
|
timeout
|
Optional[float]
|
Default operation timeout in seconds (default: 10.0) |
None
|
auth
|
Optional[Auth]
|
KerberosAuth required for all DMQ operations |
None
|
write_session_ttl
|
Optional[float]
|
Idle timeout for write sessions in seconds (default: 600) |
None
|
Returns:
| Type | Description |
|---|---|
DMQBackend
|
DMQBackend instance (use as context manager or call close() when done) |
Raises:
| Type | Description |
|---|---|
AuthenticationError
|
If auth is not provided or not KerberosAuth |
ImportError
|
If pika or gssapi packages are not installed |
Example
auth = KerberosAuth() with pacsys.dmq(auth=auth) as backend: temp = backend.read("M:OUTTMP") result = backend.write("Z:ACLTST", 45.0)
pacsys.acl(base_url=None, timeout=None)
¶
Create an ACL backend instance (read-only, no streaming, no auth).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
base_url
|
Optional[str]
|
ACL CGI base URL (default: https://www-bd.fnal.gov/cgi-bin/acl.pl) |
None
|
timeout
|
Optional[float]
|
Default operation timeout in seconds |
None
|
Returns:
| Type | Description |
|---|---|
ACLBackend
|
ACLBackend instance |
Example
with pacsys.acl() as backend: temp = backend.read("M:OUTTMP") reading = backend.get("M:OUTTMP") readings = backend.get_many(["M:OUTTMP", "G:AMANDA"])
pacsys.dpm_http(host=None, port=None, pool_size=None, timeout=None, auth=None, role=None, dispatch_mode=DispatchMode.WORKER)
¶
Create a DPM HTTP backend with independent streaming subscriptions.
This backend uses TCP/HTTP protocol to communicate with DPM. Each subscribe() call creates its own TCP connection, allowing truly independent subscriptions that can be started/stopped individually.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
host
|
Optional[str]
|
DPM proxy hostname (default: acsys-proxy.fnal.gov) |
None
|
port
|
Optional[int]
|
DPM proxy port (default: 6802) |
None
|
pool_size
|
Optional[int]
|
Connection pool size for reads (default: 4) |
None
|
timeout
|
Optional[float]
|
Default operation timeout in seconds (default: 5.0) |
None
|
auth
|
Optional[Auth]
|
Authentication object (KerberosAuth for writes) |
None
|
role
|
Optional[str]
|
Role for authenticated operations (e.g., "testing") |
None
|
Returns:
| Type | Description |
|---|---|
DPMHTTPBackend
|
DPMHTTPBackend instance |
Example (multiple independent subscriptions): with pacsys.dpm_http() as backend: sub1 = backend.subscribe(["M:OUTTMP@p,1000"]) sub2 = backend.subscribe(["G:AMANDA@p,500"])
# Stopping sub1 doesn't affect sub2
sub1.stop()
for reading, _ in sub2.readings(timeout=10):
print(f"{reading.name}: {reading.value}")
pacsys.devdb
¶
DevDB gRPC client for querying device metadata from the master PostgreSQL database.
DevDB is a metadata service (not a data acquisition backend). It provides device information like scaling parameters, control commands, and status bit definitions.
Usage
import pacsys
with pacsys.devdb() as db: info = db.get_device_info(["Z:ACLTST", "M:OUTTMP"]) print(info["Z:ACLTST"].description)
AlarmBlockInfo
dataclass
¶
Alarm block information from DevDB.
AlarmInfo
dataclass
¶
Alarm information for a device from DevDB.
AlarmText
dataclass
¶
Alarm text entry from DevDB.
ControlCommandDef
dataclass
¶
A control command definition from DevDB.
DevDBClient
¶
Synchronous gRPC client for DevDB device metadata queries.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
host
|
str | None
|
DevDB gRPC server hostname (default: from PACSYS_DEVDB_HOST or ad-services.fnal.gov/services.devdb) |
None
|
port
|
int | None
|
DevDB gRPC server port (default: from PACSYS_DEVDB_PORT or 6802) |
None
|
timeout
|
float | None
|
RPC timeout in seconds (default: 5.0) |
None
|
cache_ttl
|
float
|
TTL for cached results in seconds (default: 3600.0) |
3600.0
|
Raises:
| Type | Description |
|---|---|
ImportError
|
If grpc package is not available |
clear_cache(device=None)
¶
Clear cached results.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
device
|
str | None
|
If given, clear only this device's cache. Otherwise clear all. |
None
|
close()
¶
Close the gRPC channel.
get_alarm_info(names)
¶
Query alarm information for devices.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
names
|
list[str]
|
Device names |
required |
Returns:
| Type | Description |
|---|---|
list[AlarmInfo]
|
List of AlarmInfo entries. |
Raises:
| Type | Description |
|---|---|
RpcError
|
On gRPC transport failure. |
get_alarm_text(ids)
¶
Query alarm text entries by ID.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
ids
|
list[int]
|
Alarm text IDs |
required |
Returns:
| Type | Description |
|---|---|
list[AlarmText]
|
List of AlarmText entries. |
Raises:
| Type | Description |
|---|---|
RpcError
|
On gRPC transport failure. |
get_device_info(names, timeout=None)
¶
Query device metadata for one or more devices.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
names
|
list[str]
|
Device names (e.g., ["Z:ACLTST", "M:OUTTMP"]) |
required |
timeout
|
float | None
|
gRPC timeout in seconds (default: client's configured timeout). |
None
|
Returns:
| Type | Description |
|---|---|
dict[str, DeviceInfoResult]
|
Dict mapping device name to DeviceInfoResult. |
Raises:
| Type | Description |
|---|---|
DeviceError
|
If DevDB returns an error for a device. |
RpcError
|
On gRPC transport failure. |
DeviceInfoResult
dataclass
¶
Complete device metadata from DevDB.
ExtStatusBitDef
dataclass
¶
Extended status bit definition from DevDB.
PropertyInfo
dataclass
¶
Scaling and metadata for a device property (reading or setting).
StatusBitDef
dataclass
¶
One status bit definition from DevDB (not a runtime value).
Uses mask/match/invert for proper bit evaluation: is_active = ((raw_value & mask) == match) ^ invert
pacsys.supervised
¶
Supervised mode: gRPC proxy server with logging and policy enforcement.
Wraps any Backend instance and exposes it as a gRPC DAQ service, forwarding requests while enforcing access policies and logging all traffic.
Example
from pacsys.testing import FakeBackend from pacsys.supervised import SupervisedServer, ReadOnlyPolicy
fb = FakeBackend() fb.set_reading("M:OUTTMP", 72.5)
with SupervisedServer(fb, port=50099, policies=[ReadOnlyPolicy()]) as srv: import pacsys with pacsys.grpc(host="localhost", port=50099) as client: print(client.read("M:OUTTMP"))
AuditLog
¶
Structured audit log with optional raw protobuf capture.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
path
|
str
|
JSON lines file path. |
required |
proto_path
|
Optional[str]
|
Binary protobuf file path (optional). |
None
|
log_responses
|
bool
|
Log outgoing responses too (default: False). |
False
|
flush_interval
|
int
|
Flush files every N writes (default: 1). |
1
|
DeviceAccessPolicy
¶
Bases: Policy
Allow or deny access based on device name patterns.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
patterns
|
list[str]
|
List of patterns (e.g. ["M:*", "G:AMANDA"]) |
required |
mode
|
str
|
"allow" = approve matching devices, "deny" = block matching devices |
'allow'
|
action
|
str
|
"all" (both RPCs), "read" (Read only), "set" (Set only) |
'all'
|
syntax
|
str
|
"glob" (fnmatch, default) or "regex" (full-match, case-insensitive) |
'glob'
|
Policy
¶
Bases: ABC
Abstract base for policy checks. Implement check() to allow or deny requests.
allows_writes
property
¶
Whether this policy explicitly gates write access.
PolicyDecision
dataclass
¶
Result of a policy check.
On deny: reason is required, ctx is ignored.
On allow without modification: ctx is None.
On allow with modification: ctx is a new RequestContext.
RateLimitPolicy
¶
Bases: Policy
Sliding window rate limit per peer.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
max_requests
|
int
|
Maximum requests per window |
required |
window_seconds
|
float
|
Window size in seconds (default: 60) |
60.0
|
RequestContext
dataclass
¶
Context for a single RPC request, passed to policy checks.
SlewLimit
dataclass
¶
Constraints for a single device pattern in :class:SlewRatePolicy.
At least one of max_step or max_rate must be set.
Attributes:
| Name | Type | Description |
|---|---|---|
max_step |
Optional[float]
|
Maximum absolute change per write (units). |
max_rate |
Optional[float]
|
Maximum rate of change (units/second). |
SlewRatePolicy
¶
Bases: Policy
Deny writes that change too fast or by too much.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
limits
|
dict[str, SlewLimit]
|
Mapping of device name glob pattern to :class: |
required |
First write to any device is always allowed (no history). History is updated on allow (accepts that failed backend writes will leave stale history).
SupervisedServer
¶
gRPC proxy server with logging and policy enforcement.
Wraps any Backend and exposes the DAQ gRPC service, forwarding requests while enforcing policies and logging all traffic.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
backend
|
Backend | AsyncBackend
|
Backend instance to proxy requests to |
required |
port
|
int
|
Port to listen on (default: 50051) |
50051
|
host
|
str
|
Host to bind (default: "[::] " for all interfaces) |
'[::]'
|
policies
|
Optional[list[Policy]]
|
Optional list of Policy instances for access control |
None
|
token
|
Optional[str]
|
Optional bearer token for write authentication.
When set, clients must send |
None
|
Example
from pacsys.testing import FakeBackend from pacsys.supervised import SupervisedServer, ReadOnlyPolicy
fb = FakeBackend() fb.set_reading("M:OUTTMP", 72.5)
with SupervisedServer(fb, port=50099, policies=[ReadOnlyPolicy()]) as srv: # Clients can now connect to localhost:50099 srv.wait()
ValueRangePolicy
¶
Bases: Policy
Deny writes where numeric values fall outside allowed ranges.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
limits
|
dict[str, tuple[float, float]]
|
Mapping of device name glob pattern to (min, max) bounds. |
required |
evaluate_policies(policies, ctx)
¶
Evaluate a chain of policies. First denial short-circuits.
Each policy sees the (potentially modified) context from the previous
policy. The final decision always carries ctx set to the final context.
Types¶
Reading¶
pacsys.types.Reading
dataclass
¶
A device reading with status and optional data.
Status semantics (matches gRPC Status message): - facility_code: ACNET facility identifier (0=success, 1=ACNET, 16=DBM, 17=DPM) - error_code: 0=success, >0=warning, <0=error
value = None
class-attribute
instance-attribute
¶
value_type = None
class-attribute
instance-attribute
¶
name
property
¶
Device name extracted from DRF or metadata.
drf
instance-attribute
¶
units
property
¶
Engineering units from metadata, or None if unavailable.
timestamp = None
class-attribute
instance-attribute
¶
cycle = None
class-attribute
instance-attribute
¶
facility_code = 0
class-attribute
instance-attribute
¶
error_code = 0
class-attribute
instance-attribute
¶
message = None
class-attribute
instance-attribute
¶
is_success
property
¶
True if status indicates success (error_code == 0).
is_error
property
¶
True if status indicates error (error_code < 0).
is_warning
property
¶
True if status indicates warning (error_code > 0).
ok
property
¶
True if data is usable (success or warning with data).
WriteResult¶
pacsys.types.WriteResult
dataclass
¶
Result of a write operation, optionally with verification info.
SubscriptionHandle¶
pacsys.types.SubscriptionHandle
¶
Handle for a streaming subscription.
Provides access to subscription state and allows stopping the subscription. Each handle has its own queue for readings. Use readings() to iterate over readings from THIS subscription only.
Usage
Context manager (recommended) - auto-stops on exit¶
with backend.subscribe(["M:OUTTMP@p,1000"]) as sub: for reading, handle in sub.readings(timeout=10): print(reading.value) if reading.value > 10: sub.stop()
Manual control¶
sub = backend.subscribe(["M:OUTTMP@p,1000"]) for reading, handle in sub.readings(timeout=10): print(reading.value) sub.stop()
stopped
property
¶
True if this subscription has been stopped.
readings(timeout=None)
¶
Yield (reading, handle) pairs for THIS subscription.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
timeout
|
Optional[float]
|
Seconds to wait for next reading. None = block forever (until stop() called) 0 = non-blocking (drain buffered readings only) |
None
|
Yields:
| Type | Description |
|---|---|
tuple[Reading, SubscriptionHandle]
|
(reading, handle) pairs |
stop()
¶
Stop this subscription.
ValueType¶
pacsys.types.ValueType
¶
Bases: Enum
Type of value returned from a device read.
Authentication¶
KerberosAuth¶
pacsys.auth.KerberosAuth
dataclass
¶
Bases: Auth
Kerberos authentication using system credential cache.
Requires
- Valid Kerberos ticket (run
kinitfirst) - gssapi library (
pip install gssapi)
The credentials are obtained from the system credential cache, so the same KerberosAuth instance can be used across multiple backends and services.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name
|
Optional[str]
|
Kerberos principal to use (e.g., "user2@FNAL.GOV"). If None, uses the default principal from the credential cache. |
None
|
Example
Default principal¶
auth = KerberosAuth()
Specific principal (e.g., multiple tickets in cache)¶
auth = KerberosAuth(name="operator@FNAL.GOV")
Note
Credentials are validated at construction time (fail fast).
JWTAuth¶
pacsys.auth.JWTAuth
dataclass
¶
Bases: Auth
JWT token authentication.
Can be used with gRPC backend and other JWT-accepting services. Token is stored as-is; validation is done server-side.
Example
Explicit token¶
auth = JWTAuth(token="eyJ...")
From environment variable¶
auth = JWTAuth.from_env() # reads PACSYS_JWT_TOKEN
Use with gRPC backend¶
backend = GRPCBackend(auth=auth)
Note
Token is excluded from repr to prevent credential leaks in logs.
principal
property
¶
Extract subject claim from JWT token.
Note: This decodes without verification. Server validates the token.
Returns:
| Type | Description |
|---|---|
str
|
Subject ('sub') claim from token |
Raises:
| Type | Description |
|---|---|
ValueError
|
If token format is invalid |
from_env(var='PACSYS_JWT_TOKEN')
classmethod
¶
Create JWTAuth from environment variable.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
var
|
str
|
Environment variable name (default: PACSYS_JWT_TOKEN) |
'PACSYS_JWT_TOKEN'
|
Returns:
| Type | Description |
|---|---|
Optional[JWTAuth]
|
JWTAuth if environment variable is set, None otherwise |
Errors¶
DeviceError¶
pacsys.errors.DeviceError
¶
Bases: Exception
Raised when a device read fails.
Attributes:
| Name | Type | Description |
|---|---|---|
drf |
Device request string that failed |
|
facility_code |
ACNET facility identifier (1=ACNET, 16=DBM, 17=DPM) |
|
error_code |
Error code (negative indicates error, positive is warning) |
|
message |
Human-readable error description |
AuthenticationError¶
pacsys.errors.AuthenticationError
¶
Bases: Exception
Raised when authentication fails or is required.
This exception is raised when: - A write is attempted without authentication - Kerberos ticket is expired or missing - JWT token is invalid or expired
ReadError¶
pacsys.errors.ReadError
¶
Bases: Exception
Batch read failure.
Raised in two scenarios:
- get_many(): transport error (timeout, connection drop, auth failure).
The underlying network exception is chained via __cause__.
- read_many(): any device in the batch returned an unusable reading
(ACNET error or missing value).
Attributes:
| Name | Type | Description |
|---|---|---|
readings |
Complete results list (same length as input drfs). Includes both successful readings and error-backfilled entries. |
|
message |
Human-readable description of the failure. |
ACLError¶
pacsys.errors.ACLError
¶
Bases: Exception
Raised when an ACL command fails.
This exception is raised when: - A one-shot ACL command exits with non-zero status - An ACL session receives an error response - The ACL prompt times out - The ACL process exits unexpectedly
Device Classes¶
Object-oriented interface for device access.
Device¶
pacsys.device.Device
¶
Bases: _DeviceBase
Device wrapper with DRF3 validation at construction.
Devices are immutable - modification methods return NEW Device instances.
read(*, field=None, timeout=None)
¶
Read READING property. Raises DeviceError on failure.
get(*, prop=None, field=None, timeout=None)
¶
Read device with full metadata (timestamp, cycle, meta).
setting(*, field=None, timeout=None)
¶
Read SETTING property.
status(*, field=None, timeout=None)
¶
Read STATUS property.
analog_alarm(*, field=None, timeout=None)
¶
Read ANALOG alarm property.
digital_alarm(*, field=None, timeout=None)
¶
Read DIGITAL alarm property.
description(*, field=None, timeout=None)
¶
Read DESCRIPTION property.
digital_status(timeout=None)
¶
Fetch full digital status (BIT_VALUE + BIT_NAMES + BIT_VALUES).
info(timeout=None)
¶
Fetch device metadata from DevDB (cached).
write(value, *, field=None, verify=None, timeout=None)
¶
Write to SETTING property.
control(command, *, verify=None, timeout=None)
¶
Write CONTROL command.
on(*, verify=None, timeout=None)
¶
off(*, verify=None, timeout=None)
¶
reset(*, verify=None, timeout=None)
¶
set_analog_alarm(settings, *, timeout=None)
¶
Write ANALOG alarm property.
set_digital_alarm(settings, *, timeout=None)
¶
Write DIGITAL alarm property.
subscribe(callback=None, on_error=None, *, prop=None, field=None, event=None)
¶
Subscribe to device for streaming data.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
prop
|
str | None
|
Property to subscribe to (default: READING) |
None
|
field
|
str | None
|
Sub-field (requires prop) |
None
|
event
|
str | None
|
Event string (e.g. "p,1000"). Uses device's event if None. |
None
|
callback
|
ReadingCallback | None
|
Optional callback for push-mode |
None
|
on_error
|
ErrorCallback | None
|
Optional error handler |
None
|
Returns:
| Type | Description |
|---|---|
SubscriptionHandle
|
SubscriptionHandle (usable as context manager) |
Raises:
| Type | Description |
|---|---|
ValueError
|
If no event available, or field given without prop |
with_backend(backend)
¶
Return new Device bound to a specific backend.
ScalarDevice¶
pacsys.device.ScalarDevice
¶
ArrayDevice¶
pacsys.device.ArrayDevice
¶
TextDevice¶
pacsys.device.TextDevice
¶
SSH Utility¶
SSHClient¶
pacsys.ssh.SSHClient
¶
SSH client supporting multi-hop connections, command execution, tunneling, and SFTP.
Connection is lazy - established on first operation. Uses paramiko Transport with GSSAPI (Kerberos), key, or password authentication per hop.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
hops
|
Union[HopSpec, list[HopSpec]]
|
Target host(s). Accepts a hostname string, SSHHop, or list of either. Multiple hops create a chain (jump hosts). |
required |
auth
|
Optional[object]
|
Optional KerberosAuth for GSSAPI hops. If None and any hop uses gssapi auth, credentials are validated at init (fail fast). |
None
|
connect_timeout
|
float
|
TCP connection timeout in seconds (default 10.0). |
10.0
|
Example
with SSHClient("target.fnal.gov") as ssh: result = ssh.exec("hostname") print(result.stdout)
exec(command, timeout=None, input=None)
¶
Execute a command on the remote host.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
command
|
str
|
Shell command to execute |
required |
timeout
|
Optional[float]
|
Command timeout in seconds (None = no timeout) |
None
|
input
|
Optional[str]
|
Optional stdin data to send |
None
|
Returns:
| Type | Description |
|---|---|
CommandResult
|
CommandResult with exit_code, stdout, stderr |
Raises:
| Type | Description |
|---|---|
SSHTimeoutError
|
If timeout is exceeded |
SSHConnectionError
|
If transport is not active |
exec_stream(command, timeout=None)
¶
Execute a command and yield stdout lines as they arrive.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
command
|
str
|
Shell command to execute |
required |
timeout
|
Optional[float]
|
Command timeout in seconds |
None
|
Yields:
| Type | Description |
|---|---|
str
|
Lines of stdout output |
Raises:
| Type | Description |
|---|---|
SSHCommandError
|
If command exits with non-zero status (after all output) |
SSHTimeoutError
|
If timeout is exceeded |
exec_many(commands, timeout=None)
¶
Execute multiple commands sequentially.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
commands
|
list[str]
|
List of shell commands |
required |
timeout
|
Optional[float]
|
Per-command timeout in seconds |
None
|
Returns:
| Type | Description |
|---|---|
list[CommandResult]
|
List of CommandResult in same order as input |
forward(local_port, remote_host, remote_port)
¶
Create a local port forward through the SSH connection.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
local_port
|
int
|
Local port to listen on (0 for OS-assigned) |
required |
remote_host
|
str
|
Remote host to forward to |
required |
remote_port
|
int
|
Remote port to forward to |
required |
Returns:
| Type | Description |
|---|---|
Tunnel
|
Tunnel object (use as context manager or call stop()) |
sftp()
¶
Open an SFTP session on the remote host.
Returns:
| Type | Description |
|---|---|
SFTPSession
|
SFTPSession (use as context manager or call close()) |
open_channel(command, timeout=None)
¶
Open an SSH channel executing a command, with stdin kept open for interactive use.
Unlike exec(), the channel's stdin is NOT closed after opening, allowing interactive input. The caller is responsible for closing the channel.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
command
|
str
|
Command to execute on the remote host |
required |
timeout
|
float | None
|
Channel timeout in seconds (None = no timeout) |
None
|
Returns:
| Type | Description |
|---|---|
Channel
|
paramiko.Channel with the command running and stdin open |
remote_process(command, *, timeout=30.0)
¶
Open a persistent interactive process over SSH.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
command
|
str
|
Command to execute on the remote host |
required |
timeout
|
float
|
Default timeout for read operations in seconds |
30.0
|
Returns:
| Type | Description |
|---|---|
RemoteProcess
|
RemoteProcess (use as context manager or call close()) |
acl_session(*, timeout=30.0)
¶
Open a persistent ACL interpreter session.
The session keeps an acl process alive over an SSH channel,
avoiding process startup overhead. Each send() is a separate
script execution - state does NOT persist between calls.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
timeout
|
float
|
Default timeout for prompt detection in seconds |
30.0
|
Returns:
| Type | Description |
|---|---|
ACLSession
|
ACLSession (use as context manager or call close()) |
acl(command, timeout=None)
¶
Execute ACL command(s) and return output text.
Commands are written to a temp script file on the remote host and
executed as acl /tmp/pacsys_acl_XXXX.acl.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
command
|
str | list[str]
|
ACL command string, or list of commands (semicolons in a string are treated as one line). |
required |
timeout
|
float | None
|
Command timeout in seconds (default 30.0) |
None
|
Returns:
| Type | Description |
|---|---|
str
|
Command output with ACL prompts stripped |
Raises:
| Type | Description |
|---|---|
ACLError
|
If the ACL process exits with non-zero status |
ValueError
|
If command list is empty |
close()
¶
Close the SSH connection and all tunnels.
RemoteProcess¶
pacsys.ssh.RemoteProcess
¶
Persistent interactive process over SSH. Dumb bidirectional pipe.
Not thread-safe. Context manager. Does NOT own SSHClient.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
ssh
|
SSHClient
|
Connected SSHClient instance |
required |
command
|
str
|
Command to execute on the remote host |
required |
timeout
|
float
|
Default timeout for read operations in seconds |
30.0
|
alive
property
¶
Process still running.
send_line(line)
¶
Send line + newline.
send_bytes(data)
¶
Send raw bytes.
read_until(marker, timeout=None)
¶
Read until marker found in stream, return bytes before marker.
Marker is consumed from buffer.
Raises:
| Type | Description |
|---|---|
SSHTimeoutError
|
If timeout expires before marker found |
SSHError
|
If channel closes before marker found |
read_for(seconds)
¶
Read all data arriving within timeout. Returns on idle.
close()
¶
Close channel (idempotent).
SSHHop¶
pacsys.ssh.SSHHop
dataclass
¶
Configuration for a single SSH hop.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
hostname
|
str
|
SSH server hostname (required, non-empty) |
required |
port
|
int
|
SSH port (default 22) |
22
|
username
|
Optional[str]
|
SSH username (default: current OS user) |
None
|
auth_method
|
str
|
"gssapi", "key", or "password" |
'gssapi'
|
key_filename
|
Optional[str]
|
Path to private key (required when auth_method="key") |
None
|
password
|
Optional[str]
|
Password (required when auth_method="password", excluded from repr) |
None
|
CommandResult¶
pacsys.ssh.CommandResult
dataclass
¶
Result of a remote command execution.
ACLSession¶
pacsys.acl_session.ACLSession
¶
Persistent ACL interpreter session over SSH.
Opens an acl process on a remote host via SSH and keeps it alive,
avoiding process startup overhead. Each send() is a separate script
execution - state (variables, symbols) does NOT persist between calls.
Combine dependent commands with semicolons in a single send().
Multiple sessions can coexist on the same SSHClient (paramiko multiplexes channels on a single transport).
Not thread-safe - do not share a single session across threads. Use separate sessions per thread instead.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
ssh
|
SSHClient
|
Connected SSHClient instance |
required |
timeout
|
float
|
Default timeout for prompt detection in seconds |
30.0
|
Usage
with ssh.acl_session() as acl: acl.send("read M:OUTTMP")
Or explicitly:¶
acl = ACLSession(ssh_client) acl.send("read M:OUTTMP") acl.close()
send(command, timeout=None)
¶
Send a command to the ACL interpreter and return the output.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
command
|
str
|
ACL command string |
required |
timeout
|
float | None
|
Override default timeout for this command |
None
|
Returns:
| Type | Description |
|---|---|
str
|
Command output with prompts and echoed command stripped |
Raises:
| Type | Description |
|---|---|
ACLError
|
If the session is closed, the process exits, or prompt times out |
close()
¶
Close the ACL session (closes the SSH channel, not the SSHClient).
SSH Exceptions¶
pacsys.ssh.SSHError
¶
Bases: Exception
Base exception for SSH operations.