DMQ

RabbitMQ-based backend that communicates with ACNET through the DMQ server (DataBroker/Bunny OAC/DAE). It uses the AMQP protocol with SDD (Self-Describing Data) binary encoding.

sequenceDiagram
    participant App as Your App
    participant RMQ as RabbitMQ<br>:5672
    participant DMQ as DMQ Server
    participant ACNET as ACNET

    App->>RMQ: AMQP connect
    App->>RMQ: Declare queue + exchange
    App->>RMQ: INIT (GSS token + MIC)
    RMQ->>DMQ: Forward request
    DMQ->>ACNET: Device query

    loop Streaming
        ACNET-->>DMQ: Device data
        DMQ-->>RMQ: Sample replies
        RMQ-->>App: Consume messages
    end

    App->>RMQ: DROP

Characteristics

  • Kerberos required: All operations (reads, writes, streaming) require KerberosAuth
  • GSS-API signing: Messages are signed with MIC for authentication
  • Shared streaming connection: All subscriptions share a single AMQP connection via SelectConnection with multiple channels
  • Connection caching: Write connections are cached per device for performance
  • Heartbeats: Client sends heartbeats every 5 seconds to maintain connections

Usage

import pacsys
from pacsys import KerberosAuth

# All DMQ operations require Kerberos auth
auth = KerberosAuth()

# Read
with pacsys.dmq(auth=auth) as backend:
    value = backend.read("M:OUTTMP")

# Write
with pacsys.dmq(auth=auth) as backend:
    result = backend.write("M:OUTTMP", 72.5)

# Stream
with pacsys.dmq(auth=auth) as backend:
    with backend.subscribe(["M:OUTTMP@p,1000"]) as stream:
        for reading, _ in stream.readings(timeout=30):
            print(reading.value)

Configuration

Parameter   Default            Environment Variable
host        appsrv2.fnal.gov   PACSYS_DMQ_HOST
port        5672               PACSYS_DMQ_PORT
vhost       /                  -
timeout     10.0               -
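
To illustrate how the environment variables relate to the defaults, here is a minimal sketch. The helper `resolve_dmq_endpoint` is hypothetical and not part of pacsys, and the precedence shown (environment variable first, then the documented default) is an assumption about the behavior rather than a description of the actual implementation.

```python
import os

# Defaults copied from the configuration table.
DEFAULT_HOST = "appsrv2.fnal.gov"
DEFAULT_PORT = 5672


def resolve_dmq_endpoint(env=None):
    """Return (host, port), preferring environment variables over defaults.

    Hypothetical helper for illustration only; `env` defaults to os.environ
    and can be replaced with a plain dict when experimenting.
    """
    env = os.environ if env is None else env
    host = env.get("PACSYS_DMQ_HOST", DEFAULT_HOST)
    port = int(env.get("PACSYS_DMQ_PORT", DEFAULT_PORT))
    return host, port
```

Passing an explicit mapping such as `resolve_dmq_endpoint({"PACSYS_DMQ_PORT": "5673"})` makes it easy to check an override without touching the real environment.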

For the Curious: How DMQ Writes Work

A DMQ write involves a 4-step AMQP exchange: channel setup (private exchange + queue), INIT handshake (GSS-signed SettingRequest to amq.topic), value send (the routing key must be S. followed by the exact DRF string from the INIT), and response consumption. Sessions are cached per device; subsequent writes to the same device skip steps 1-2.

Full protocol details

1. Channel Setup

Each write session gets its own AMQP channel with a private "mailbox":

Your client creates:
  - A topic exchange (random UUID name, e.g. "a1b2c3d4-...")
  - A queue bound to that exchange for R.# and Q routing keys

The exchange name is your "return address" - the DMQ server publishes
responses there so only you receive them.
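
The mailbox setup above can be sketched roughly as follows. This is not the pacsys implementation: `setup_mailbox` and `RecordingChannel` are hypothetical names, and the channel is assumed to expose pika-style blocking `exchange_declare`, `queue_declare`, and `queue_bind` methods. Flags such as exclusivity or auto-delete are left out because the text does not specify them.

```python
import uuid
from types import SimpleNamespace


def setup_mailbox(channel):
    """Declare a private topic exchange and a queue bound for R.# and Q.

    `channel` is assumed to behave like a pika blocking channel.
    """
    exchange = str(uuid.uuid4())  # random name, doubles as the return address
    channel.exchange_declare(exchange=exchange, exchange_type="topic")
    # An empty queue name asks the broker to generate one (pika convention).
    queue = channel.queue_declare(queue="").method.queue
    for key in ("R.#", "Q"):
        channel.queue_bind(queue=queue, exchange=exchange, routing_key=key)
    return exchange, queue


class RecordingChannel:
    """Stand-in channel so the sketch runs without a broker."""

    def __init__(self):
        self.calls = []

    def exchange_declare(self, exchange, exchange_type):
        self.calls.append(("exchange_declare", exchange, exchange_type))

    def queue_declare(self, queue):
        self.calls.append(("queue_declare", queue))
        return SimpleNamespace(method=SimpleNamespace(queue="stub-queue"))

    def queue_bind(self, queue, exchange, routing_key):
        self.calls.append(("queue_bind", queue, exchange, routing_key))
```

Running `setup_mailbox(RecordingChannel())` and inspecting `.calls` shows the order of declarations without needing a RabbitMQ instance.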

2. The INIT Handshake

Before sending any data, your client must tell the DMQ server what it wants to do:

Client → amq.topic (routing key "I"):
  Body:    SettingRequest { dataRequest: ["Z:ACLTST.SETTING@N"] }
  Headers: gss-token (Kerberos authentication)
           signature (MIC of body + message properties)
           host-address (your IP)
  Props:   reply_to = "a1b2c3d4-..."  ← your exchange name

The DMQ server processes this INIT and registers a "setter" keyed by the exact string "Z:ACLTST.SETTING@N". This string matching is critical later.
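
As a rough illustration of how the pieces of the INIT message fit together, the sketch below assembles them into one record. `OutgoingMessage` and `build_init` are hypothetical names; the body is treated as opaque, already SDD-encoded bytes, and the token and signature are passed in precomputed because their exact header encoding is not described here.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class OutgoingMessage:
    exchange: str
    routing_key: str
    body: bytes
    headers: dict
    reply_to: str


def build_init(body, gss_token, signature, host_address, reply_exchange):
    """Assemble the INIT message described above (illustrative only)."""
    return OutgoingMessage(
        exchange="amq.topic",   # INIT goes to the shared topic exchange
        routing_key="I",        # "I" marks an INIT request
        body=body,              # SDD-encoded SettingRequest (opaque here)
        headers={
            "gss-token": gss_token,
            "signature": signature,
            "host-address": host_address,
        },
        reply_to=reply_exchange,  # private exchange acts as the return address
    )
```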

3. Sending the Value

After a brief delay for INIT processing, the client sends the actual value:

Client → your exchange (routing key "S.Z:ACLTST.SETTING@N"):
  Body:    DoubleSample { value: 45.0, time: <now> }
  Headers: signature (MIC of body + messageId + correlationId + hostAddress)
           host-address (your IP)
  Props:   message_id, correlation_id ← for matching the response

The routing key must be S. followed by the exact DRF string used in the INIT. The server extracts everything after S. and looks it up in its setter registry. A mismatch means "invalid request".
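
The exact-match rule is easy to get wrong, so here is a small model of it. `setting_routing_key` and `lookup_setter` are hypothetical helpers; the dictionary merely imitates the server's setter registry as described above and is not the server's actual code.

```python
SETTER_PREFIX = "S."


def setting_routing_key(drf):
    """Build the routing key for a value send from the INIT DRF string."""
    return SETTER_PREFIX + drf


def lookup_setter(registry, routing_key):
    """Model of the server-side lookup: strip 'S.' and match exactly.

    Returns None on any mismatch, which corresponds to "invalid request".
    """
    if not routing_key.startswith(SETTER_PREFIX):
        return None
    return registry.get(routing_key[len(SETTER_PREFIX):])
```

With a registry of `{"Z:ACLTST.SETTING@N": setter}`, a key built from `"Z:ACLTST.SETTING"` (missing the `@N` event) finds nothing, so reusing the very same string object for both INIT and send is the safest approach.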

4. The Response

The DMQ server sends back one R-keyed response on your exchange:

R.Z:ACLTST.SETTING@N  →  DoubleSample { value: 45.0 }    ← actual result
                     (or)  ErrorSample { error: -98 }      ← if something went wrong

The R-keyed reply is the authoritative result from ACNET. Note that the DMQ server sends correlationId="" (empty) on write responses, so the client uses FIFO ordering to match responses to pending writes.
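
Because the correlation ID is empty, matching has to rely on arrival order. A minimal sketch of that FIFO bookkeeping follows; `PendingWrites` is a hypothetical class, and it assumes responses within one session arrive in the same order the writes were sent.

```python
from collections import deque


class PendingWrites:
    """Match responses to writes by arrival order (illustrative only)."""

    def __init__(self):
        self._queue = deque()

    def sent(self, message_id):
        # Remember each write in the order it was published.
        self._queue.append(message_id)

    def resolve(self, response):
        # The oldest outstanding write owns the next response.
        if not self._queue:
            raise LookupError("response arrived with no pending write")
        return self._queue.popleft(), response
```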

What Can Go Wrong

Error                    Code   Cause
DMQ_INVALID_REQUEST      -98    Routing key doesn't match INIT dataRequest (exact string match!)
DMQ_SECURITY_VIOLATION   -99    MIC signature verification failed (wrong sign format)
DMQ_PENDING              1      Not an error - INIT still processing, wait for final status

For writes, the server sends PENDING only after full job creation (InitTask.run), which includes ACNET backend setup. This can take up to 5s (CLIENT_INIT_RATE). For reads, PENDING is sent immediately.
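
One way a client might treat these status codes is sketched below. `DmqStatus`, `is_final`, and `describe` are hypothetical names; only the three codes listed in the table are modeled, and any other value is reported as unknown rather than guessed at.

```python
from enum import IntEnum


class DmqStatus(IntEnum):
    INVALID_REQUEST = -98
    SECURITY_VIOLATION = -99
    PENDING = 1


def is_final(code):
    """PENDING means keep waiting; every other status ends the wait."""
    return code != DmqStatus.PENDING


def describe(code):
    """Return a readable name for a known code, or mark it as unknown."""
    try:
        return DmqStatus(code).name
    except ValueError:
        return f"UNKNOWN({code})"
```

Since PENDING for a write can take up to 5s to arrive, any wait loop built on `is_final` should use a timeout comfortably above that.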

The signing format must match Java's GSSUtil.createBody: the MIC covers not just the binary body but also messageId, correlationId, replyTo, appId, and hostAddress, separated by null bytes. Getting this wrong is a silent authentication failure.
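
The shape of the signed payload might look like the sketch below. This is an assumption-heavy illustration: `build_signed_payload` is a hypothetical helper, the field order simply follows the order listed above, and the UTF-8 encoding and use of empty strings for absent properties are guesses. The authoritative layout is whatever Java's GSSUtil.createBody produces, so verify against it before relying on this.

```python
def build_signed_payload(body, message_id="", correlation_id="",
                         reply_to="", app_id="", host_address=""):
    """Join the body and message properties with null bytes.

    Assumed order: body, messageId, correlationId, replyTo, appId,
    hostAddress. The result is the byte string a GSS-API MIC would be
    computed over; computing the MIC itself is out of scope here.
    """
    fields = [message_id, correlation_id, reply_to, app_id, host_address]
    return b"\x00".join([body] + [f.encode("utf-8") for f in fields])
```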

Session Reuse

Creating a write session (channel + exchange + INIT) takes time, so pacsys caches sessions per device and maintains a pool of pre-warmed "standby" channels. Subsequent writes to the same device reuse the existing session, skipping steps 1-2 entirely. Sessions are cleaned up after 10 minutes of inactivity.
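
The caching idea can be sketched as a small idle-timeout cache. `SessionCache` is a hypothetical class, not the pacsys one; it models only per-device reuse and the 10-minute idle cleanup, and leaves out the standby channel pool and the actual closing of AMQP resources.

```python
import time

SESSION_IDLE_SECONDS = 600  # 10 minutes of inactivity, per the text above


class SessionCache:
    """Per-device session cache with idle eviction (illustrative only)."""

    def __init__(self, factory, clock=time.monotonic):
        self._factory = factory  # called once per device to build a session
        self._clock = clock
        self._sessions = {}      # device -> (session, last_used)

    def get(self, device):
        entry = self._sessions.get(device)
        session = entry[0] if entry else self._factory(device)
        # Every use refreshes the idle timer.
        self._sessions[device] = (session, self._clock())
        return session

    def evict_idle(self):
        """Drop sessions idle for the full timeout; return their devices."""
        now = self._clock()
        stale = [d for d, (_, used) in self._sessions.items()
                 if now - used >= SESSION_IDLE_SECONDS]
        for device in stale:
            del self._sessions[device]
        return stale
```

Injecting the clock keeps the eviction logic testable without waiting ten real minutes.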

When to Use

  • When you have Kerberos credentials and need full read/write/stream
  • As an alternative to DPM/HTTP when talking to ACNET via RabbitMQ infrastructure
  • When the DMQ server is the preferred access point for your environment