From 0ef3e566fce7c6543ee75afdb569ef8dccac5003 Mon Sep 17 00:00:00 2001 From: Nicholas Devenish Date: Mon, 16 Jun 2025 13:06:50 +0100 Subject: [PATCH 1/3] dlq_purge: Strip dlq. prefix from rabbitmq queues --- src/zocalo/cli/dlq_purge.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/zocalo/cli/dlq_purge.py b/src/zocalo/cli/dlq_purge.py index 38457fa0..440f683b 100644 --- a/src/zocalo/cli/dlq_purge.py +++ b/src/zocalo/cli/dlq_purge.py @@ -60,7 +60,7 @@ def run() -> None: args = parser.parse_args(["--stomp-prfx=DLQ"] + sys.argv[1:]) if args.transport == "PikaTransport": - queues = ["dlq." + a for a in args.queues] + queues = ["dlq." + a.removeprefix("dlq.") for a in args.queues] else: queues = args.queues transport = workflows.transport.lookup(args.transport)() From 4f9ab32c64a1aee319bd72930192f3a7daceca97 Mon Sep 17 00:00:00 2001 From: Nicholas Devenish Date: Mon, 16 Jun 2025 14:24:56 +0100 Subject: [PATCH 2/3] dlq_purge: Show what queues messages came from --- src/zocalo/cli/dlq_purge.py | 35 ++++++++++++++++++++++++++++------- 1 file changed, 28 insertions(+), 7 deletions(-) diff --git a/src/zocalo/cli/dlq_purge.py b/src/zocalo/cli/dlq_purge.py index 440f683b..5cb61525 100644 --- a/src/zocalo/cli/dlq_purge.py +++ b/src/zocalo/cli/dlq_purge.py @@ -15,6 +15,7 @@ import time from datetime import datetime from functools import partial +from typing import Literal import workflows @@ -66,9 +67,11 @@ def run() -> None: transport = workflows.transport.lookup(args.transport)() characterfilter = re.compile(r"[^a-zA-Z0-9._-]+", re.UNICODE) - idlequeue: queue.Queue = queue.Queue() + idlequeue: queue.Queue[Literal["start", "done"] | tuple[str, str]] = queue.Queue() - def receive_dlq_message(header: dict, message: dict, rabbitmq=False) -> None: + def receive_dlq_message( + header: dict, message: dict, *, queue_name: str, rabbitmq=False + ) -> None: idlequeue.put_nowait("start") if rabbitmq: msg_time = int(datetime.timestamp(header["x-death"][0]["time"])) * 1000 @@ -99,8 +102,11 @@ def receive_dlq_message(header: dict, message: dict, rabbitmq=False) -> None: with filename.open("w") as fh: json.dump(dlqmsg, fh, indent=2, sort_keys=True) - print( - f"Message {header['message-id']} ({time.strftime('%Y-%m-%d %H:%M:%S', timestamp)}) exported:\n {filename}" + idlequeue.put_nowait( + ( + queue_name, + f" Message {header['message-id']} ({time.strftime('%Y-%m-%d %H:%M:%S', timestamp)}) exported:\n {filename}", + ) ) transport.ack(header) idlequeue.put_nowait("done") @@ -112,17 +118,32 @@ def receive_dlq_message(header: dict, message: dict, rabbitmq=False) -> None: elif args.transport == "PikaTransport": rmq = RabbitMQAPI.from_zocalo_configuration(zc) queues = [q.name for q in rmq.queues() if q.name.startswith("dlq.")] + print(f"Looking for DLQ messages in {len(queues)} queues...") for queue_ in queues: - print("Looking for DLQ messages in " + queue_) transport.subscribe( queue_, - partial(receive_dlq_message, rabbitmq=args.transport == "PikaTransport"), + partial( + receive_dlq_message, + rabbitmq=args.transport == "PikaTransport", + queue_name=queue_, + ), acknowledgement=True, ) + messages: dict[str, list[str]] = {} try: idlequeue.get(True, args.wait or 3) while True: - idlequeue.get(True, args.wait or 0.1) + result = idlequeue.get(True, args.wait or 0.1) + if isinstance(result, tuple): + queuename, message = result + messages.setdefault(queuename, []).append(message) + except queue.Empty: + # Print out what we found, per queue + for queuename, q_messages in messages.items(): + print(f"Found {len(q_messages)} DLQ messages in {queuename}") + for message in q_messages: + print(message) + print("Done.") transport.disconnect() From 0d7df147b5993105229b16a8d9700d453766da48 Mon Sep 17 00:00:00 2001 From: Nicholas Devenish Date: Tue, 17 Jun 2025 11:49:32 +0100 Subject: [PATCH 3/3] Update history --- HISTORY.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/HISTORY.rst b/HISTORY.rst index ee9abd44..91fc5d5a 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -6,6 +6,7 @@ Unreleased ---------- - Minimum supported python is now 3.11. - Switch python packaging backend back to setuptools. (`#263 `_) +- ``zocalo.dlq_purge``: Show which queues DLQ messages came from, and accept queue names with prefix. (`#264 `_) 1.2.0 (2024-11-14) ------------------