"""Layer 1 (Read) — direct DCGM backend (``dcgmi dmon``).
Streams hardware counters from a local ``dcgmi dmon`` subprocess and emits one
``RawRecord`` per GPU per sampling tick, keyed by DCGM field names (the source's
own vocabulary). An ``N/A`` reading becomes ``None``; it is never coerced to
``0``. Assigning meaning to these fields (canonical names, units, scaling) is
Layer 2's job, not this module's.
Entry points:
* ``DcgmiBackend`` — a persistent ``dcgmi dmon -c 0`` stream for live monitoring.
* ``read_once`` — a single synchronous ``dcgmi dmon -c 2`` collection, for
one-shot queries where a long-lived stream is unnecessary.
* ``parse_dmon_block`` — pure text -> ``RawRecord`` parser (the testable core).
* ``resolve_dcgm_gpu_ids`` — map process-visible GPUs to physical dcgmi IDs.
"""
from __future__ import annotations
import math
import os
import re
import subprocess
import time
from typing import Dict, Iterator, List, Optional, Tuple
from .base import (
Backend,
BackendCaps,
BackendKind,
DcgmStreamError,
RawRecord,
ReaderConfig,
)
# DCGM field id -> field name. Order is significant: it fixes the column
# positions in ``dcgmi dmon`` output, which the parser reads positionally.
# The trailing comment on each entry notes the unit of the raw reading; this
# module passes the values through without scaling or conversion.
DCGM_DMON_FIELDS: Tuple[Tuple[int, str], ...] = (
# Device-level metrics
(100, "DCGM_FI_DEV_SM_CLOCK"), # MHz
(101, "DCGM_FI_DEV_MEM_CLOCK"), # MHz
(140, "DCGM_FI_DEV_MEMORY_TEMP"), # Celsius
(150, "DCGM_FI_DEV_GPU_TEMP"), # Celsius
(155, "DCGM_FI_DEV_POWER_USAGE"), # Watts
(156, "DCGM_FI_DEV_TOTAL_ENERGY_CONSUMPTION"), # millijoules (counter)
(202, "DCGM_FI_DEV_PCIE_REPLAY_COUNTER"), # counter
(203, "DCGM_FI_DEV_GPU_UTIL"), # 0-100%
(204, "DCGM_FI_DEV_MEM_COPY_UTIL"), # 0-100%
(251, "DCGM_FI_DEV_FB_FREE"), # MiB
(252, "DCGM_FI_DEV_FB_USED"), # MiB
(253, "DCGM_FI_DEV_FB_RESERVED"), # MiB
(449, "DCGM_FI_DEV_NVLINK_BANDWIDTH_TOTAL"), # MB/s (gauge)
# Profiling metrics (ratio 0-1)
(1001, "DCGM_FI_PROF_GR_ENGINE_ACTIVE"),
(1002, "DCGM_FI_PROF_SM_ACTIVE"),
(1003, "DCGM_FI_PROF_SM_OCCUPANCY"),
(1004, "DCGM_FI_PROF_PIPE_TENSOR_ACTIVE"),
(1005, "DCGM_FI_PROF_DRAM_ACTIVE"),
(1006, "DCGM_FI_PROF_PIPE_FP64_ACTIVE"),
(1007, "DCGM_FI_PROF_PIPE_FP32_ACTIVE"),
(1008, "DCGM_FI_PROF_PIPE_FP16_ACTIVE"),
(1009, "DCGM_FI_PROF_PCIE_TX_BYTES"), # bytes/sec
(1010, "DCGM_FI_PROF_PCIE_RX_BYTES"), # bytes/sec
)
# Comma-separated field ids in table order; this is the value handed to
# ``dcgmi dmon -e`` by both the one-shot and the streaming command lines.
DCGM_DMON_FIELD_IDS = ",".join(str(fid) for fid, _ in DCGM_DMON_FIELDS)
# Field names in table order; the parser assigns the i-th value column of a
# ``GPU <id>`` row to the i-th name in this list.
DCGM_DMON_METRIC_NAMES = [name for _, name in DCGM_DMON_FIELDS]
# DCGM profiling counters (DCGM_FI_PROF_*) refresh at ~10 Hz through the shared
# hardware-counter multiplexer. Below ~100 ms, ``dcgmi dmon`` returns mostly
# N/A profiling rows, so the streaming interval is clamped to this floor.
# The clamp is applied in ``DcgmiBackend.open``; ``run_dmon_once`` passes its
# ``interval_ms`` argument through unchanged.
DCGM_STREAM_MIN_INTERVAL_MS = 100
def resolve_dcgm_gpu_ids(discovery_stdout: str) -> Tuple[List[str], Dict[str, str]]:
    """Map the GPUs visible to this process onto dcgmi's physical GPU IDs.

    Inside a SLURM cgroup, ``CUDA_VISIBLE_DEVICES`` is remapped to ``0``, but
    ``dcgmi`` operates outside the cgroup and uses physical GPU indices. The
    two numbering schemes are reconciled through the GPU UUID: ``nvidia-smi``
    supplies ``local index -> UUID`` and the discovery listing supplies
    ``physical id -> UUID``.

    Args:
        discovery_stdout: stdout from ``dcgmi discovery -l``.

    Returns:
        ``(physical_ids, physical_to_local_map)``. ``physical_to_local_map``
        maps each physical GPU ID to its local (cgroup) GPU ID, so that exports
        can match dcgmi GPU IDs against ``nvidia-smi`` process IDs.

        ``([], {})`` is returned when ``nvidia-smi`` is missing, times out,
        exits non-zero, or lists no GPUs. When ``nvidia-smi`` lists GPUs but
        none of their UUIDs appear in ``discovery_stdout``, the local indices
        are returned with an identity mapping.
    """
    query = ["nvidia-smi", "--query-gpu=index,uuid", "--format=csv,noheader"]
    try:
        listing = subprocess.run(query, capture_output=True, text=True, timeout=5)
    except (FileNotFoundError, subprocess.TimeoutExpired, OSError):
        return [], {}
    if listing.returncode != 0:
        return [], {}

    # Each nvidia-smi row is "<index>, <uuid>"; rows without a comma are ignored.
    index_by_uuid: Dict[str, str] = {}
    for row in listing.stdout.strip().splitlines():
        index, comma, uuid = row.partition(",")
        if comma:
            index_by_uuid[uuid.strip()] = index.strip()
    if not index_by_uuid:
        return [], {}

    # Discovery layout: a "| <id> | Name: ..." row opens a GPU entry and a
    # later "Device UUID: GPU-xxxx" row (possibly the same row) identifies it.
    id_row = re.compile(r"^\|\s*(\d+)\s*\|")
    uuid_cell = re.compile(r"UUID:\s*(GPU-[0-9a-fA-F-]+)")

    matched_ids: List[str] = []
    local_of: Dict[str, str] = {}
    pending_id: Optional[str] = None
    for row in discovery_stdout.splitlines():
        id_hit = id_row.search(row)
        if id_hit is not None:
            pending_id = id_hit.group(1)

        uuid_hit = uuid_cell.search(row)
        if uuid_hit is None or pending_id is None:
            continue

        local_index = index_by_uuid.get(uuid_hit.group(1))
        if local_index is not None:
            matched_ids.append(pending_id)
            local_of[pending_id] = local_index
        # One UUID per entry: forget the id until the next id row appears.
        pending_id = None

    if matched_ids:
        return matched_ids, local_of

    # No UUID overlap with the discovery listing: fall back to local indices.
    local_ids = list(index_by_uuid.values())
    return local_ids, dict(zip(local_ids, local_ids))
def _parse_value(token: str) -> Optional[float]:
"""Deserialize one dcgmi token to a float, or ``None`` if not a usable value.
``"N/A"``, non-numeric tokens, and non-finite values all map to ``None`` — a
reading the source could not provide, never silently coerced to ``0``.
"""
if token == "N/A":
return None
try:
value = float(token)
except ValueError:
return None
if math.isinf(value) or math.isnan(value):
return None
return value
def parse_dmon_block(
    text: str,
    *,
    source_version: str = "dcgmi",
    timestamp: Optional[float] = None,
    wallclock: Optional[float] = None,
) -> List[RawRecord]:
    """Turn ``dcgmi dmon`` text into ``RawRecord`` objects, one per ``GPU <id>`` row.

    Only rows whose first token is ``GPU`` carry data; the ``#`` header, the
    ``ID`` line, blank lines and anything else are ignored. Value columns are
    matched positionally against ``DCGM_DMON_METRIC_NAMES``: a short row yields
    a record with only the leading fields, and surplus columns are ignored.
    ``N/A`` cells become ``None``.

    All records from one call share the same ``timestamp`` / ``wallclock``
    because they are treated as a single sampling tick.

    Layout (value columns follow ``DCGM_DMON_FIELDS`` order)::

        #Entity   <col 1>  <col 2>  ...
        ID
        GPU 0     <val 1>  <val 2>  ...

    Args:
        text: raw ``dcgmi dmon`` output (any number of lines).
        source_version: stored verbatim on every record.
        timestamp: monotonic timestamp for the tick; defaults to
            ``time.monotonic()`` at call time.
        wallclock: wall-clock timestamp for the tick; defaults to
            ``time.time()`` at call time.

    Returns:
        The parsed records, in input row order.
    """
    tick_monotonic = timestamp if timestamp is not None else time.monotonic()
    tick_wallclock = wallclock if wallclock is not None else time.time()

    parsed: List[RawRecord] = []
    for raw_row in text.splitlines():
        tokens = raw_row.split()
        # Header ("#..."), "ID", blank and stray lines all fail this test.
        if len(tokens) < 2 or tokens[0] != "GPU":
            continue

        # tokens[0] is "GPU", tokens[1] the id; values start at tokens[2].
        # zip() stops at the shorter side, so truncated rows stay partial.
        readings: Dict[str, Optional[float]] = {
            metric_name: _parse_value(cell)
            for metric_name, cell in zip(DCGM_DMON_METRIC_NAMES, tokens[2:])
        }
        parsed.append(
            RawRecord(
                timestamp=tick_monotonic,
                wallclock=tick_wallclock,
                entity_id=tokens[1],
                fields=readings,
                source="dcgmi",
                source_version=source_version,
            )
        )
    return parsed
def run_dmon_once(
    gpu_ids: Optional[List[str]] = None,
    interval_ms: int = 100,
    timeout: float = 15.0,
) -> str:
    """Run a single ``dcgmi dmon -c 2`` collection and return its stdout text.

    Two samples are requested because profiling fields (1001-1010) return
    ``N/A`` on the first sample of a cold invocation; the valid second tick lets
    a downstream last-non-``None``-wins merge recover the real values.

    Args:
        gpu_ids: GPU ids to query (passed as ``-i gpu:<id>,...``); all GPUs
            when empty or ``None``.
        interval_ms: delay between the two samples, passed to ``-d``.
        timeout: seconds to wait for the subprocess before
            ``subprocess.run`` raises ``subprocess.TimeoutExpired``.

    Raises:
        DcgmStreamError: if ``dcgmi`` exits with a non-zero status.
    """
    argv = [
        "dcgmi", "dmon",
        "-c", "2",
        "-d", str(interval_ms),
        "-e", DCGM_DMON_FIELD_IDS,
    ]
    if gpu_ids:
        selector = ",".join(f"gpu:{gid}" for gid in gpu_ids)
        argv += ["-i", selector]

    completed = subprocess.run(argv, capture_output=True, text=True, timeout=timeout)
    if completed.returncode == 0:
        return completed.stdout

    raise DcgmStreamError(
        f"dcgmi dmon failed (exit {completed.returncode}): {completed.stderr.strip()}",
        remediation="Check that the DCGM host engine is running and that "
                    "profiling is permitted for this user.",
    )
def read_once(
    gpu_ids: Optional[List[str]] = None,
    interval_ms: int = 100,
    timeout: float = 15.0,
) -> List[RawRecord]:
    """Run one ``dcgmi dmon`` collection and parse it into ``RawRecord`` objects.

    The result contains the records of both requested samples, in output
    order and sharing one timestamp; the caller's last-non-``None``-wins merge
    keeps the valid second-tick values.

    Arguments are forwarded unchanged to ``run_dmon_once``.
    """
    raw_output = run_dmon_once(gpu_ids, interval_ms, timeout)
    return parse_dmon_block(raw_output)
class DcgmiBackend:
    """Persistent ``dcgmi dmon -c 0`` stream emitting ``RawRecord`` objects.

    The reader is thread-free: ``stream_ticks`` is a blocking generator that
    iterates the subprocess stdout, groups rows into ticks, and yields one list
    of records per tick. ``stream`` flattens that to the per-record ``Backend``
    contract. The first tick is dropped (profiling fields are ``N/A`` on a cold
    start). Concurrency, if needed, is the caller's responsibility.

    The subprocess stderr is piped but this class only reads it after a
    non-zero exit, to build the error message. A consumer that wants it
    earlier can drain it through the ``stderr`` property.
    """

    # Upper bound, in seconds, on any single wait for the subprocess to exit.
    _EXIT_WAIT_SECONDS = 2.0

    def __init__(self) -> None:
        """Create an idle backend; no subprocess is started until ``open``."""
        self._proc: Optional[subprocess.Popen] = None
        self._gpu_ids: Optional[List[str]] = None
        self._poll_ms: int = DCGM_STREAM_MIN_INTERVAL_MS
        self._source_version: str = "dcgmi"
        self._closed: bool = False

    # ── lifecycle ────────────────────────────────────────────────────────

    def open(self, config: ReaderConfig) -> None:
        """Start the ``dcgmi dmon`` subprocess described by ``config``.

        Reads ``config.gpu_ids`` (optional GPU selection) and
        ``config.poll_seconds`` (sampling period, clamped up to
        ``DCGM_STREAM_MIN_INTERVAL_MS``). If a previously opened stream is
        still running it is terminated first, so re-opening never orphans a
        ``dcgmi`` process.

        Raises:
            DcgmStreamError: if the ``dcgmi`` executable cannot be found.
        """
        # The child runs in its own session with ``-c 0`` (no sample limit),
        # so a forgotten one would keep running; shut it down before replacing.
        previous = self._proc
        if previous is not None and previous.poll() is None:
            self.close()

        self._gpu_ids = list(config.gpu_ids) if config.gpu_ids else None
        self._poll_ms = max(
            DCGM_STREAM_MIN_INTERVAL_MS, int(round(config.poll_seconds * 1000))
        )
        self._closed = False

        cmd = ["dcgmi", "dmon", "-c", "0", "-d", str(self._poll_ms),
               "-e", DCGM_DMON_FIELD_IDS]
        if self._gpu_ids:
            cmd.extend(["-i", ",".join(f"gpu:{gid}" for gid in self._gpu_ids)])

        # Dropping CUDA_VISIBLE_DEVICES suppresses dcgmi's multi-line stdout
        # warning preamble; dcgmi targets the host engine, not CUDA, so the
        # variable has no effect on which GPUs it reports.
        env = {k: v for k, v in os.environ.items() if k != "CUDA_VISIBLE_DEVICES"}

        try:
            self._proc = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                bufsize=1,  # line-buffered text stream
                start_new_session=True,
                env=env,
            )
        except FileNotFoundError as exc:
            raise DcgmStreamError(
                f"dcgmi not found: {exc}",
                remediation="Install NVIDIA DCGM, or use the prometheus backend.",
            ) from exc

    def close(self) -> None:
        """Terminate the subprocess. Safe to call after a failed ``open``.

        Sends a terminate request, waits up to ``_EXIT_WAIT_SECONDS`` for the
        process to exit, and escalates to a kill if it does not. The killed
        process is then reaped (again with a bounded wait) so it does not
        linger as a zombie.
        """
        self._closed = True
        proc = self._proc
        if proc is None or proc.poll() is not None:
            return

        try:
            proc.terminate()
        except ProcessLookupError:
            pass  # already gone
        try:
            proc.wait(timeout=self._EXIT_WAIT_SECONDS)
            return
        except subprocess.TimeoutExpired:
            pass  # did not exit in time; escalate below

        try:
            proc.kill()
        except ProcessLookupError:
            pass
        try:
            # Collect the exit status of the killed child; bounded so that
            # close() itself can never block indefinitely.
            proc.wait(timeout=self._EXIT_WAIT_SECONDS)
        except subprocess.TimeoutExpired:
            pass

    # ── streaming ────────────────────────────────────────────────────────

    def stream_ticks(self) -> Iterator[List[RawRecord]]:
        """Yield one list of ``RawRecord`` objects per sampling tick.

        A tick ends when a ``GPU <id>`` row repeats an id already buffered for
        the current tick. The first tick is dropped. Rows still buffered when
        the stream ends are not emitted, since no boundary row confirmed them.
        Yields nothing if the backend was never opened.

        Raises:
            DcgmStreamError: if the subprocess exits non-zero (unless ``close``
                was called).
        """
        proc = self._proc
        if proc is None or proc.stdout is None:
            return

        current: Dict[str, str] = {}  # gpu_id -> most-recent row (this tick)
        order: List[str] = []         # gpu_id order within the tick
        skipped_first = False

        for raw_line in proc.stdout:
            if self._closed:
                break
            stripped = raw_line.strip()
            if not stripped or stripped.startswith(("#", "ID")):
                continue
            parts = stripped.split()
            if len(parts) < 2 or parts[0] != "GPU":
                continue
            gpu_id = parts[1]
            if gpu_id in current:
                # Boundary: this id already has a row -> flush the tick.
                if skipped_first:
                    yield self._make_tick(current, order)
                else:
                    skipped_first = True  # drop the cold-start tick
                current = {}
                order = []
            current[gpu_id] = stripped
            order.append(gpu_id)

        if self._closed:
            return

        # stdout closed -> the subprocess is exiting. EOF on the pipe can be
        # observed before the exit status is available, so wait (bounded) for
        # it instead of polling once and possibly missing a failure.
        try:
            rc = proc.wait(timeout=self._EXIT_WAIT_SECONDS)
        except subprocess.TimeoutExpired:
            return  # stdout closed but the process is still alive
        if rc != 0 and not self._closed:
            raise DcgmStreamError(
                f"dcgmi dmon exited with code {rc}: {self._read_stderr_tail()}",
                remediation="Check the DCGM host engine and profiling permissions.",
            )

    def stream(self) -> Iterator[RawRecord]:
        """Yield records one at a time, flattening ``stream_ticks``."""
        for tick in self.stream_ticks():
            yield from tick

    def _make_tick(self, current: Dict[str, str], order: List[str]) -> List[RawRecord]:
        """Parse the buffered rows of one tick, stamped with the current time."""
        block = "\n".join(current[gid] for gid in order)
        return parse_dmon_block(
            block,
            source_version=self._source_version,
            timestamp=time.monotonic(),
            wallclock=time.time(),
        )

    def _read_stderr_tail(self) -> str:
        """Return whatever remains on the subprocess stderr, best effort."""
        proc = self._proc
        if proc is None or proc.stderr is None:
            return ""
        try:
            return (proc.stderr.read() or "").strip()
        except (OSError, ValueError):
            # Closed pipe, read failure or undecodable bytes: the message is
            # only diagnostic, so fall back to an empty tail.
            return ""

    # ── introspection ──────────────────────────────────────────────────────

    @property
    def stderr(self):
        """The subprocess stderr stream (for a consumer that drains it)."""
        return self._proc.stderr if self._proc is not None else None

    @property
    def caps(self) -> BackendCaps:
        """Backend capabilities: kind ``DCGMI`` and the DCGM field names it emits."""
        return BackendCaps(
            kind=BackendKind.DCGMI,
            fields=frozenset(DCGM_DMON_METRIC_NAMES),
        )
# Fail fast at import time if the class drifts from the Backend contract.
# Constructing the backend here is cheap: ``__init__`` only sets attributes
# and no subprocess is started until ``open`` is called.
# NOTE(review): ``assert`` statements are skipped under ``python -O``, so this
# check does not run in optimized mode. Presumably ``Backend`` supports
# ``isinstance`` checks (e.g. a runtime-checkable Protocol) — confirm in
# ``.base``.
assert isinstance(DcgmiBackend(), Backend)