Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ Unreleased
----------
- Minimum supported python is now 3.11.
- Switch python packaging backend back to setuptools. (`#263 <https://github.com/DiamondLightSource/python-zocalo/pull/263>`_)
- ``zocalo.dlq_purge``: Show which queues DLQ messages came from, and accept queue names with prefix. (`#264 <https://github.com/DiamondLightSource/python-zocalo/pull/264>`_)

1.2.0 (2024-11-14)
------------------
Expand Down
37 changes: 29 additions & 8 deletions src/zocalo/cli/dlq_purge.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import time
from datetime import datetime
from functools import partial
from typing import Literal

import workflows

Expand Down Expand Up @@ -60,15 +61,17 @@ def run() -> None:

args = parser.parse_args(["--stomp-prfx=DLQ"] + sys.argv[1:])
if args.transport == "PikaTransport":
queues = ["dlq." + a for a in args.queues]
queues = ["dlq." + a.removeprefix("dlq.") for a in args.queues]
else:
queues = args.queues
transport = workflows.transport.lookup(args.transport)()

characterfilter = re.compile(r"[^a-zA-Z0-9._-]+", re.UNICODE)
idlequeue: queue.Queue = queue.Queue()
idlequeue: queue.Queue[Literal["start", "done"] | tuple[str, str]] = queue.Queue()

def receive_dlq_message(header: dict, message: dict, rabbitmq=False) -> None:
def receive_dlq_message(
header: dict, message: dict, *, queue_name: str, rabbitmq=False
) -> None:
idlequeue.put_nowait("start")
if rabbitmq:
msg_time = int(datetime.timestamp(header["x-death"][0]["time"])) * 1000
Expand Down Expand Up @@ -99,8 +102,11 @@ def receive_dlq_message(header: dict, message: dict, rabbitmq=False) -> None:

with filename.open("w") as fh:
json.dump(dlqmsg, fh, indent=2, sort_keys=True)
print(
f"Message {header['message-id']} ({time.strftime('%Y-%m-%d %H:%M:%S', timestamp)}) exported:\n {filename}"
idlequeue.put_nowait(
(
queue_name,
f" Message {header['message-id']} ({time.strftime('%Y-%m-%d %H:%M:%S', timestamp)}) exported:\n {filename}",
)
)
transport.ack(header)
idlequeue.put_nowait("done")
Expand All @@ -112,17 +118,32 @@ def receive_dlq_message(header: dict, message: dict, rabbitmq=False) -> None:
elif args.transport == "PikaTransport":
rmq = RabbitMQAPI.from_zocalo_configuration(zc)
queues = [q.name for q in rmq.queues() if q.name.startswith("dlq.")]
print(f"Looking for DLQ messages in {len(queues)} queues...")
for queue_ in queues:
print("Looking for DLQ messages in " + queue_)
transport.subscribe(
queue_,
partial(receive_dlq_message, rabbitmq=args.transport == "PikaTransport"),
partial(
receive_dlq_message,
rabbitmq=args.transport == "PikaTransport",
queue_name=queue_,
),
acknowledgement=True,
)
messages: dict[str, list[str]] = {}
try:
idlequeue.get(True, args.wait or 3)
while True:
idlequeue.get(True, args.wait or 0.1)
result = idlequeue.get(True, args.wait or 0.1)
if isinstance(result, tuple):
queuename, message = result
messages.setdefault(queuename, []).append(message)

except queue.Empty:
# Print out what we found, per queue
for queuename, q_messages in messages.items():
print(f"Found {len(q_messages)} DLQ messages in {queuename}")
for message in q_messages:
print(message)

print("Done.")
transport.disconnect()