diff --git a/HISTORY.rst b/HISTORY.rst index ee9abd4..91fc5d5 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -6,6 +6,7 @@ Unreleased ---------- - Minimum supported python is now 3.11. - Switch python packaging backend back to setuptools. (`#263 `_) +- ``zocalo.dlq_purge``: Show which queues DLQ messages came from, and accept queue names with prefix. (`#264 `_) 1.2.0 (2024-11-14) ------------------ diff --git a/src/zocalo/cli/dlq_purge.py b/src/zocalo/cli/dlq_purge.py index 38457fa..5cb6152 100644 --- a/src/zocalo/cli/dlq_purge.py +++ b/src/zocalo/cli/dlq_purge.py @@ -15,6 +15,7 @@ import time from datetime import datetime from functools import partial +from typing import Literal import workflows @@ -60,15 +61,17 @@ def run() -> None: args = parser.parse_args(["--stomp-prfx=DLQ"] + sys.argv[1:]) if args.transport == "PikaTransport": - queues = ["dlq." + a for a in args.queues] + queues = ["dlq." + a.removeprefix("dlq.") for a in args.queues] else: queues = args.queues transport = workflows.transport.lookup(args.transport)() characterfilter = re.compile(r"[^a-zA-Z0-9._-]+", re.UNICODE) - idlequeue: queue.Queue = queue.Queue() + idlequeue: queue.Queue[Literal["start", "done"] | tuple[str, str]] = queue.Queue() - def receive_dlq_message(header: dict, message: dict, rabbitmq=False) -> None: + def receive_dlq_message( + header: dict, message: dict, *, queue_name: str, rabbitmq=False + ) -> None: idlequeue.put_nowait("start") if rabbitmq: msg_time = int(datetime.timestamp(header["x-death"][0]["time"])) * 1000 @@ -99,8 +102,11 @@ def receive_dlq_message(header: dict, message: dict, rabbitmq=False) -> None: with filename.open("w") as fh: json.dump(dlqmsg, fh, indent=2, sort_keys=True) - print( - f"Message {header['message-id']} ({time.strftime('%Y-%m-%d %H:%M:%S', timestamp)}) exported:\n {filename}" + idlequeue.put_nowait( + ( + queue_name, + f" Message {header['message-id']} ({time.strftime('%Y-%m-%d %H:%M:%S', timestamp)}) exported:\n {filename}", + ) ) transport.ack(header) idlequeue.put_nowait("done") @@ -112,17 +118,32 @@ def receive_dlq_message(header: dict, message: dict, rabbitmq=False) -> None: elif args.transport == "PikaTransport": rmq = RabbitMQAPI.from_zocalo_configuration(zc) queues = [q.name for q in rmq.queues() if q.name.startswith("dlq.")] + print(f"Looking for DLQ messages in {len(queues)} queues...") for queue_ in queues: - print("Looking for DLQ messages in " + queue_) transport.subscribe( queue_, - partial(receive_dlq_message, rabbitmq=args.transport == "PikaTransport"), + partial( + receive_dlq_message, + rabbitmq=args.transport == "PikaTransport", + queue_name=queue_, + ), acknowledgement=True, ) + messages: dict[str, list[str]] = {} try: idlequeue.get(True, args.wait or 3) while True: - idlequeue.get(True, args.wait or 0.1) + result = idlequeue.get(True, args.wait or 0.1) + if isinstance(result, tuple): + queuename, message = result + messages.setdefault(queuename, []).append(message) + except queue.Empty: + # Print out what we found, per queue + for queuename, q_messages in messages.items(): + print(f"Found {len(q_messages)} DLQ messages in {queuename}") + for message in q_messages: + print(message) + print("Done.") transport.disconnect()