From ffa751ee1bc5353fa1bb849a0e7d65fbe73424c3 Mon Sep 17 00:00:00 2001 From: James Wharmby Date: Fri, 2 Jan 2026 14:56:23 +0000 Subject: [PATCH 01/18] return refactoring --- .../fhir_immunization_pre_validators.py | 35 +++++++++---------- 1 file changed, 16 insertions(+), 19 deletions(-) diff --git a/lambdas/shared/src/common/models/fhir_immunization_pre_validators.py b/lambdas/shared/src/common/models/fhir_immunization_pre_validators.py index 2903f4433..968b06694 100644 --- a/lambdas/shared/src/common/models/fhir_immunization_pre_validators.py +++ b/lambdas/shared/src/common/models/fhir_immunization_pre_validators.py @@ -225,26 +225,25 @@ def pre_validate_practitioner_reference(self, values: dict) -> None: raise ValueError( "performer must not contain internal references when there is no contained Practitioner resource" ) - return None + else: + practitioner_id = str(practitioner[0]["id"]) - practitioner_id = str(practitioner[0]["id"]) - - # Ensure that there are no internal references other than to the contained practitioner - if sum(1 for x in performer_internal_references if x != "#" + practitioner_id) != 0: - raise ValueError( - "performer must not contain any internal references other than" - + " to the contained Practitioner resource" - ) + # Ensure that there are no internal references other than to the contained practitioner + if sum(1 for x in performer_internal_references if x != "#" + practitioner_id) != 0: + raise ValueError( + "performer must not contain any internal references other than" + + " to the contained Practitioner resource" + ) - # Separate out the references to the contained practitioner and ensure that there is exactly one such reference - practitioner_references = [x for x in performer_internal_references if x == "#" + practitioner_id] + # Separate out the references to the contained practitioner and ensure that there is exactly one such reference + practitioner_references = [x for x in performer_internal_references if x == "#" + practitioner_id] - if len(practitioner_references) == 0: - raise ValueError(f"contained Practitioner resource id '{practitioner_id}' must be referenced from performer") - elif len(practitioner_references) > 1: - raise ValueError( - f"contained Practitioner resource id '{practitioner_id}' must only be referenced once from performer" - ) + if len(practitioner_references) == 0: + raise ValueError(f"contained Practitioner resource id '{practitioner_id}' must be referenced from performer") + elif len(practitioner_references) > 1: + raise ValueError( + f"contained Practitioner resource id '{practitioner_id}' must only be referenced once from performer" + ) def pre_validate_patient_identifier_extension(self, values: dict) -> None: """ @@ -882,8 +881,6 @@ def pre_validate_dose_quantity_system_and_code(self, values: dict) -> None: PreValidation.require_system_when_code_present(code, system, "doseQuantity.code", "doseQuantity.system") - return values - def pre_validate_dose_quantity_unit(self, values: dict) -> None: """ Pre-validate that, if doseQuantity.unit (legacy CSV field name: DOSE_UNIT_TERM) exists, From d89f3c91ec73eba7b9a943ebdea14dba9aad4ac4 Mon Sep 17 00:00:00 2001 From: James Wharmby Date: Fri, 2 Jan 2026 15:02:12 +0000 Subject: [PATCH 02/18] ruff --- .../src/common/models/fhir_immunization_pre_validators.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lambdas/shared/src/common/models/fhir_immunization_pre_validators.py b/lambdas/shared/src/common/models/fhir_immunization_pre_validators.py index 968b06694..dc9ce8d60 100644 --- a/lambdas/shared/src/common/models/fhir_immunization_pre_validators.py +++ b/lambdas/shared/src/common/models/fhir_immunization_pre_validators.py @@ -239,7 +239,9 @@ def pre_validate_practitioner_reference(self, values: dict) -> None: practitioner_references = [x for x in performer_internal_references if x == "#" + practitioner_id] if len(practitioner_references) == 0: - raise ValueError(f"contained Practitioner resource id '{practitioner_id}' must be referenced from performer") + raise ValueError( + f"contained Practitioner resource id '{practitioner_id}' must be referenced from performer" + ) elif len(practitioner_references) > 1: raise ValueError( f"contained Practitioner resource id '{practitioner_id}' must only be referenced once from performer" From b543a3d64ffbbb451302725534ecb178101e6efd Mon Sep 17 00:00:00 2001 From: James Wharmby Date: Fri, 2 Jan 2026 15:13:31 +0000 Subject: [PATCH 03/18] empty From 94fcc9994d72d8a81db95e4b15c3777015032076 Mon Sep 17 00:00:00 2001 From: James Wharmby Date: Fri, 2 Jan 2026 16:12:09 +0000 Subject: [PATCH 04/18] test for commit hooks --- .../src/common/models/fhir_immunization_pre_validators.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lambdas/shared/src/common/models/fhir_immunization_pre_validators.py b/lambdas/shared/src/common/models/fhir_immunization_pre_validators.py index dc9ce8d60..b52d3d988 100644 --- a/lambdas/shared/src/common/models/fhir_immunization_pre_validators.py +++ b/lambdas/shared/src/common/models/fhir_immunization_pre_validators.py @@ -18,7 +18,7 @@ class PreValidators: """ Validators which run prior to the FHIR validators and check that, where values exist, they - meet the NHS custom requirements. Note that validation of the existence of a value (i.e. it + meet the NHS custom requirements. Note: that validation of the existence of a value (i.e. it exists if mandatory, or doesn't exist if is not applicable) is done by the post validator except for a few key elements, the existence of which is explicitly checked as part of pre-validation. """ From f545006a029f38ce6370f9f2765e832afca90e63 Mon Sep 17 00:00:00 2001 From: James Wharmby Date: Fri, 2 Jan 2026 16:12:26 +0000 Subject: [PATCH 05/18] test for commit hooks reverted --- .../src/common/models/fhir_immunization_pre_validators.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lambdas/shared/src/common/models/fhir_immunization_pre_validators.py b/lambdas/shared/src/common/models/fhir_immunization_pre_validators.py index b52d3d988..dc9ce8d60 100644 --- a/lambdas/shared/src/common/models/fhir_immunization_pre_validators.py +++ b/lambdas/shared/src/common/models/fhir_immunization_pre_validators.py @@ -18,7 +18,7 @@ class PreValidators: """ Validators which run prior to the FHIR validators and check that, where values exist, they - meet the NHS custom requirements. Note: that validation of the existence of a value (i.e. it + meet the NHS custom requirements. Note that validation of the existence of a value (i.e. it exists if mandatory, or doesn't exist if is not applicable) is done by the post validator except for a few key elements, the existence of which is explicitly checked as part of pre-validation. """ From 7f96ea8c7d03185fc94dc80578fc44f5294bdc0d Mon Sep 17 00:00:00 2001 From: James Wharmby Date: Tue, 6 Jan 2026 11:30:05 +0000 Subject: [PATCH 06/18] removed if/else; sum -> any --- .../fhir_immunization_pre_validators.py | 35 +++++++++---------- 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/lambdas/shared/src/common/models/fhir_immunization_pre_validators.py b/lambdas/shared/src/common/models/fhir_immunization_pre_validators.py index dc9ce8d60..b8389a6c7 100644 --- a/lambdas/shared/src/common/models/fhir_immunization_pre_validators.py +++ b/lambdas/shared/src/common/models/fhir_immunization_pre_validators.py @@ -225,27 +225,26 @@ def pre_validate_practitioner_reference(self, values: dict) -> None: raise ValueError( "performer must not contain internal references when there is no contained Practitioner resource" ) - else: - practitioner_id = str(practitioner[0]["id"]) + return None - # Ensure that there are no internal references other than to the contained practitioner - if sum(1 for x in performer_internal_references if x != "#" + practitioner_id) != 0: - raise ValueError( - "performer must not contain any internal references other than" - + " to the contained Practitioner resource" - ) + practitioner_id = str(practitioner[0]["id"]) - # Separate out the references to the contained practitioner and ensure that there is exactly one such reference - practitioner_references = [x for x in performer_internal_references if x == "#" + practitioner_id] + # Ensure that there are no internal references other than to the contained practitioner + if any(1 for x in performer_internal_references if x != "#" + practitioner_id): + raise ValueError( + "performer must not contain any internal references other than" + + " to the contained Practitioner resource" + ) - if len(practitioner_references) == 0: - raise ValueError( - f"contained Practitioner resource id '{practitioner_id}' must be referenced from performer" - ) - elif len(practitioner_references) > 1: - raise ValueError( - f"contained Practitioner resource id '{practitioner_id}' must only be referenced once from performer" - ) + # Separate out the references to the contained practitioner and ensure that there is exactly one such reference + practitioner_references = [x for x in performer_internal_references if x == "#" + practitioner_id] + + if len(practitioner_references) == 0: + raise ValueError(f"contained Practitioner resource id '{practitioner_id}' must be referenced from performer") + elif len(practitioner_references) > 1: + raise ValueError( + f"contained Practitioner resource id '{practitioner_id}' must only be referenced once from performer" + ) def pre_validate_patient_identifier_extension(self, values: dict) -> None: """ From 9b8ccd672c8ec754ec7c10a8dd21cd22b08242d0 Mon Sep 17 00:00:00 2001 From: James Wharmby Date: Tue, 6 Jan 2026 12:01:35 +0000 Subject: [PATCH 07/18] removed if/else; sum -> any II --- .../src/common/models/fhir_immunization_pre_validators.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lambdas/shared/src/common/models/fhir_immunization_pre_validators.py b/lambdas/shared/src/common/models/fhir_immunization_pre_validators.py index b8389a6c7..3da10e0b7 100644 --- a/lambdas/shared/src/common/models/fhir_immunization_pre_validators.py +++ b/lambdas/shared/src/common/models/fhir_immunization_pre_validators.py @@ -230,7 +230,7 @@ def pre_validate_practitioner_reference(self, values: dict) -> None: practitioner_id = str(practitioner[0]["id"]) # Ensure that there are no internal references other than to the contained practitioner - if any(1 for x in performer_internal_references if x != "#" + practitioner_id): + if any(x != "#" + practitioner_id for x in performer_internal_references): raise ValueError( "performer must not contain any internal references other than" + " to the contained Practitioner resource" From 18a7bac38ef647c2e92830fb3fe6645fff6e462c Mon Sep 17 00:00:00 2001 From: James Wharmby Date: Tue, 6 Jan 2026 15:55:22 +0000 Subject: [PATCH 08/18] try/catch for lambda handler --- .../src/forwarding_batch_lambda.py | 157 +++++++++--------- .../tests/test_forwarding_batch_lambda.py | 66 ++++++++ 2 files changed, 146 insertions(+), 77 deletions(-) diff --git a/lambdas/recordforwarder/src/forwarding_batch_lambda.py b/lambdas/recordforwarder/src/forwarding_batch_lambda.py index 2cc7253ca..d37be6a04 100644 --- a/lambdas/recordforwarder/src/forwarding_batch_lambda.py +++ b/lambdas/recordforwarder/src/forwarding_batch_lambda.py @@ -69,84 +69,87 @@ def forward_request_to_dynamo( def forward_lambda_handler(event, _): - """Forward each row to the Imms API""" - logger.info("Processing started") - table = create_table() - filename_to_events_mapper = BatchFilenameToEventsMapper() - array_of_identifiers = [] - controller = make_batch_controller() - - for record in event["Records"]: - operation_start_time = str(datetime.now()) - kinesis_payload = record["kinesis"]["data"] - decoded_payload = base64.b64decode(kinesis_payload).decode("utf-8") - incoming_message_body = json.loads(decoded_payload, use_decimal=True) - file_key = incoming_message_body.get("file_key") - local_id = incoming_message_body.get("local_id") - - if is_eof_message(incoming_message_body): - logger.info("Received EOF message for file key: %s", file_key) - filename_to_events_mapper.add_event(incoming_message_body) - continue - - base_outgoing_message_body = { - "file_key": incoming_message_body.get("file_key"), - "row_id": incoming_message_body.get("row_id"), - "created_at_formatted_string": incoming_message_body.get("created_at_formatted_string"), - "local_id": incoming_message_body.get("local_id"), - "operation_requested": incoming_message_body.get("operation_requested"), - "supplier": incoming_message_body.get("supplier"), - "vaccine_type": incoming_message_body.get("vax_type"), - } - logger.info("Received message for file %s with local id: %s", file_key, local_id) - - try: - if incoming_diagnostics := incoming_message_body.get("diagnostics"): - raise RecordProcessorError(incoming_diagnostics) - - if not (fhir_json := incoming_message_body.get("fhir_json")): - raise MessageNotSuccessfulError("Server error - FHIR JSON not correctly sent to forwarder") - - # Check if the identifier is already present in the array - identifier_already_present = False - identifier_system = fhir_json["identifier"][0]["system"] - identifier_value = fhir_json["identifier"][0]["value"] - identifier = f"{identifier_system}#{identifier_value}" - - if identifier in array_of_identifiers: - identifier_already_present = True - delay_milliseconds = 30 - # A basic workaround by the existing team to ensure that subsequent operations e.g. an update after an - # initial create for the same item complete successfully. Consider using strongly consistent reads - # instead: VED-958 - time.sleep(delay_milliseconds / 1000) - else: - array_of_identifiers.append(identifier) - - imms_pk = forward_request_to_dynamo(incoming_message_body, table, identifier_already_present, controller) - logger.info("Successfully processed message. Local id: %s, PK: %s", local_id, imms_pk) - - except Exception as error: # pylint: disable = broad-exception-caught - filename_to_events_mapper.add_event( - { - **base_outgoing_message_body, - "operation_start_time": operation_start_time, - "operation_end_time": str(datetime.now()), - "diagnostics": create_diagnostics_dictionary(error), - } + try: + """Forward each row to the Imms API""" + logger.info("Processing started") + table = create_table() + filename_to_events_mapper = BatchFilenameToEventsMapper() + array_of_identifiers = [] + controller = make_batch_controller() + + for record in event["Records"]: + operation_start_time = str(datetime.now()) + kinesis_payload = record["kinesis"]["data"] + decoded_payload = base64.b64decode(kinesis_payload).decode("utf-8") + incoming_message_body = json.loads(decoded_payload, use_decimal=True) + file_key = incoming_message_body.get("file_key") + local_id = incoming_message_body.get("local_id") + + if is_eof_message(incoming_message_body): + logger.info("Received EOF message for file key: %s", file_key) + filename_to_events_mapper.add_event(incoming_message_body) + continue + + base_outgoing_message_body = { + "file_key": incoming_message_body.get("file_key"), + "row_id": incoming_message_body.get("row_id"), + "created_at_formatted_string": incoming_message_body.get("created_at_formatted_string"), + "local_id": incoming_message_body.get("local_id"), + "operation_requested": incoming_message_body.get("operation_requested"), + "supplier": incoming_message_body.get("supplier"), + "vaccine_type": incoming_message_body.get("vax_type"), + } + logger.info("Received message for file %s with local id: %s", file_key, local_id) + + try: + if incoming_diagnostics := incoming_message_body.get("diagnostics"): + raise RecordProcessorError(incoming_diagnostics) + + if not (fhir_json := incoming_message_body.get("fhir_json")): + raise MessageNotSuccessfulError("Server error - FHIR JSON not correctly sent to forwarder") + + # Check if the identifier is already present in the array + identifier_already_present = False + identifier_system = fhir_json["identifier"][0]["system"] + identifier_value = fhir_json["identifier"][0]["value"] + identifier = f"{identifier_system}#{identifier_value}" + + if identifier in array_of_identifiers: + identifier_already_present = True + delay_milliseconds = 30 + # A basic workaround by the existing team to ensure that subsequent operations e.g. an update after an + # initial create for the same item complete successfully. Consider using strongly consistent reads + # instead: VED-958 + time.sleep(delay_milliseconds / 1000) + else: + array_of_identifiers.append(identifier) + + imms_pk = forward_request_to_dynamo(incoming_message_body, table, identifier_already_present, controller) + logger.info("Successfully processed message. Local id: %s, PK: %s", local_id, imms_pk) + + except Exception as error: # pylint: disable = broad-exception-caught + filename_to_events_mapper.add_event( + { + **base_outgoing_message_body, + "operation_start_time": operation_start_time, + "operation_end_time": str(datetime.now()), + "diagnostics": create_diagnostics_dictionary(error), + } + ) + logger.error("Error processing message: %s", error) + + # Send to SQS + for filename_key, events in filename_to_events_mapper.get_map().items(): + sqs_message_body = json.dumps(events) + logger.info(f"total message length:{len(sqs_message_body)}") + + sqs_client.send_message( + QueueUrl=QUEUE_URL, + MessageBody=sqs_message_body, + MessageGroupId=filename_key, ) - logger.error("Error processing message: %s", error) - - # Send to SQS - for filename_key, events in filename_to_events_mapper.get_map().items(): - sqs_message_body = json.dumps(events) - logger.info(f"total message length:{len(sqs_message_body)}") - - sqs_client.send_message( - QueueUrl=QUEUE_URL, - MessageBody=sqs_message_body, - MessageGroupId=filename_key, - ) + except Exception as error: # pylint: disable = broad-exception-caught + logger.error("Error processing event: %s", error) if __name__ == "__main__": diff --git a/lambdas/recordforwarder/tests/test_forwarding_batch_lambda.py b/lambdas/recordforwarder/tests/test_forwarding_batch_lambda.py index ccb22e373..1f9c130af 100644 --- a/lambdas/recordforwarder/tests/test_forwarding_batch_lambda.py +++ b/lambdas/recordforwarder/tests/test_forwarding_batch_lambda.py @@ -863,6 +863,72 @@ def test_forward_request_to_dynamo( expected_values = test_case[0]["expected_values"] assert expected_values.items() <= call_data.items() + @patch("forwarding_batch_lambda.sqs_client.send_message") + def test_forward_lambda_handler_exception_handler(self, mock_send_message): + """Test exception handling when sqs_client fails""" + # Arrange + table_item = copy.deepcopy(ForwarderValues.EXPECTED_TABLE_ITEM) + table_item.update( + { + "PK": "Immunization#4d2ac1eb-080f-4e54-9598-f2d53334681c", + "IdentifierPK": "https://www.ravs.england.nhs.uk/#RSV_002", + "PatientSK": "RSV#4d2ac1eb-080f-4e54-9598-f2d53334681c", + "Operation": "DELETE", + } + ) + + # Ensure there is at least one failure, so that sqs_client is called + test_cases = [ + { + "name": "Row 1: Create Success", + "input": self.generate_input( + row_id=1, + operation_requested="CREATE", + include_fhir_json=True, + identifier_value="RSV_CREATE", + ), + "expected_keys": ForwarderValues.EXPECTED_KEYS, + "expected_values": { + "row_id": "row-1", + **ForwarderValues.EXPECTED_VALUES, + }, + }, + { + "name": "Row 2: Duplication Error: Create failure ", + "input": self.generate_input(row_id=2, operation_requested="CREATE", include_fhir_json=True), + "expected_keys": ForwarderValues.EXPECTED_KEYS_DIAGNOSTICS, + "expected_values": { + "row_id": "row-2", + "diagnostics": create_diagnostics_dictionary( + IdentifierDuplicationError("https://www.ravs.england.nhs.uk/#RSV_002") + ), + }, + "is_failure": True, + }, + ] + + self.table.put_item( + Item={ + "PK": "Immunization#4d2ac1eb-080f-4e54-9598-f2d53334681c", + "PatientPK": "Patient#9732928395", # 9177036360", + "PatientSK": "RSV#4d2ac1eb-080f-4e54-9598-f2d53334681c", + "IdentifierPK": "https://www.ravs.england.nhs.uk/#RSV_002", + "Version": 1, + } + ) + mock_send_message.reset_mock() + mock_send_message.side_effect = Exception("Unknown Exception in SQS client") + + event = self.generate_event(test_cases) + + self.mock_redis.hget.return_value = "RSV" + self.mock_redis_getter.return_value = self.mock_redis + + # Act & Assert + forward_lambda_handler(event, {}) + + self.mock_logger_error.assert_called_with("Error processing event: %s", ANY) + def clear_test_tables(self): """Clear DynamoDB table after each test.""" scan = self.table.scan() From ae5a00453e814bdb59026988e6d3505443854c7f Mon Sep 17 00:00:00 2001 From: James Wharmby Date: Tue, 6 Jan 2026 15:56:11 +0000 Subject: [PATCH 09/18] remove redundant code --- .../tests/test_forwarding_batch_lambda.py | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/lambdas/recordforwarder/tests/test_forwarding_batch_lambda.py b/lambdas/recordforwarder/tests/test_forwarding_batch_lambda.py index 1f9c130af..63a9ad5ea 100644 --- a/lambdas/recordforwarder/tests/test_forwarding_batch_lambda.py +++ b/lambdas/recordforwarder/tests/test_forwarding_batch_lambda.py @@ -929,21 +929,6 @@ def test_forward_lambda_handler_exception_handler(self, mock_send_message): self.mock_logger_error.assert_called_with("Error processing event: %s", ANY) - def clear_test_tables(self): - """Clear DynamoDB table after each test.""" - scan = self.table.scan() - items = scan.get("Items", []) - while items: - for item in items: - self.table.delete_item(Key={"PK": item["PK"]}) - scan = self.table.scan() - items = scan.get("Items", []) - - def teardown(self): - """Deletes mock dynamodb resource""" - self.table.delete() - self.dynamodb_resource = None - if __name__ == "__main__": unittest.main() From f175aa9b04100600003a95466de9813fb67ed100 Mon Sep 17 00:00:00 2001 From: James Wharmby Date: Tue, 6 Jan 2026 15:57:50 +0000 Subject: [PATCH 10/18] moved From ddbff4ffce691a78c5a2b7a6f25763fba6a0e416 Mon Sep 17 00:00:00 2001 From: James Wharmby Date: Tue, 6 Jan 2026 16:18:13 +0000 Subject: [PATCH 11/18] simplified test --- .../tests/test_forwarding_batch_lambda.py | 44 ++----------------- 1 file changed, 4 insertions(+), 40 deletions(-) diff --git a/lambdas/recordforwarder/tests/test_forwarding_batch_lambda.py b/lambdas/recordforwarder/tests/test_forwarding_batch_lambda.py index 63a9ad5ea..c9588487b 100644 --- a/lambdas/recordforwarder/tests/test_forwarding_batch_lambda.py +++ b/lambdas/recordforwarder/tests/test_forwarding_batch_lambda.py @@ -863,21 +863,10 @@ def test_forward_request_to_dynamo( expected_values = test_case[0]["expected_values"] assert expected_values.items() <= call_data.items() - @patch("forwarding_batch_lambda.sqs_client.send_message") - def test_forward_lambda_handler_exception_handler(self, mock_send_message): - """Test exception handling when sqs_client fails""" + @patch("forwarding_batch_lambda.create_table") + def test_forward_lambda_handler_exception_handler(self, mock_create_table): + """Test exception handling in main lambda handler loop""" # Arrange - table_item = copy.deepcopy(ForwarderValues.EXPECTED_TABLE_ITEM) - table_item.update( - { - "PK": "Immunization#4d2ac1eb-080f-4e54-9598-f2d53334681c", - "IdentifierPK": "https://www.ravs.england.nhs.uk/#RSV_002", - "PatientSK": "RSV#4d2ac1eb-080f-4e54-9598-f2d53334681c", - "Operation": "DELETE", - } - ) - - # Ensure there is at least one failure, so that sqs_client is called test_cases = [ { "name": "Row 1: Create Success", @@ -893,37 +882,12 @@ def test_forward_lambda_handler_exception_handler(self, mock_send_message): **ForwarderValues.EXPECTED_VALUES, }, }, - { - "name": "Row 2: Duplication Error: Create failure ", - "input": self.generate_input(row_id=2, operation_requested="CREATE", include_fhir_json=True), - "expected_keys": ForwarderValues.EXPECTED_KEYS_DIAGNOSTICS, - "expected_values": { - "row_id": "row-2", - "diagnostics": create_diagnostics_dictionary( - IdentifierDuplicationError("https://www.ravs.england.nhs.uk/#RSV_002") - ), - }, - "is_failure": True, - }, ] - self.table.put_item( - Item={ - "PK": "Immunization#4d2ac1eb-080f-4e54-9598-f2d53334681c", - "PatientPK": "Patient#9732928395", # 9177036360", - "PatientSK": "RSV#4d2ac1eb-080f-4e54-9598-f2d53334681c", - "IdentifierPK": "https://www.ravs.england.nhs.uk/#RSV_002", - "Version": 1, - } - ) - mock_send_message.reset_mock() - mock_send_message.side_effect = Exception("Unknown Exception in SQS client") + mock_create_table.side_effect = Exception("Unknown Exception") event = self.generate_event(test_cases) - self.mock_redis.hget.return_value = "RSV" - self.mock_redis_getter.return_value = self.mock_redis - # Act & Assert forward_lambda_handler(event, {}) From 130dac19f1acada2a0915020d9d6ef53ec0f2da1 Mon Sep 17 00:00:00 2001 From: James Wharmby Date: Tue, 6 Jan 2026 17:54:57 +0000 Subject: [PATCH 12/18] test_delta --- lambdas/delta_backend/tests/test_delta.py | 24 ++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/lambdas/delta_backend/tests/test_delta.py b/lambdas/delta_backend/tests/test_delta.py index 94bef09ae..5d2e48cc4 100644 --- a/lambdas/delta_backend/tests/test_delta.py +++ b/lambdas/delta_backend/tests/test_delta.py @@ -2,7 +2,7 @@ import json import os import unittest -from unittest.mock import MagicMock, patch +from unittest.mock import MagicMock, call, patch from botocore.exceptions import ClientError @@ -114,6 +114,28 @@ def test_handler_success_insert(self): self.assertEqual(put_item_data["SupplierSystem"], supplier) self.mock_sqs_client.send_message.assert_not_called() + def test_handler_exception(self): + """Ensure that sqs_client exceptions do not cause the lambda handler itself to raise an exception""" + + # Arrange + self.mock_sqs_client.send_message.side_effect = [ + None, + Exception("SQS error"), + ] + event = {"invalid_format": True} + + # Act + result = handler(event, None) + + # Assert + self.assertFalse(result) + self.mock_logger_exception.assert_has_calls( + [ + call("Delta Lambda failure: Incorrect invocation of Lambda"), + call("Error sending record to DLQ"), + ] + ) + def test_handler_overall_failure(self): # Arrange event = {"invalid_format": True} From fb32e3acc8b943f42bf293770537e1f9a60a1df6 Mon Sep 17 00:00:00 2001 From: James Wharmby Date: Wed, 7 Jan 2026 09:34:48 +0000 Subject: [PATCH 13/18] try/catch on Send to SQS only, ensure we re-raise --- .../src/forwarding_batch_lambda.py | 137 +++++++++--------- .../tests/test_forwarding_batch_lambda.py | 48 +++++- 2 files changed, 112 insertions(+), 73 deletions(-) diff --git a/lambdas/recordforwarder/src/forwarding_batch_lambda.py b/lambdas/recordforwarder/src/forwarding_batch_lambda.py index d37be6a04..46ef91f1a 100644 --- a/lambdas/recordforwarder/src/forwarding_batch_lambda.py +++ b/lambdas/recordforwarder/src/forwarding_batch_lambda.py @@ -69,75 +69,75 @@ def forward_request_to_dynamo( def forward_lambda_handler(event, _): - try: - """Forward each row to the Imms API""" - logger.info("Processing started") - table = create_table() - filename_to_events_mapper = BatchFilenameToEventsMapper() - array_of_identifiers = [] - controller = make_batch_controller() - - for record in event["Records"]: - operation_start_time = str(datetime.now()) - kinesis_payload = record["kinesis"]["data"] - decoded_payload = base64.b64decode(kinesis_payload).decode("utf-8") - incoming_message_body = json.loads(decoded_payload, use_decimal=True) - file_key = incoming_message_body.get("file_key") - local_id = incoming_message_body.get("local_id") - - if is_eof_message(incoming_message_body): - logger.info("Received EOF message for file key: %s", file_key) - filename_to_events_mapper.add_event(incoming_message_body) - continue - - base_outgoing_message_body = { - "file_key": incoming_message_body.get("file_key"), - "row_id": incoming_message_body.get("row_id"), - "created_at_formatted_string": incoming_message_body.get("created_at_formatted_string"), - "local_id": incoming_message_body.get("local_id"), - "operation_requested": incoming_message_body.get("operation_requested"), - "supplier": incoming_message_body.get("supplier"), - "vaccine_type": incoming_message_body.get("vax_type"), - } - logger.info("Received message for file %s with local id: %s", file_key, local_id) - - try: - if incoming_diagnostics := incoming_message_body.get("diagnostics"): - raise RecordProcessorError(incoming_diagnostics) - - if not (fhir_json := incoming_message_body.get("fhir_json")): - raise MessageNotSuccessfulError("Server error - FHIR JSON not correctly sent to forwarder") - - # Check if the identifier is already present in the array - identifier_already_present = False - identifier_system = fhir_json["identifier"][0]["system"] - identifier_value = fhir_json["identifier"][0]["value"] - identifier = f"{identifier_system}#{identifier_value}" - - if identifier in array_of_identifiers: - identifier_already_present = True - delay_milliseconds = 30 - # A basic workaround by the existing team to ensure that subsequent operations e.g. an update after an - # initial create for the same item complete successfully. Consider using strongly consistent reads - # instead: VED-958 - time.sleep(delay_milliseconds / 1000) - else: - array_of_identifiers.append(identifier) - - imms_pk = forward_request_to_dynamo(incoming_message_body, table, identifier_already_present, controller) - logger.info("Successfully processed message. Local id: %s, PK: %s", local_id, imms_pk) - - except Exception as error: # pylint: disable = broad-exception-caught - filename_to_events_mapper.add_event( - { - **base_outgoing_message_body, - "operation_start_time": operation_start_time, - "operation_end_time": str(datetime.now()), - "diagnostics": create_diagnostics_dictionary(error), - } - ) - logger.error("Error processing message: %s", error) + """Forward each row to the Imms API""" + logger.info("Processing started") + table = create_table() + filename_to_events_mapper = BatchFilenameToEventsMapper() + array_of_identifiers = [] + controller = make_batch_controller() + + for record in event["Records"]: + operation_start_time = str(datetime.now()) + kinesis_payload = record["kinesis"]["data"] + decoded_payload = base64.b64decode(kinesis_payload).decode("utf-8") + incoming_message_body = json.loads(decoded_payload, use_decimal=True) + file_key = incoming_message_body.get("file_key") + local_id = incoming_message_body.get("local_id") + + if is_eof_message(incoming_message_body): + logger.info("Received EOF message for file key: %s", file_key) + filename_to_events_mapper.add_event(incoming_message_body) + continue + + base_outgoing_message_body = { + "file_key": incoming_message_body.get("file_key"), + "row_id": incoming_message_body.get("row_id"), + "created_at_formatted_string": incoming_message_body.get("created_at_formatted_string"), + "local_id": incoming_message_body.get("local_id"), + "operation_requested": incoming_message_body.get("operation_requested"), + "supplier": incoming_message_body.get("supplier"), + "vaccine_type": incoming_message_body.get("vax_type"), + } + logger.info("Received message for file %s with local id: %s", file_key, local_id) + + try: + if incoming_diagnostics := incoming_message_body.get("diagnostics"): + raise RecordProcessorError(incoming_diagnostics) + + if not (fhir_json := incoming_message_body.get("fhir_json")): + raise MessageNotSuccessfulError("Server error - FHIR JSON not correctly sent to forwarder") + + # Check if the identifier is already present in the array + identifier_already_present = False + identifier_system = fhir_json["identifier"][0]["system"] + identifier_value = fhir_json["identifier"][0]["value"] + identifier = f"{identifier_system}#{identifier_value}" + + if identifier in array_of_identifiers: + identifier_already_present = True + delay_milliseconds = 30 + # A basic workaround by the existing team to ensure that subsequent operations e.g. an update after an + # initial create for the same item complete successfully. Consider using strongly consistent reads + # instead: VED-958 + time.sleep(delay_milliseconds / 1000) + else: + array_of_identifiers.append(identifier) + + imms_pk = forward_request_to_dynamo(incoming_message_body, table, identifier_already_present, controller) + logger.info("Successfully processed message. Local id: %s, PK: %s", local_id, imms_pk) + + except Exception as error: # pylint: disable = broad-exception-caught + filename_to_events_mapper.add_event( + { + **base_outgoing_message_body, + "operation_start_time": operation_start_time, + "operation_end_time": str(datetime.now()), + "diagnostics": create_diagnostics_dictionary(error), + } + ) + logger.error("Error processing message: %s", error) + try: # Send to SQS for filename_key, events in filename_to_events_mapper.get_map().items(): sqs_message_body = json.dumps(events) @@ -150,6 +150,7 @@ def forward_lambda_handler(event, _): ) except Exception as error: # pylint: disable = broad-exception-caught logger.error("Error processing event: %s", error) + raise if __name__ == "__main__": diff --git a/lambdas/recordforwarder/tests/test_forwarding_batch_lambda.py b/lambdas/recordforwarder/tests/test_forwarding_batch_lambda.py index c9588487b..31ac39b2e 100644 --- a/lambdas/recordforwarder/tests/test_forwarding_batch_lambda.py +++ b/lambdas/recordforwarder/tests/test_forwarding_batch_lambda.py @@ -863,10 +863,21 @@ def test_forward_request_to_dynamo( expected_values = test_case[0]["expected_values"] assert expected_values.items() <= call_data.items() - @patch("forwarding_batch_lambda.create_table") - def test_forward_lambda_handler_exception_handler(self, mock_create_table): - """Test exception handling in main lambda handler loop""" + @patch("forwarding_batch_lambda.sqs_client.send_message") + def test_forward_lambda_handler_exception_handler(self, mock_send_message): + """Test exception handling when sqs_client fails""" # Arrange + table_item = copy.deepcopy(ForwarderValues.EXPECTED_TABLE_ITEM) + table_item.update( + { + "PK": "Immunization#4d2ac1eb-080f-4e54-9598-f2d53334681c", + "IdentifierPK": "https://www.ravs.england.nhs.uk/#RSV_002", + "PatientSK": "RSV#4d2ac1eb-080f-4e54-9598-f2d53334681c", + "Operation": "DELETE", + } + ) + + # Ensure there is at least one failure, so that sqs_client is called test_cases = [ { "name": "Row 1: Create Success", @@ -882,15 +893,42 @@ def test_forward_lambda_handler_exception_handler(self, mock_create_table): **ForwarderValues.EXPECTED_VALUES, }, }, + { + "name": "Row 2: Duplication Error: Create failure ", + "input": self.generate_input(row_id=2, operation_requested="CREATE", include_fhir_json=True), + "expected_keys": ForwarderValues.EXPECTED_KEYS_DIAGNOSTICS, + "expected_values": { + "row_id": "row-2", + "diagnostics": create_diagnostics_dictionary( + IdentifierDuplicationError("https://www.ravs.england.nhs.uk/#RSV_002") + ), + }, + "is_failure": True, + }, ] - mock_create_table.side_effect = Exception("Unknown Exception") + self.table.put_item( + Item={ + "PK": "Immunization#4d2ac1eb-080f-4e54-9598-f2d53334681c", + "PatientPK": "Patient#9732928395", # 9177036360", + "PatientSK": "RSV#4d2ac1eb-080f-4e54-9598-f2d53334681c", + "IdentifierPK": "https://www.ravs.england.nhs.uk/#RSV_002", + "Version": 1, + } + ) + mock_send_message.reset_mock() + mock_send_message.side_effect = Exception("Unknown Exception in SQS client") event = self.generate_event(test_cases) + self.mock_redis.hget.return_value = "RSV" + self.mock_redis_getter.return_value = self.mock_redis + # Act & Assert - forward_lambda_handler(event, {}) + with self.assertRaises(Exception) as context: + forward_lambda_handler(event, {}) + self.assertIn("Unknown Exception in SQS client", str(context.exception)) self.mock_logger_error.assert_called_with("Error processing event: %s", ANY) From ac29bf900f99cb9aaa0f48e53fe0772d0042930d Mon Sep 17 00:00:00 2001 From: James Wharmby Date: Wed, 7 Jan 2026 09:36:48 +0000 Subject: [PATCH 14/18] cleanup --- lambdas/recordforwarder/tests/test_forwarding_batch_lambda.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lambdas/recordforwarder/tests/test_forwarding_batch_lambda.py b/lambdas/recordforwarder/tests/test_forwarding_batch_lambda.py index 31ac39b2e..067f3c611 100644 --- a/lambdas/recordforwarder/tests/test_forwarding_batch_lambda.py +++ b/lambdas/recordforwarder/tests/test_forwarding_batch_lambda.py @@ -916,7 +916,7 @@ def test_forward_lambda_handler_exception_handler(self, mock_send_message): "Version": 1, } ) - mock_send_message.reset_mock() + mock_send_message.side_effect = Exception("Unknown Exception in SQS client") event = self.generate_event(test_cases) From acfb4c12c28c44a5d969473f5d4afaa62f768612 Mon Sep 17 00:00:00 2001 From: James Wharmby Date: Wed, 7 Jan 2026 09:39:11 +0000 Subject: [PATCH 15/18] remove loop, retain test --- .../src/forwarding_batch_lambda.py | 24 ++++++++----------- .../tests/test_forwarding_batch_lambda.py | 1 - 2 files changed, 10 insertions(+), 15 deletions(-) diff --git a/lambdas/recordforwarder/src/forwarding_batch_lambda.py b/lambdas/recordforwarder/src/forwarding_batch_lambda.py index 46ef91f1a..2cc7253ca 100644 --- a/lambdas/recordforwarder/src/forwarding_batch_lambda.py +++ b/lambdas/recordforwarder/src/forwarding_batch_lambda.py @@ -137,20 +137,16 @@ def forward_lambda_handler(event, _): ) logger.error("Error processing message: %s", error) - try: - # Send to SQS - for filename_key, events in filename_to_events_mapper.get_map().items(): - sqs_message_body = json.dumps(events) - logger.info(f"total message length:{len(sqs_message_body)}") - - sqs_client.send_message( - QueueUrl=QUEUE_URL, - MessageBody=sqs_message_body, - MessageGroupId=filename_key, - ) - except Exception as error: # pylint: disable = broad-exception-caught - logger.error("Error processing event: %s", error) - raise + # Send to SQS + for filename_key, events in filename_to_events_mapper.get_map().items(): + sqs_message_body = json.dumps(events) + logger.info(f"total message length:{len(sqs_message_body)}") + + sqs_client.send_message( + QueueUrl=QUEUE_URL, + MessageBody=sqs_message_body, + MessageGroupId=filename_key, + ) if __name__ == "__main__": diff --git a/lambdas/recordforwarder/tests/test_forwarding_batch_lambda.py b/lambdas/recordforwarder/tests/test_forwarding_batch_lambda.py index 067f3c611..b7d159d72 100644 --- a/lambdas/recordforwarder/tests/test_forwarding_batch_lambda.py +++ b/lambdas/recordforwarder/tests/test_forwarding_batch_lambda.py @@ -929,7 +929,6 @@ def test_forward_lambda_handler_exception_handler(self, mock_send_message): forward_lambda_handler(event, {}) self.assertIn("Unknown Exception in SQS client", str(context.exception)) - self.mock_logger_error.assert_called_with("Error processing event: %s", ANY) if __name__ == "__main__": From 3e0b356c8272c4d38b7e244a795dacfd6e2c5a5e Mon Sep 17 00:00:00 2001 From: James Wharmby Date: Wed, 7 Jan 2026 10:01:23 +0000 Subject: [PATCH 16/18] bug fix: remove duplicate send_message() call on failure --- lambdas/delta_backend/src/delta.py | 1 - lambdas/delta_backend/tests/test_delta.py | 5 +---- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/lambdas/delta_backend/src/delta.py b/lambdas/delta_backend/src/delta.py index 9623a95a1..8379e59d0 100644 --- a/lambdas/delta_backend/src/delta.py +++ b/lambdas/delta_backend/src/delta.py @@ -232,7 +232,6 @@ def handler(event, _context): "diagnostics": "Delta Lambda failure: Incorrect invocation of Lambda", } logger.exception(operation_outcome["diagnostics"]) - send_message(event) # Send failed records to DLQ log_data = {"function_name": "delta_sync", "operation_outcome": operation_outcome} send_log_to_firehose(STREAM_NAME, log_data) diff --git a/lambdas/delta_backend/tests/test_delta.py b/lambdas/delta_backend/tests/test_delta.py index 5d2e48cc4..a09c6e8b6 100644 --- a/lambdas/delta_backend/tests/test_delta.py +++ b/lambdas/delta_backend/tests/test_delta.py @@ -118,10 +118,7 @@ def test_handler_exception(self): """Ensure that sqs_client exceptions do not cause the lambda handler itself to raise an exception""" # Arrange - self.mock_sqs_client.send_message.side_effect = [ - None, - Exception("SQS error"), - ] + self.mock_sqs_client.send_message.side_effect = Exception("SQS error") event = {"invalid_format": True} # Act From f5cd80a6a0b7a9e26ead16c07609b5efaaefacbb Mon Sep 17 00:00:00 2001 From: James Wharmby Date: Wed, 7 Jan 2026 11:58:45 +0000 Subject: [PATCH 17/18] review cleanup --- .../tests/test_forwarding_batch_lambda.py | 32 +++---------------- 1 file changed, 4 insertions(+), 28 deletions(-) diff --git a/lambdas/recordforwarder/tests/test_forwarding_batch_lambda.py b/lambdas/recordforwarder/tests/test_forwarding_batch_lambda.py index b7d159d72..80d40e8f2 100644 --- a/lambdas/recordforwarder/tests/test_forwarding_batch_lambda.py +++ b/lambdas/recordforwarder/tests/test_forwarding_batch_lambda.py @@ -867,38 +867,14 @@ def test_forward_request_to_dynamo( def test_forward_lambda_handler_exception_handler(self, mock_send_message): """Test exception handling when sqs_client fails""" # Arrange - table_item = copy.deepcopy(ForwarderValues.EXPECTED_TABLE_ITEM) - table_item.update( - { - "PK": "Immunization#4d2ac1eb-080f-4e54-9598-f2d53334681c", - "IdentifierPK": "https://www.ravs.england.nhs.uk/#RSV_002", - "PatientSK": "RSV#4d2ac1eb-080f-4e54-9598-f2d53334681c", - "Operation": "DELETE", - } - ) - # Ensure there is at least one failure, so that sqs_client is called test_cases = [ { - "name": "Row 1: Create Success", - "input": self.generate_input( - row_id=1, - operation_requested="CREATE", - include_fhir_json=True, - identifier_value="RSV_CREATE", - ), - "expected_keys": ForwarderValues.EXPECTED_KEYS, - "expected_values": { - "row_id": "row-1", - **ForwarderValues.EXPECTED_VALUES, - }, - }, - { - "name": "Row 2: Duplication Error: Create failure ", - "input": self.generate_input(row_id=2, operation_requested="CREATE", include_fhir_json=True), + "name": "Row 1: Duplication Error: Create failure ", + "input": self.generate_input(row_id=1, operation_requested="CREATE", include_fhir_json=True), "expected_keys": ForwarderValues.EXPECTED_KEYS_DIAGNOSTICS, "expected_values": { - "row_id": "row-2", + "row_id": "row-1", "diagnostics": create_diagnostics_dictionary( IdentifierDuplicationError("https://www.ravs.england.nhs.uk/#RSV_002") ), @@ -928,7 +904,7 @@ def test_forward_lambda_handler_exception_handler(self, mock_send_message): with self.assertRaises(Exception) as context: forward_lambda_handler(event, {}) - self.assertIn("Unknown Exception in SQS client", str(context.exception)) + self.assertEqual("Unknown Exception in SQS client", str(context.exception)) if __name__ == "__main__": From aed6059afab47059e97f554440bb20a504f3c7ed Mon Sep 17 00:00:00 2001 From: James Wharmby Date: Wed, 7 Jan 2026 12:01:05 +0000 Subject: [PATCH 18/18] review cleanup II --- lambdas/recordforwarder/tests/test_forwarding_batch_lambda.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lambdas/recordforwarder/tests/test_forwarding_batch_lambda.py b/lambdas/recordforwarder/tests/test_forwarding_batch_lambda.py index 80d40e8f2..dde50a6b2 100644 --- a/lambdas/recordforwarder/tests/test_forwarding_batch_lambda.py +++ b/lambdas/recordforwarder/tests/test_forwarding_batch_lambda.py @@ -886,7 +886,7 @@ def test_forward_lambda_handler_exception_handler(self, mock_send_message): self.table.put_item( Item={ "PK": "Immunization#4d2ac1eb-080f-4e54-9598-f2d53334681c", - "PatientPK": "Patient#9732928395", # 9177036360", + "PatientPK": "Patient#9732928395", "PatientSK": "RSV#4d2ac1eb-080f-4e54-9598-f2d53334681c", "IdentifierPK": "https://www.ravs.england.nhs.uk/#RSV_002", "Version": 1,