diff --git a/docs/index.rst b/docs/index.rst index a441359c..313516a6 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -9,6 +9,7 @@ Welcome to Zocalo's documentation! workflows siteconfig installation + systemtest contributing authors history diff --git a/docs/systemtest.rst b/docs/systemtest.rst new file mode 100644 index 00000000..451cb05a --- /dev/null +++ b/docs/systemtest.rst @@ -0,0 +1,29 @@ +============ +System Tests +============ + +Test various parts of the zocalo system. Tests are found via the `zocalo.system_tests` +entry points. ActiveMQ and Dispatcher tests are included by default. + +.. code-block:: + + zocalo.run_system_tests (--debug) + +Minimum config required to test: + +* `ActiveMQ` +* `Dispatcher` + +.. code-block:: + + storage: + plugin: storage + + system_tests: + temp_dir: /tmp/zocalo-test + + dispatcher: + ispyb_dcid: 1 + expected_beamline: bl + +`recipes/test-dispatcher.json` needs to be copied to the local recipe path diff --git a/requirements_dev.txt b/requirements_dev.txt index 7232afcd..067d5932 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -6,3 +6,4 @@ pytest-mock pytest==6.2.5 setuptools==58.2.0 workflows==2.13 +junit-xml==1.9 diff --git a/setup.cfg b/setup.cfg index f99f9b03..b24a276f 100644 --- a/setup.cfg +++ b/setup.cfg @@ -31,6 +31,7 @@ install_requires = marshmallow setuptools workflows>=2.7 + junit_xml packages = find: package_dir = =src @@ -44,6 +45,7 @@ console_scripts = zocalo.service = zocalo.service:start_service zocalo.shutdown = zocalo.cli.shutdown:run zocalo.wrap = zocalo.cli.wrap:run + zocalo.run_system_tests = zocalo.cli.system_test:run libtbx.dispatcher.script = zocalo.go = zocalo.go zocalo.queue_drain = zocalo.queue_drain @@ -61,6 +63,9 @@ zocalo.configuration.plugins = jmx = zocalo.configuration.plugin_jmx:JMX zocalo.wrappers = dummy = zocalo.wrapper:DummyWrapper +zocalo.system_tests = + activemq = zocalo.system_test.tests.activemq:ActiveMQ + dispatcher = zocalo.system_test.tests.dispatcher:DispatcherService [options.packages.find] where = src diff --git a/src/zocalo/cli/system_test.py b/src/zocalo/cli/system_test.py new file mode 100644 index 00000000..cf06a222 --- /dev/null +++ b/src/zocalo/cli/system_test.py @@ -0,0 +1,341 @@ +import collections +import argparse +import logging +import sys +import time +import pkg_resources +import operator + +import junit_xml +from workflows.transport.stomp_transport import StompTransport + +import zocalo.configuration + +from zocalo.system_test.result import Result + +logger = logging.getLogger(__name__) + +stomp_logger = logging.getLogger("stomp.py") +stomp_logger.setLevel(logging.WARNING) + +TimerEvent = collections.namedtuple( + "TimerEvent", "time, callback, expected_result, result_object" +) + + +def run(): + if "--debug" in sys.argv: + level = logging.DEBUG + stomp_logger.setLevel(logging.DEBUG) + else: + level = logging.INFO + logging.basicConfig(level=level) + + parser = argparse.ArgumentParser(description="Zocalo system tests") + parser.add_argument( + "-c", dest="classes", help="Filter tests to specific classes", + ) + parser.add_argument( + "-k", dest="functions", help="Filter tests to specific functions", + ) + parser.add_argument( + "--debug", action="store_true", help="Enable debug output", + ) + + zc = zocalo.configuration.from_file() + envs = zc.activate() + zc.add_command_line_options(parser) + + try: + zc.storage["system_tests"] + except KeyError: + raise AttributeError("Zocalo configuration `storage` plugin does not contain a `system_tests` key") + + args = parser.parse_args() + + test_mode = False + if "test" in envs: + logger.info("Running on test configuration") + test_mode = True + + transport = StompTransport() + transport.connect() + if not transport.is_connected(): + logger.critical("Could not connect to ActiveMQ server") + sys.exit(1) + + # Load system tests + systest_classes = {} + for entry in pkg_resources.iter_entry_points("zocalo.system_tests"): + cls = entry.load() + systest_classes[cls.__name__] = cls + + systest_count = len(systest_classes) + logger.info("Found %d system test classes" % systest_count) + + if args.classes and systest_count: + systest_classes = { + n: cls + for n, cls in systest_classes.items() + if any(n.lower().startswith(v.lower()) for v in [args.classes]) + } + logger.info( + "Filtered %d classes via command line arguments" + % (systest_count - len(systest_classes)) + ) + systest_count = len(systest_classes) + + tests = {} + count = 0 + collection_errors = False + for classname, cls in systest_classes.items(): + logger.debug("Collecting tests from %s" % classname) + for testname, testsetting in cls(zc=zc, dev_mode=test_mode).collect_tests().items(): + count += 1 + if (args.functions and testname == args.functions) or not args.functions: + testresult = Result() + testresult.set_name(testname) + testresult.set_classname(classname) + testresult.early = 0 + if testsetting.errors: + testresult.log_trace("\n".join(testsetting.errors)) + logger.error( + "Error reading test %s:\n%s", + testname, + "\n".join(testsetting.errors), + ) + collection_errors = True + tests[(classname, testname)] = (testsetting, testresult) + logger.info("Found %d system tests" % count) + logger.info( + "Filtered %d system tests via command line arguments" % (count - len(tests)) + ) + if collection_errors: + sys.exit("Errors during test collection") + + # Set up subscriptions + start_time = time.time() # This is updated after sending all messages + + channels = collections.defaultdict(list) + for test, _ in tests.values(): + for expectation in test.expect: + channels[(expectation["queue"], expectation["topic"])].append(expectation) + for expectation in test.quiet: + channels[(expectation["queue"], expectation["topic"])].extend([]) + + channel_lookup = {} + + unexpected_messages = Result() + unexpected_messages.set_name("received_no_unexpected_messages") + unexpected_messages.set_classname(".") + unexpected_messages.count = 0 + + def handle_receipt(header, message): + expected_messages = channels[channel_lookup[header["subscription"]]] + for expected_message in expected_messages: + if not expected_message.get("received"): + if expected_message["message"] == message: + if expected_message.get("headers"): + headers_match = True + for parameter, value in expected_message["headers"].items(): + if value != header.get(parameter): + headers_match = False + if not headers_match: + logger.warning( + "Received a message similar to an expected message:\n" + + str(message) + + "\n but its header\n" + + str(header) + + "\ndoes not match the expected header:\n" + + str(expected_message["headers"]) + ) + continue + if ( + expected_message.get("min_wait") + and (time.time() - start_time) < expected_message["min_wait"] + ): + expected_message["early"] = ( + "Received expected message:\n" + + str(header) + + "\n" + + str(message) + + "\n%.1f seconds too early." + % (expected_message["min_wait"] + start_time - time.time()) + ) + logger.warning(expected_message["early"]) + expected_message["received"] = True + logger.debug( + "Received expected message:\n" + + str(header) + + "\n" + + str(message) + + "\n" + ) + return + logger.warning( + "Received unexpected message:\n" + + str(header) + + "\n" + + str(message) + + "\n which is not in \n" + + str(expected_messages) + + "\n" + ) + unexpected_messages.log_error( + message="Received unexpected message", + output=str(header) + "\n" + str(message) + "\n", + ) + unexpected_messages.count += 1 + + for n, (queue, topic) in enumerate(channels.keys()): + logger.debug("%2d: Subscribing to %s" % (n + 1, queue)) + if queue: + sub_id = transport.subscribe(queue, handle_receipt) + if topic: + sub_id = transport.subscribe_broadcast(topic, handle_receipt) + channel_lookup[str(sub_id)] = (queue, topic) + # subscriptions may be expensive on the server side, so apply some rate limiting + # so that the server can catch up and replies on this connection are not unduly + # delayed + time.sleep(0.3) + delay = 0.1 * len(channels) + 0.007 * len(channels) * len(channels) + logger.debug(f"Waiting {delay:.1f} seconds...") + time.sleep(delay) + + # Send out messages + for test, _ in tests.values(): + for message in test.send: + if message.get("queue"): + logger.debug("Sending message to %s", message["queue"]) + transport.send( + message["queue"], + message["message"], + headers=message["headers"], + persistent=False, + ) + if message.get("topic"): + logger.debug("Broadcasting message to %s", message["topic"]) + transport.broadcast( + message["topic"], message["message"], headers=message["headers"] + ) + + # Prepare timer events + start_time = time.time() + + timer_events = [] + for test, result in tests.values(): + for event in test.timers: + event["at_time"] = event["at_time"] + start_time + function = event.get("callback") + if function: + args = event.get("args", ()) + kwargs = event.get("kwargs", {}) + timer_events.append( + TimerEvent( + time=event["at_time"], + result_object=result, + callback=lambda function=function: function(*args, **kwargs), + expected_result=event.get("expect_return", Ellipsis), + ) + ) + else: + timer_events.append( + TimerEvent( + time=event["at_time"], + result_object=result, + callback=lambda: None, + expected_result=Ellipsis, + ) + ) + timer_events = sorted(timer_events, key=operator.attrgetter("time")) + + # Wait for messages and timeouts, run events + keep_waiting = True + last_message = time.time() + while keep_waiting: + + # Wait fixed time period or until next event + wait_to = time.time() + 0.2 + keep_waiting = False + while timer_events and time.time() > timer_events[0].time: + event = timer_events.pop(0) + event_result = event.callback() + if event.expected_result is not Ellipsis: + if event.expected_result != event_result: + logger.warning( + f"{event.result_object.classname} timer event failed for {event.result_object.name}: return value '{event_result}' does not match '{event.expected_result}'" + ) + event.result_object.log_error( + message="Timer event failed with result '%s' instead of expected '%s'" + % (event_result, event.expected_result) + ) + if timer_events: + wait_to = min(wait_to, timer_events[0][0]) + keep_waiting = True + if time.time() > last_message + 5: + logger.info("Waited %5.1fs." % (time.time() - start_time)) + last_message = time.time() + time.sleep(max(0.01, wait_to - time.time())) + + for testname, test in tests.items(): + for expectation in test[0].expect: + if not expectation.get("received") and not expectation.get( + "received_timeout" + ): + if time.time() > start_time + expectation["timeout"]: + expectation["received_timeout"] = True + logger.warning( + "Test %s.%s timed out waiting for message\n%s" + % (testname[0], testname[1], str(expectation)) + ) + test[1].log_error( + message="No answer received within time limit.", + output=str(expectation), + ) + else: + keep_waiting = True + + for testname, test in tests.items(): + for expectation in test[0].expect: + if expectation.get("early"): + test[1].log_error( + message="Answer received too early.", output=str(expectation) + ) + test[1].early += 1 + + # Export results + ts = junit_xml.TestSuite( + "zocalo.system_test", [r for _, r in tests.values()] + [unexpected_messages] + ) + with open("output.xml", "w") as f: + junit_xml.to_xml_report_file(f, [ts], prettyprint=True) + + successes = sum(r.is_success() for _, r in tests.values()) + logger.info( + "System test run completed, %d of %d tests succeeded." % (successes, len(tests)) + ) + for a, b in tests.values(): + if not b.is_success(): + if b.is_failure() and b.failure_output: + logger.error( + " %s %s failed:\n %s", + b.classname, + b.name, + b.failure_output.replace("\n", "\n "), + ) + else: + logger.warning( + " %s %s received %d out of %d expected replies %s" + % ( + b.classname, + b.name, + len([x for x in a.expect if x.get("received")]), + len(a.expect), + "(%d early)" % b.early if b.early else "", + ) + ) + if unexpected_messages.count: + logger.error( + " Received %d unexpected message%s." + % (unexpected_messages.count, "" if unexpected_messages.count == 1 else "s") + ) diff --git a/src/zocalo/system_test/__init__.py b/src/zocalo/system_test/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/zocalo/system_test/common.py b/src/zocalo/system_test/common.py new file mode 100644 index 00000000..9dabb7f9 --- /dev/null +++ b/src/zocalo/system_test/common.py @@ -0,0 +1,316 @@ +import collections +import logging +import string +import uuid +import json +import os.path + +from unittest import mock +from workflows.recipe import Recipe + + +CollectedTest = collections.namedtuple( + "CollectedTest", "send, expect, timers, errors, quiet" +) + + +class SafeDict(dict): + """A dictionary that returns undefined keys as {keyname}. + This can be used to selectively replace variables in datastructures.""" + + def __missing__(self, key): + return "{" + key + "}" + + +class CommonSystemTest: + """ + Base class for system tests for Zocalo, + the Diamond Light Source data analysis framework. + """ + + uuid = "T-12345678-1234-1234-1234-1234567890ab" + """A random unique identifier for tests. A new one will be generated on class + initialization and for each invocation of a test function.""" + + parameters = SafeDict() + """Set of known test parameters. Generally only a unique test identifier, + parameters['uuid'], will be set.""" + + validation = False + """Set to true when test functions are only called for validation rather than + testing. Think of this as 'dummy_mode'.""" + + development_mode = False + """A flag to distinguish between testing the live system and testing the + development system. This should be used only sparingly, after all tests + should be as realistic as possible, but may be required in some places, + eg. to decide where to load external files from.""" + + log = logging.getLogger("dlstbx.system_test") + """Common logger object.""" + + def __init__(self, zc, dev_mode=False): + """Constructor via which the development mode can be set.""" + self.development_mode = dev_mode + self.rotate_uuid() + self._zc = zc + + def get_recipe(self, recipe, load=True): + """Load a recipe from file + + Kwargs: + load(bool): Whether to load the json into a Recipe + """ + recipe_path = self._zc.storage["zocalo.recipe_directory"] + with open(os.path.join(recipe_path, f"{recipe}.json")) as fh: + recipe = json.load(fh) + if load: + return Recipe(recipe) + else: + return recipe + + def rotate_uuid(self): + """Generate a new unique ID for the test. Prepend 'T-' to a UUID to + distinguish between IDs used in system tests and IDs used for live + processing. This helps for example when interpreting logs, as system test + messages will show up in isolation rather than as part of a processing + pipeline.""" + self.uuid = "T-" + str(uuid.uuid4()) + + def enumerate_test_functions(self): + """Returns a list of (name, function) tuples for all declared test + functions in the class.""" + return [ + (function, getattr(self, function)) + for function in dir(self) + if function.startswith("test_") + ] + + def validate(self): + """Checks that all test functions parse correctly to pick up syntax errors. + Does run test functions with disabled messaging functions.""" + # Replace messaging functions by mock constructs + patch_functions = ["_add_timer", "_messaging"] + original_functions = {(x, getattr(self, x)) for x in patch_functions} + for x in patch_functions: + setattr(self, x, mock.create_autospec(getattr(self, x))) + self.validation = True + try: + for name, function in self.enumerate_test_functions(): + self.log.info("validating %s" % name) + function() + self.rotate_uuid() # rotate uuid for next function + self.log.info("OK") + finally: + # Restore messaging functions + for name, function in original_functions: + setattr(self, name, function) + self.validation = False + + def collect_tests(self): + """Runs all test functions and collects messaging information. + Returns a dictionary of + { testname: CollectedTest } + with the namedtuple CollectedTest parameters initialised with arrays. + """ + self.config = self._zc.storage["system_tests"] + messages = {} + for name, function in self.enumerate_test_functions(): + self.rotate_uuid() + self.parameters["uuid"] = self.uuid + + def messaging(direction, **kwargs): + if direction not in messages[name]._fields: + raise RuntimeError("Invalid messaging call (%s)" % str(direction)) + getattr(messages[name], direction).append(kwargs) + + def timer(**kwargs): + messages[name].timers.append(kwargs) + + self._messaging = messaging + self._add_timer = timer + messages[name] = CollectedTest( + send=[], expect=[], timers=[], errors=[], quiet=[] + ) + try: + function() + except Exception: + import traceback + + messages[name].errors.append(traceback.format_exc()) + return messages + + # + # -- Functions for use within tests ---------------------------------------- + # + + def send_message(self, queue=None, topic=None, headers={}, message=""): + """Use this function within tests to send messages to queues and topics.""" + assert queue or topic, "Message queue or topic destination required" + + # Inject the custom uuid into the message if its a recipe + if isinstance(message, dict): + if not message.get("parameters"): + message["parameters"] = {} + message["parameters"]["uuid"] = self.uuid + + self._messaging( + "send", queue=queue, topic=topic, headers=headers, message=message + ) + + def expect_message( + self, queue=None, topic=None, headers=None, message=None, min_wait=0, timeout=10 + ): + """Use this function within tests to wait for messages to queues and topics.""" + assert queue or topic, "Message queue or topic destination required" + assert ( + not queue or not topic + ), "Can only expect message on queue or topic, not both" + self._messaging( + "expect", + queue=queue, + topic=topic, + headers=headers, + message=message, + min_wait=min_wait, + timeout=timeout, + ) + + def expect_recipe_message( + self, + recipe, + recipe_path, + recipe_pointer, + headers=None, + payload=None, + min_wait=0, + timeout=10, + queue=None, + topic=None, + environment=None, + ): + """Use this function within tests to wait for recipe-wrapped messages.""" + assert recipe, "Recipe required" + if not (queue or topic): + assert recipe_pointer > 0, "Recipe-pointer required" + assert recipe_pointer in recipe, "Given recipe-pointer %s invalid" % str( + recipe_pointer + ) + queue = recipe[recipe_pointer].get("queue") + topic = recipe[recipe_pointer].get("topic") + assert queue or topic, "Message queue or topic destination required" + assert ( + not queue or not topic + ), "Can only expect message on queue or topic, not both" + if headers is None: + headers = {"workflows-recipe": "True"} + else: + headers = headers.copy() + headers["workflows-recipe"] = "True" + if environment: + + class dictionary_contains: + def __init__(self, d): + self.containsdict = d + + def __eq__(self, other): + return self.containsdict.items() <= other.items() + + environment = dictionary_contains(environment) + else: + environment = mock.ANY + expected_message = { + "payload": payload, + "recipe": recipe, + "recipe-path": recipe_path, + "recipe-pointer": recipe_pointer, + "environment": environment, + } + self._messaging( + "expect", + queue=queue, + topic=topic, + headers=headers, + message=expected_message, + min_wait=min_wait, + timeout=timeout, + ) + + def expect_unreached_recipe_step( + self, recipe, recipe_pointer, min_wait=3, queue=None, topic=None, + ): + """Use this function within tests to mark recipe steps as unreachable.""" + assert recipe, "Recipe required" + if not (queue or topic): + assert recipe_pointer > 0, "Recipe-pointer required" + assert recipe_pointer in recipe, "Given recipe-pointer %s invalid" % str( + recipe_pointer + ) + queue = recipe[recipe_pointer].get("queue") + topic = recipe[recipe_pointer].get("topic") + assert queue or topic, "Message queue or topic destination required" + assert ( + not queue or not topic + ), "Can only expect message on queue or topic, not both" + + self._messaging( + "quiet", queue=queue, topic=topic, + ) + self._add_timer(at_time=min_wait) + + def timer_event( + self, at_time=None, callback=None, args=None, kwargs=None, expect_return=... + ): + if args is None: + args = [] + if kwargs is None: + kwargs = {} + assert at_time, "need to specify time for event" + assert callback, "need to specify callback function" + self._add_timer( + at_time=at_time, + callback=callback, + args=args, + kwargs=kwargs, + expect_return=expect_return, + ) + + def apply_parameters(self, item): + """Recursively apply formatting to {item}s in a data structure, leaving + undefined {item}s as they are. + + Examples: + parameters = { 'x':'5' } + recursively_replace_parameters( { '{x}': '{y}' } ) + => { '5': '{y}' } + + parameters = { 'y':'5' } + recursively_replace_parameters( { '{x}': '{y}' } ) + => { '{x}': '5' } + + parameters = { 'x':'3', 'y':'5' } + recursively_replace_parameters( { '{x}': '{y}' } ) + => { '3': '5' } + """ + if isinstance(item, str): + return string.Formatter().vformat(item, (), self.parameters) + if isinstance(item, dict): + return { + self.apply_parameters(key): self.apply_parameters(value) + for key, value in item.items() + } + if isinstance(item, tuple): + return tuple(self.apply_parameters(list(item))) + if isinstance(item, list): + return [self.apply_parameters(x) for x in item] + return item + + # + # -- Internal house-keeping functions -------------------------------------- + # + + def _add_timer(self, *args, **kwargs): + raise NotImplementedError("Test functions can not be run directly") + + def _messaging(self, *args, **kwargs): + raise NotImplementedError("Test functions can not be run directly") diff --git a/src/zocalo/system_test/recipes/test-dispatcher.json b/src/zocalo/system_test/recipes/test-dispatcher.json new file mode 100644 index 00000000..11b56c65 --- /dev/null +++ b/src/zocalo/system_test/recipes/test-dispatcher.json @@ -0,0 +1,11 @@ +{ + "1": { + "service": "System test", + "queue": "transient.system_test", + "parameters": { + "guid": "{guid}", + "beamline": "{ispyb_beamline}" + } + }, + "start": [[1, []]] + } diff --git a/src/zocalo/system_test/result.py b/src/zocalo/system_test/result.py new file mode 100644 index 00000000..e9f99dcd --- /dev/null +++ b/src/zocalo/system_test/result.py @@ -0,0 +1,104 @@ +import timeit + +from junit_xml import TestCase + + +class Result(TestCase): + def __init__(self): + TestCase.__init__(self, None) + + self.name = None + self.classname = None + self.elapsed_sec = 0 + self.skipped_message = None # first skipped message + self.skipped_output = None # skipped messages + self.failure_message = None # error message + self.failure_output = None # stack trace + # to test for failure use is_failure() + self.stdout = None # standard output + self.stderr = None # standard error + self.start_time = timeit.default_timer() + + def __repr__(self): + if self.is_failure(): + status = "failure" + elif self.is_error(): + status = "error" + elif self.is_skipped(): + status = "skipped" + else: + status = "success" + return f"" + + def update_timer(self): + self.set_time(timeit.default_timer() - self.start_time) + + def append(self, result): + self.update_timer() + + if self.failure_message is None: + self.failure_message = result.failure_message + # otherwise ignore new message + + if self.skipped_message is None: + self.skipped_message = result.skipped_message + # otherwise ignore new message + + if self.skipped_output is None: + self.skipped_output = result.skipped_output + elif result.skipped_output is not None: + self.skipped_output = "\n".join( + [self.skipped_output, result.skipped_output] + ) + + if self.stderr is None: + self.stderr = result.stderr + elif result.stderr is not None: + self.stderr = "\n".join([self.stderr, result.stderr]) + + if self.stdout is None: + self.stdout = result.stdout + elif result.stdout is not None: + self.stdout = "\n".join([self.stdout, result.stdout]) + + if self.failure_output is None: + self.failure_output = result.failure_output + elif result.failure_output is not None: + self.failure_output = "\n".join( + [self.failure_output, result.failure_output] + ) + + def log_message(self, text): + self.update_timer() + if self.stdout is None: + self.stdout = text + else: + self.stdout = self.stdout + "\n" + text + + def log_skip(self, text): + self.update_timer() + self.add_skipped_info(message=text, output=text) + + def log_error(self, message, output=None): + self.update_timer() + self.add_error_info(message=message, output=output) + if self.stderr is None: + self.stderr = message + else: + self.stderr = self.stderr + "\n" + message + + def log_trace(self, text): + self.update_timer() + self.add_failure_info(message=text.split("\n")[0], output=text) + + def set_name(self, name): + self.name = name + + def set_classname(self, classname): + self.classname = classname + + def set_time(self, time): + self.elapsed_sec = time + + def is_success(self): + return not self.is_failure() and not self.is_error() and not self.is_skipped() diff --git a/src/zocalo/system_test/tests/__init__.py b/src/zocalo/system_test/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/zocalo/system_test/tests/activemq.py b/src/zocalo/system_test/tests/activemq.py new file mode 100644 index 00000000..c7fe257b --- /dev/null +++ b/src/zocalo/system_test/tests/activemq.py @@ -0,0 +1,18 @@ +from zocalo.system_test.common import CommonSystemTest + + +class ActiveMQ(CommonSystemTest): + """Connect to messaging server and send a message to myself.""" + + def test_loopback_message(self): + self.send_message( + queue="transient.system_test", message="loopback " + self.uuid + ) + + self.expect_message( + queue="transient.system_test", message="loopback " + self.uuid, timeout=10, + ) + + +if __name__ == "__main__": + ActiveMQ().validate() diff --git a/src/zocalo/system_test/tests/dispatcher.py b/src/zocalo/system_test/tests/dispatcher.py new file mode 100644 index 00000000..caccc9e5 --- /dev/null +++ b/src/zocalo/system_test/tests/dispatcher.py @@ -0,0 +1,109 @@ +import copy + +from unittest import mock +from workflows.recipe import Recipe + +from zocalo.system_test.common import CommonSystemTest + + +class DispatcherService(CommonSystemTest): + """Tests for the dispatcher service (recipe service).""" + + def test_processing_a_trivial_recipe(self): + """Passing in a recipe to the service without external dependencies. + The recipe should be interpreted and a simple message passed back to a + fixed destination.""" + + recipe = self.get_recipe("test-dispatcher", load=False) + self.send_message( + queue="processing_recipe", message={"recipes": ["test-dispatcher"]} + ) + + self.expect_recipe_message( + recipe=Recipe(recipe), + recipe_path=[], + recipe_pointer=1, + payload=recipe["start"][0][1], + ) + + def test_parsing_a_recipe_and_replacing_parameters(self): + """Passing in a recipe to the service without external dependencies. + The recipe should be interpreted, the 'uuid' placeholder replaced using + the parameter field, and the message passed back. + The message should then contain the recipe and a correctly set pointer.""" + + parameters = {"guid": self.uuid} + + recipe = self.get_recipe("test-dispatcher", load=False) + self.send_message( + queue="processing_recipe", + message={"recipes": ["test-dispatcher"], "parameters": parameters}, + ) + + expected_recipe = Recipe(recipe) + expected_recipe.apply_parameters(parameters) + + self.expect_recipe_message( + recipe=expected_recipe, + recipe_path=[], + recipe_pointer=1, + payload=recipe["start"][0][1], + ) + + def test_ispyb_magic(self): + """Test the ISPyB magic to see that it does what we think it should do""" + + recipe = self.get_recipe("test-dispatcher", load=False) + self.send_message( + queue="processing_recipe", + message={ + "recipes": ["test-dispatcher"], + "parameters": {"ispyb_dcid": self.config["dispatcher"]["ispyb_dcid"]}, + }, + ) + + parameters = {"ispyb_beamline": self.config["dispatcher"]["expected_beamline"]} + expected_recipe = Recipe(recipe) + expected_recipe.apply_parameters(parameters) + + self.expect_recipe_message( + recipe=expected_recipe, + recipe_path=[], + recipe_pointer=1, + payload=recipe["start"][0][1], + ) + + def test_wait_for_ispyb_runstatus(self): + """ + Test the logic to wait for a RunStatus to be set in ISPyB. + Since we don't touch the database this should run into a timeout condition. + """ + message = { + "recipes": ["test-dispatcher"], + "parameters": { + "ispyb_dcid": 4977408, + "ispyb_wait_for_runstatus": True, + "dispatcher_timeout": 10, + "dispatcher_error_queue": "transient.system_test.timeout", + }, + } + self.send_message(queue="processing_recipe", message=message) + + expected_recipe = self.get_recipe("test-dispatcher") + self.expect_unreached_recipe_step(recipe=expected_recipe, recipe_pointer=1) + + # Emulate recipe mangling + message = copy.deepcopy(message) + message["parameters"]["uuid"] = self.uuid + message["parameters"]["dispatcher_expiration"] = mock.ANY + + self.expect_message( + queue="transient.system_test.timeout", + message=message, + min_wait=9, + timeout=30, + ) + + +if __name__ == "__main__": + DispatcherService().validate() diff --git a/src/zocalo/system_test/tmp.py b/src/zocalo/system_test/tmp.py new file mode 100644 index 00000000..8ead4bcf --- /dev/null +++ b/src/zocalo/system_test/tmp.py @@ -0,0 +1,41 @@ +import datetime +import errno +import os +import stat + + +def _create_tmp_folder(tmp_folder): + try: + os.makedirs(tmp_folder) + except OSError as exception: + if exception.errno != errno.EEXIST: + raise + try: + os.chmod( + tmp_folder, + stat.S_IRUSR + | stat.S_IWUSR + | stat.S_IXUSR + | stat.S_IRGRP + | stat.S_IWGRP + | stat.S_IXGRP + | stat.S_IROTH + | stat.S_IWOTH + | stat.S_IXOTH, + ) + except OSError as exception: + if exception.errno != errno.EPERM: + raise + + +def tmp_folder(path): + _create_tmp_folder(path) + return path + + +def tmp_folder_date(path): + _tmp_folder = os.path.join( + tmp_folder(path), datetime.date.today().strftime("%Y-%m-%d") + ) + _create_tmp_folder(path) + return _tmp_folder