nixpkgs/nixos/lib/test-driver/test_driver/qmp.py
Jörg Thalheim ef9502a009 nixos/test-driver: fix resource cleanup of vlan/qmp objects
Using __del__ is somewhat unsound resource cleanup in our clase the
logger already closed its logfile and therefor fails with exception
before the rest of the resources can be cleaned up.
2024-10-16 19:46:38 +03:00

99 lines
3.5 KiB
Python

import json
import logging
import os
import socket
from collections.abc import Iterator
from pathlib import Path
from queue import Queue
from typing import Any
logger = logging.getLogger(__name__)
class QMPAPIError(RuntimeError):
def __init__(self, message: dict[str, Any]):
assert "error" in message, "Not an error message!"
try:
self.class_name = message["class"]
self.description = message["desc"]
# NOTE: Some errors can occur before the Server is able to read the
# id member; in these cases the id member will not be part of the
# error response, even if provided by the client.
self.transaction_id = message.get("id")
except KeyError:
raise RuntimeError("Malformed QMP API error response")
def __str__(self) -> str:
return f"<QMP API error related to transaction {self.transaction_id} [{self.class_name}]: {self.description}>"
class QMPSession:
def __init__(self, sock: socket.socket) -> None:
self.sock = sock
self.results: Queue[dict[str, str]] = Queue()
self.pending_events: Queue[dict[str, Any]] = Queue()
self.reader = sock.makefile("r")
self.writer = sock.makefile("w")
# Make the reader non-blocking so we can kind of select on it.
os.set_blocking(self.reader.fileno(), False)
hello = self._wait_for_new_result()
logger.debug(f"Got greeting from QMP API: {hello}")
# The greeting message format is:
# { "QMP": { "version": json-object, "capabilities": json-array } }
assert "QMP" in hello, f"Unexpected result: {hello}"
self.send("qmp_capabilities")
@classmethod
def from_path(cls, path: Path) -> "QMPSession":
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.connect(str(path))
return cls(sock)
def close(self) -> None:
self.sock.close()
def _wait_for_new_result(self) -> dict[str, str]:
assert self.results.empty(), "Results set is not empty, missed results!"
while self.results.empty():
self.read_pending_messages()
return self.results.get()
def read_pending_messages(self) -> None:
line = self.reader.readline()
if not line:
return
evt_or_result = json.loads(line)
logger.debug(f"Received a message: {evt_or_result}")
# It's a result
if "return" in evt_or_result or "QMP" in evt_or_result:
self.results.put(evt_or_result)
# It's an event
elif "event" in evt_or_result:
self.pending_events.put(evt_or_result)
else:
raise QMPAPIError(evt_or_result)
def wait_for_event(self, timeout: int = 10) -> dict[str, Any]:
while self.pending_events.empty():
self.read_pending_messages()
return self.pending_events.get(timeout=timeout)
def events(self, timeout: int = 10) -> Iterator[dict[str, Any]]:
while not self.pending_events.empty():
yield self.pending_events.get(timeout=timeout)
def send(self, cmd: str, args: dict[str, str] = {}) -> dict[str, str]:
self.read_pending_messages()
assert self.results.empty(), "Results set is not empty, missed results!"
data: dict[str, Any] = dict(execute=cmd)
if args != {}:
data["arguments"] = args
logger.debug(f"Sending {data} to QMP...")
json.dump(data, self.writer)
self.writer.write("\n")
self.writer.flush()
return self._wait_for_new_result()