From d064e0dc1904e55f51b222edf31ee9fba36c154e Mon Sep 17 00:00:00 2001
From: Daniel Yip
Date: Mon, 29 Dec 2025 15:59:41 +0000
Subject: [PATCH 1/5] Add DQ reporting to update and create FHIR API endpoints

---
 infrastructure/instance/endpoints.tf | 7 +-
 lambdas/backend/src/service/fhir_service.py | 9 ++
 .../tests/service/test_fhir_service.py | 130 ++++++++++++++++--
 3 files changed, 133 insertions(+), 13 deletions(-)

diff --git a/infrastructure/instance/endpoints.tf b/infrastructure/instance/endpoints.tf
index 968cf5dc5..9593c8012 100644
--- a/infrastructure/instance/endpoints.tf
+++ b/infrastructure/instance/endpoints.tf
@@ -25,9 +25,10 @@ locals {
   ]
   imms_table_name = aws_dynamodb_table.events-dynamodb-table.name
   imms_lambda_env_vars = {
-    "DYNAMODB_TABLE_NAME" = local.imms_table_name,
-    "IMMUNIZATION_ENV" = local.resource_scope,
-    "IMMUNIZATION_BASE_PATH" = strcontains(var.sub_environment, "pr-") ? "immunisation-fhir-api/FHIR/R4-${var.sub_environment}" : "immunisation-fhir-api/FHIR/R4"
+    "DATA_QUALITY_BUCKET_NAME" = aws_s3_bucket.data_quality_reports_bucket.id,
+    "DYNAMODB_TABLE_NAME" = local.imms_table_name,
+    "IMMUNIZATION_ENV" = local.resource_scope,
+    "IMMUNIZATION_BASE_PATH" = strcontains(var.sub_environment, "pr-") ? "immunisation-fhir-api/FHIR/R4-${var.sub_environment}" : "immunisation-fhir-api/FHIR/R4"
     # except for prod and ref, any other env uses PDS int environment
     "PDS_ENV" = var.pds_environment
     "SPLUNK_FIREHOSE_NAME" = module.splunk.firehose_stream_name
diff --git a/lambdas/backend/src/service/fhir_service.py b/lambdas/backend/src/service/fhir_service.py
index e6287068e..ac12cb311 100644
--- a/lambdas/backend/src/service/fhir_service.py
+++ b/lambdas/backend/src/service/fhir_service.py
@@ -21,6 +21,7 @@
 from authorisation.api_operation_code import ApiOperationCode
 from authorisation.authoriser import Authoriser
+from common.data_quality.reporter import DataQualityReporter
 from common.models.constants import Constants
 from common.models.errors import (
     Code,
@@ -52,8 +53,10 @@
 IMMUNIZATION_BASE_PATH = os.getenv("IMMUNIZATION_BASE_PATH")
 IMMUNIZATION_ENV = os.getenv("IMMUNIZATION_ENV")
+DATA_QUALITY_BUCKET_NAME = os.getenv("DATA_QUALITY_BUCKET_NAME")
 
 AUTHORISER = Authoriser()
+DATA_QUALITY_REPORTER = DataQualityReporter(is_batch_csv=False, bucket=DATA_QUALITY_BUCKET_NAME)
 IMMUNIZATION_VALIDATOR = ImmunizationValidator()
@@ -66,9 +69,11 @@ def __init__(
         self,
         imms_repo: ImmunizationRepository,
         authoriser: Authoriser = AUTHORISER,
+        data_quality_reporter: DataQualityReporter = DATA_QUALITY_REPORTER,
         validator: ImmunizationValidator = IMMUNIZATION_VALIDATOR,
     ):
         self.authoriser = authoriser
+        self.data_quality_reporter = data_quality_reporter
         self.immunization_repo = imms_repo
         self.validator = validator
 
@@ -116,6 +121,8 @@ def create_immunization(self, immunization: dict, supplier_system: str) -> Id:
         if immunization.get("id") is not None:
             raise CustomValidationError("id field must not be present for CREATE operation")
 
+        self.data_quality_reporter.generate_and_send_report(immunization)
+
         try:
             self.validator.validate(immunization)
         except (ValueError, MandatoryError) as error:
@@ -138,6 +145,8 @@ def create_immunization(self, immunization: dict, supplier_system: str) -> Id:
         return self.immunization_repo.create_immunization(immunization_fhir_entity, supplier_system)
 
     def update_immunization(self, imms_id: str, immunization: dict, supplier_system: str, resource_version: int) -> int:
+        self.data_quality_reporter.generate_and_send_report(immunization)
+
         try:
self.validator.validate(immunization) except (ValueError, MandatoryError) as error: diff --git a/lambdas/backend/tests/service/test_fhir_service.py b/lambdas/backend/tests/service/test_fhir_service.py index 3f0865db0..6e84409f3 100644 --- a/lambdas/backend/tests/service/test_fhir_service.py +++ b/lambdas/backend/tests/service/test_fhir_service.py @@ -3,7 +3,7 @@ import os import unittest from copy import deepcopy -from unittest.mock import Mock, create_autospec, patch +from unittest.mock import ANY, Mock, create_autospec, patch from fhir.resources.R4B.bundle import BundleLink from fhir.resources.R4B.identifier import Identifier @@ -11,6 +11,7 @@ from authorisation.api_operation_code import ApiOperationCode from authorisation.authoriser import Authoriser +from common.data_quality.reporter import DataQualityReporter from common.models.errors import ( CustomValidationError, IdentifierDuplicationError, @@ -46,8 +47,8 @@ def setUp(self): self.mock_redis_getter = self.redis_getter_patcher.start() self.logger_info_patcher = patch("logging.Logger.info") self.mock_logger_info = self.logger_info_patcher.start() - self.imms_env_patcher = patch("service.fhir_service.IMMUNIZATION_ENV", "internal-dev") - self.imms_env_patcher.start() + imms_env_patcher = patch("service.fhir_service.IMMUNIZATION_ENV", "internal-dev") + imms_env_patcher.start() def tearDown(self): super().tearDown() @@ -60,9 +61,10 @@ class TestGetImmunization(TestFhirServiceBase): def setUp(self): super().setUp() self.authoriser = create_autospec(Authoriser) + self.data_quality_reporter = create_autospec(DataQualityReporter) self.imms_repo = create_autospec(ImmunizationRepository) self.validator = create_autospec(ImmunizationValidator) - self.fhir_service = FhirService(self.imms_repo, self.authoriser, self.validator) + self.fhir_service = FhirService(self.imms_repo, self.authoriser, self.data_quality_reporter, self.validator) self.logger_info_patcher = patch("logging.Logger.info") self.mock_logger_info = self.logger_info_patcher.start() @@ -181,8 +183,9 @@ def setUp(self): super().setUp() self.authoriser = create_autospec(Authoriser) self.imms_repo = create_autospec(ImmunizationRepository) + self.data_quality_reporter = create_autospec(DataQualityReporter) self.validator = create_autospec(ImmunizationValidator) - self.fhir_service = FhirService(self.imms_repo, self.authoriser, self.validator) + self.fhir_service = FhirService(self.imms_repo, self.authoriser, self.data_quality_reporter, self.validator) self.logger_info_patcher = patch("logging.Logger.info") self.mock_logger_info = self.logger_info_patcher.start() @@ -297,12 +300,14 @@ class TestCreateImmunization(TestFhirServiceBase): def setUp(self): super().setUp() self.authoriser = create_autospec(Authoriser) + self.data_quality_reporter = create_autospec(DataQualityReporter) self.imms_repo = create_autospec(ImmunizationRepository) self.validator = create_autospec(ImmunizationValidator) - self.fhir_service = FhirService(self.imms_repo, self.authoriser, self.validator) + self.fhir_service = FhirService(self.imms_repo, self.authoriser, self.data_quality_reporter, self.validator) self.pre_validate_fhir_service = FhirService( self.imms_repo, self.authoriser, + self.data_quality_reporter, ImmunizationValidator(add_post_validators=False), ) @@ -322,6 +327,7 @@ def test_create_immunization(self): # Then self.authoriser.authorise.assert_called_once_with("Test", ApiOperationCode.CREATE, {"COVID"}) + self.data_quality_reporter.generate_and_send_report.assert_called_once_with(req_imms) 
         self.imms_repo.check_immunization_identifier_exists.assert_called_once_with(
             "https://supplierABC/identifiers/vacc", "ACME-vacc123456"
         )
@@ -330,6 +336,48 @@
         self.validator.validate.assert_called_once_with(req_imms)
         self.assertEqual(self._MOCK_NEW_UUID, created_id)
 
+    @patch("common.data_quality.reporter.get_s3_client")
+    @patch("common.data_quality.reporter.uuid.uuid4", return_value="2a99d9fd-50b4-44f6-9c17-3f0c81cf9ce8")
+    def test_create_immunization_submits_data_quality_report(self, _, mock_s3_client):
+        """it should create a data quality report when a new immunisation payload is submitted - case with two
+        DQ warnings for DOSE_UNIT_CODE"""
+        self.mock_redis.hget.return_value = "COVID"
+        self.mock_redis_getter.return_value = self.mock_redis
+        self.authoriser.authorise.return_value = True
+        self.imms_repo.check_immunization_identifier_exists.return_value = False
+        self.imms_repo.create_immunization.return_value = self._MOCK_NEW_UUID
+
+        dq_reporter = DataQualityReporter(is_batch_csv=False, bucket="test-dq-bucket-name")
+        fhir_service_with_dq = FhirService(self.imms_repo, self.authoriser, dq_reporter, self.validator)
+
+        nhs_number = VALID_NHS_NUMBER
+        req_imms = create_covid_immunization_dict_no_id(nhs_number)
+
+        # When
+        created_id = fhir_service_with_dq.create_immunization(req_imms, "Test")
+
+        # Then
+        mock_s3_client.assert_called_once()
+        mock_s3_client().put_object.assert_called_once_with(
+            Bucket="test-dq-bucket-name",
+            Key="2a99d9fd-50b4-44f6-9c17-3f0c81cf9ce8.json",
+            Body=ANY,
+            ContentType="application/json",
+        )
+        _, kwargs = mock_s3_client().put_object.call_args_list[0]
+        self.assertEqual(
+            json.loads(kwargs["Body"]),
+            {
+                "data_quality_report_id": "2a99d9fd-50b4-44f6-9c17-3f0c81cf9ce8",
+                "validation_date": ANY,
+                "completeness": {"mandatory_fields": [], "optional_fields": [], "required_fields": ["DOSE_UNIT_CODE"]},
+                "validity": ["DOSE_UNIT_CODE"],
+                "timeliness_ingested_seconds": ANY,
+                "timeliness_recorded_days": 0,
+            },
+        )
+        self.assertEqual(self._MOCK_NEW_UUID, created_id)
+
     def test_create_immunization_with_id_throws_error(self):
         """it should throw exception if id present in create Immunization"""
         imms = create_covid_immunization_dict("an-id", "9990548609")
@@ -370,7 +418,7 @@ def test_post_validation_failed_create_invalid_target_disease(self):
             + ".code - ['bad-code'] is not a valid combination of disease codes for this service"
         )
 
-        fhir_service = FhirService(self.imms_repo)
+        fhir_service = FhirService(self.imms_repo, self.authoriser, self.data_quality_reporter)
 
         with self.assertRaises(CustomValidationError) as error:
             fhir_service.create_immunization(bad_target_disease_imms, "Test")
@@ -388,7 +436,7 @@ def test_post_validation_failed_create_missing_patient_name(self):
         del bad_patient_name_imms["contained"][1]["name"][0]["given"]
         bad_patient_name_msg = "contained[?(@.resourceType=='Patient')].name[0].given is a mandatory field"
 
-        fhir_service = FhirService(self.imms_repo)
+        fhir_service = FhirService(self.imms_repo, self.authoriser, self.data_quality_reporter)
 
         with self.assertRaises(CustomValidationError) as error:
             fhir_service.create_immunization(bad_patient_name_imms, "Test")
@@ -464,8 +512,9 @@ class TestUpdateImmunization(TestFhirServiceBase):
     def setUp(self):
         super().setUp()
         self.authoriser = create_autospec(Authoriser)
+        self.data_quality_reporter = create_autospec(DataQualityReporter)
         self.imms_repo = create_autospec(ImmunizationRepository)
-        self.fhir_service = FhirService(self.imms_repo, self.authoriser)
+
self.fhir_service = FhirService(self.imms_repo, self.authoriser, self.data_quality_reporter) self.mock_redis.hget.return_value = "COVID" self.mock_redis_getter.return_value = self.mock_redis @@ -494,6 +543,7 @@ def test_update_immunization(self): # Then self.assertEqual(updated_version, 2) + self.data_quality_reporter.generate_and_send_report.assert_called_once_with(updated_immunisation) self.imms_repo.get_immunization_resource_and_metadata_by_id.assert_called_once_with( imms_id, include_deleted=True ) @@ -502,6 +552,65 @@ def test_update_immunization(self): ) self.authoriser.authorise.assert_called_once_with("Test", ApiOperationCode.UPDATE, {"COVID"}) + @patch("common.data_quality.reporter.get_s3_client") + @patch("common.data_quality.reporter.uuid.uuid4", return_value="2a99d9fd-50b4-44f6-9c17-3f0c81cf9ce8") + def test_update_immunization_submits_data_quality_report(self, _, mock_s3_client): + """it should create a data quality report for every submitted immunization update - case where there are no + warnings in the report""" + imms_id = "an-id" + millilitres_snomed_code = "258773002" + snomed_coding_system = "http://snomed.info/sct" + original_immunisation = create_covid_immunization_dict(imms_id, VALID_NHS_NUMBER) + + # Could fix in future. Example payload passes validation but coding is incorrect + original_immunisation["doseQuantity"]["code"] = millilitres_snomed_code + original_immunisation["doseQuantity"]["system"] = snomed_coding_system + identifier = Identifier( + system=original_immunisation["identifier"][0]["system"], + value=original_immunisation["identifier"][0]["value"], + ) + updated_immunisation = create_covid_immunization_dict(imms_id, VALID_NHS_NUMBER, "2021-02-07T13:28:00+00:00") + updated_immunisation["doseQuantity"]["code"] = millilitres_snomed_code + updated_immunisation["doseQuantity"]["system"] = snomed_coding_system + existing_resource_meta = ImmunizationRecordMetadata( + identifier=identifier, resource_version=1, is_deleted=False, is_reinstated=False + ) + + self.imms_repo.get_immunization_resource_and_metadata_by_id.return_value = ( + original_immunisation, + existing_resource_meta, + ) + self.imms_repo.update_immunization.return_value = 2 + self.authoriser.authorise.return_value = True + + dq_reporter = DataQualityReporter(is_batch_csv=False, bucket="test-dq-bucket-name") + fhir_service_with_dq = FhirService(self.imms_repo, self.authoriser, dq_reporter) + + # When + updated_version = fhir_service_with_dq.update_immunization(imms_id, updated_immunisation, "Test", 1) + + # Then + mock_s3_client.assert_called_once() + mock_s3_client().put_object.assert_called_once_with( + Bucket="test-dq-bucket-name", + Key="2a99d9fd-50b4-44f6-9c17-3f0c81cf9ce8.json", + Body=ANY, + ContentType="application/json", + ) + _, kwargs = mock_s3_client().put_object.call_args_list[0] + self.assertEqual( + json.loads(kwargs["Body"]), + { + "data_quality_report_id": "2a99d9fd-50b4-44f6-9c17-3f0c81cf9ce8", + "validation_date": ANY, + "completeness": {"mandatory_fields": [], "optional_fields": [], "required_fields": []}, + "validity": [], + "timeliness_ingested_seconds": ANY, + "timeliness_recorded_days": 0, + }, + ) + self.assertEqual(updated_version, 2) + def test_update_immunization_raises_validation_exception_when_nhs_number_invalid(self): """it should raise a CustomValidationError when the patient's NHS number in the payload is invalid""" imms_id = "an-id" @@ -633,9 +742,10 @@ class TestDeleteImmunization(TestFhirServiceBase): def setUp(self): super().setUp() self.authoriser = 
create_autospec(Authoriser) + self.data_quality_reporter = create_autospec(DataQualityReporter) self.imms_repo = create_autospec(ImmunizationRepository) self.validator = create_autospec(ImmunizationValidator) - self.fhir_service = FhirService(self.imms_repo, self.authoriser, self.validator) + self.fhir_service = FhirService(self.imms_repo, self.authoriser, self.data_quality_reporter, self.validator) def test_delete_immunization(self): """it should delete Immunization record""" From 7f662693784d60d559a5b98b807a0147bac32160 Mon Sep 17 00:00:00 2001 From: Daniel Yip Date: Mon, 29 Dec 2025 16:37:44 +0000 Subject: [PATCH 2/5] Add DQ reporting to batch processor --- .../instance/ecs_batch_processor_config.tf | 4 +++ infrastructure/instance/endpoints.tf | 2 +- .../recordprocessor/src/batch_processor.py | 8 ++++++ lambdas/recordprocessor/src/constants.py | 1 + .../tests/test_recordprocessor_edge_cases.py | 1 + .../tests/test_recordprocessor_main.py | 27 +++++++++++++++++-- .../mock_environment_variables.py | 2 ++ .../utils_for_recordprocessor_tests.py | 3 ++- .../values_for_recordprocessor_tests.py | 8 +++--- 9 files changed, 48 insertions(+), 8 deletions(-) diff --git a/infrastructure/instance/ecs_batch_processor_config.tf b/infrastructure/instance/ecs_batch_processor_config.tf index 3f2bde38d..a67237a54 100644 --- a/infrastructure/instance/ecs_batch_processor_config.tf +++ b/infrastructure/instance/ecs_batch_processor_config.tf @@ -207,6 +207,10 @@ resource "aws_ecs_task_definition" "ecs_task" { name = "ACK_BUCKET_NAME" value = aws_s3_bucket.batch_data_destination_bucket.bucket }, + { + name = "DATA_QUALITY_BUCKET_NAME" + value = aws_s3_bucket.data_quality_reports_bucket.bucket + }, { name = "KINESIS_STREAM_ARN" value = local.kinesis_arn diff --git a/infrastructure/instance/endpoints.tf b/infrastructure/instance/endpoints.tf index 9593c8012..e95d005bd 100644 --- a/infrastructure/instance/endpoints.tf +++ b/infrastructure/instance/endpoints.tf @@ -25,7 +25,7 @@ locals { ] imms_table_name = aws_dynamodb_table.events-dynamodb-table.name imms_lambda_env_vars = { - "DATA_QUALITY_BUCKET_NAME" = aws_s3_bucket.data_quality_reports_bucket.id, + "DATA_QUALITY_BUCKET_NAME" = aws_s3_bucket.data_quality_reports_bucket.bucket, "DYNAMODB_TABLE_NAME" = local.imms_table_name, "IMMUNIZATION_ENV" = local.resource_scope, "IMMUNIZATION_BASE_PATH" = strcontains(var.sub_environment, "pr-") ? "immunisation-fhir-api/FHIR/R4-${var.sub_environment}" : "immunisation-fhir-api/FHIR/R4" diff --git a/lambdas/recordprocessor/src/batch_processor.py b/lambdas/recordprocessor/src/batch_processor.py index ac9aa60cd..3277a31cc 100644 --- a/lambdas/recordprocessor/src/batch_processor.py +++ b/lambdas/recordprocessor/src/batch_processor.py @@ -11,8 +11,10 @@ from common.aws_s3_utils import move_file from common.batch.eof_utils import make_batch_eof_message from common.clients import logger +from common.data_quality.reporter import DataQualityReporter from constants import ( ARCHIVE_DIR_NAME, + DATA_QUALITY_BUCKET_NAME, PROCESSING_DIR_NAME, SOURCE_BUCKET_NAME, FileNotProcessedReason, @@ -122,8 +124,10 @@ def process_rows( """ Processes each row in the csv_reader starting from start_row. 
""" + data_quality_reporter = DataQualityReporter(is_batch_csv=True, bucket=DATA_QUALITY_BUCKET_NAME) row_count = 0 start_row = total_rows_processed_count + try: for row in csv_reader: row_count += 1 @@ -135,6 +139,10 @@ def process_rows( logger.info(f"Process: {total_rows_processed_count + 1}") if start_row > 0 and row_count <= start_row + 10: logger.info(f"Restarted Process (log up to first 10): {total_rows_processed_count + 1}") + + # Submit data quality report + data_quality_reporter.generate_and_send_report(row) + # Process the row to obtain the details needed for the message_body and ack file details_from_processing = process_row(target_disease, allowed_operations, row) # Create the message body for sending diff --git a/lambdas/recordprocessor/src/constants.py b/lambdas/recordprocessor/src/constants.py index d3a28aa88..a3d82ff03 100644 --- a/lambdas/recordprocessor/src/constants.py +++ b/lambdas/recordprocessor/src/constants.py @@ -8,6 +8,7 @@ SOURCE_BUCKET_NAME = os.getenv("SOURCE_BUCKET_NAME") ACK_BUCKET_NAME = os.getenv("ACK_BUCKET_NAME") AUDIT_TABLE_NAME = os.getenv("AUDIT_TABLE_NAME") +DATA_QUALITY_BUCKET_NAME = os.getenv("DATA_QUALITY_BUCKET_NAME") ARCHIVE_DIR_NAME = "archive" PROCESSING_DIR_NAME = "processing" diff --git a/lambdas/recordprocessor/tests/test_recordprocessor_edge_cases.py b/lambdas/recordprocessor/tests/test_recordprocessor_edge_cases.py index 0abd8b2ec..bacfe0fbb 100644 --- a/lambdas/recordprocessor/tests/test_recordprocessor_edge_cases.py +++ b/lambdas/recordprocessor/tests/test_recordprocessor_edge_cases.py @@ -28,6 +28,7 @@ def setUp(self): self.mock_set_audit_table_ingestion_start_time = create_patch( "file_level_validation.set_audit_table_ingestion_start_time" ) + create_patch("batch_processor.DataQualityReporter") def tearDown(self): patch.stopall() diff --git a/lambdas/recordprocessor/tests/test_recordprocessor_main.py b/lambdas/recordprocessor/tests/test_recordprocessor_main.py index 9e935ea3e..dff069c15 100644 --- a/lambdas/recordprocessor/tests/test_recordprocessor_main.py +++ b/lambdas/recordprocessor/tests/test_recordprocessor_main.py @@ -5,7 +5,7 @@ from datetime import datetime, timedelta, timezone from decimal import Decimal from json import JSONDecodeError -from unittest.mock import Mock, patch +from unittest.mock import ANY, Mock, patch from boto3 import client as boto3_client from moto import mock_dynamodb, mock_firehose, mock_kinesis, mock_s3 @@ -118,6 +118,28 @@ def get_ack_file_content(file_key: str) -> str: response = s3_client.get_object(Bucket=BucketNames.DESTINATION, Key=file_key) return response["Body"].read().decode("utf-8") + def make_data_quality_assertions(self, expected_number_of_files: int) -> None: + """Asserts that the expected data quality reports were submitted""" + dq_reports = s3_client.list_objects_v2(Bucket=BucketNames.DATA_QUALITY).get("Contents", []) + self.assertEqual(len(dq_reports), expected_number_of_files) + + for report in dq_reports: + content = s3_client.get_object(Bucket=BucketNames.DATA_QUALITY, Key=report.get("Key")) + dq_report_dict = json.loads(content["Body"].read().decode("utf-8")) + self.maxDiff = None + + self.assertEqual( + dq_report_dict, + { + "data_quality_report_id": ANY, + "validation_date": ANY, + "completeness": {"mandatory_fields": [], "optional_fields": [], "required_fields": []}, + "validity": [], + "timeliness_ingested_seconds": ANY, + "timeliness_recorded_days": ANY, + }, + ) + def make_eof_message_assertion(self, file_details: FileDetails, actual_msg: str, total_records: int) -> None: 
self.assertDictEqual( { @@ -216,7 +238,7 @@ def test_e2e_full_permissions(self): main(test_file.event_full_permissions) - # Assertion case tuples are stuctured as + # Assertion case tuples are structured as # (test_name, index, expected_kinesis_data_ignoring_fhir_json,expect_success) assertion_cases = [ ( @@ -249,6 +271,7 @@ def test_e2e_full_permissions(self): ] self.make_inf_ack_assertions(file_details=mock_rsv_emis_file, passed_validation=True) self.make_kinesis_assertions(assertion_cases) + self.make_data_quality_assertions(len(assertion_cases)) assert_audit_table_entry(test_file, FileStatus.PREPROCESSED, row_count=3) def test_e2e_partial_permissions(self): diff --git a/lambdas/recordprocessor/tests/utils_for_recordprocessor_tests/mock_environment_variables.py b/lambdas/recordprocessor/tests/utils_for_recordprocessor_tests/mock_environment_variables.py index 677399031..308d8f6f9 100644 --- a/lambdas/recordprocessor/tests/utils_for_recordprocessor_tests/mock_environment_variables.py +++ b/lambdas/recordprocessor/tests/utils_for_recordprocessor_tests/mock_environment_variables.py @@ -7,6 +7,7 @@ class BucketNames: """Class containing bucket names for use in tests""" SOURCE = "immunisation-batch-internal-dev-data-sources" + DATA_QUALITY = "immunisation-data-quality-test-bucket-name" DESTINATION = "immunisation-batch-internal-dev-data-destinations" MOCK_FIREHOSE = "mock-firehose-bucket" @@ -36,6 +37,7 @@ class Sqs: "LOCAL_ACCOUNT_ID": "123456789012", "SOURCE_BUCKET_NAME": BucketNames.SOURCE, "ACK_BUCKET_NAME": BucketNames.DESTINATION, + "DATA_QUALITY_BUCKET_NAME": BucketNames.DATA_QUALITY, "SHORT_QUEUE_PREFIX": "imms-batch-internal-dev", "SPLUNK_FIREHOSE_NAME": Firehose.STREAM_NAME, "KINESIS_STREAM_NAME": Kinesis.STREAM_NAME, diff --git a/lambdas/recordprocessor/tests/utils_for_recordprocessor_tests/utils_for_recordprocessor_tests.py b/lambdas/recordprocessor/tests/utils_for_recordprocessor_tests/utils_for_recordprocessor_tests.py index e1279ae8e..5c27fb65a 100644 --- a/lambdas/recordprocessor/tests/utils_for_recordprocessor_tests/utils_for_recordprocessor_tests.py +++ b/lambdas/recordprocessor/tests/utils_for_recordprocessor_tests/utils_for_recordprocessor_tests.py @@ -62,6 +62,7 @@ def __init__( if s3_client: for bucket_name in [ BucketNames.SOURCE, + BucketNames.DATA_QUALITY, BucketNames.DESTINATION, BucketNames.MOCK_FIREHOSE, ]: @@ -104,7 +105,7 @@ def __init__( dynamo_db_client=None, ): if s3_client: - for bucket_name in [BucketNames.SOURCE, BucketNames.DESTINATION]: + for bucket_name in [BucketNames.SOURCE, BucketNames.DATA_QUALITY, BucketNames.DESTINATION]: for obj in s3_client.list_objects_v2(Bucket=bucket_name).get("Contents", []): s3_client.delete_object(Bucket=bucket_name, Key=obj["Key"]) s3_client.delete_bucket(Bucket=bucket_name) diff --git a/lambdas/recordprocessor/tests/utils_for_recordprocessor_tests/values_for_recordprocessor_tests.py b/lambdas/recordprocessor/tests/utils_for_recordprocessor_tests/values_for_recordprocessor_tests.py index 8038bcc36..8361c648a 100644 --- a/lambdas/recordprocessor/tests/utils_for_recordprocessor_tests/values_for_recordprocessor_tests.py +++ b/lambdas/recordprocessor/tests/utils_for_recordprocessor_tests/values_for_recordprocessor_tests.py @@ -85,8 +85,8 @@ class MockFileRows: '"1303503001"|"Administration of vaccine product containing only Human orthopneumovirus antigen (procedure)"|' '1|"42605811000001109"|"Abrysvo vaccine powder and solvent for solution for injection 0.5ml vials (Pfizer Ltd) ' 
'(product)"|"Pfizer"|"RSVTEST"|"20241231"|"368208006"|"Left upper arm structure (body structure)"|' - '"78421000"|"Intramuscular route (qualifier value)"|"0.5"|"258773002"|"Milliliter (qualifier value)"|"Test"|' - '"J82067"|"https://fhir.nhs.uk/Id/ods-organization-code"' + '"78421000"|"Intramuscular route (qualifier value)"|"0.5"|"258773002"|"Milliliter (qualifier value)"' + '|"443684005"|"J82067"|"https://fhir.nhs.uk/Id/ods-organization-code"' ) UPDATE = ( @@ -98,7 +98,7 @@ class MockFileRows: '"Comirnaty 0.3ml dose concentrate for dispersion for injection multidose vials (Pfizer/BioNTech) ' '(product)"|"Pfizer/BioNTech"|"COVIDBATCH"|"20250101"|"368208007"|"Right upper arm structure (body structure)"|' '"385219009"|"Intramuscular route (qualifier value)"|' - '"0.3"|"258773002"|"Milliliter (qualifier value)"|"Routine"|' + '"0.3"|"258773002"|"Milliliter (qualifier value)"|"443684005"|' '"J82068"|"https://fhir.nhs.uk/Id/ods-organization-code"' ) @@ -111,7 +111,7 @@ class MockFileRows: '"Comirnaty 0.3ml dose concentrate for dispersion for injection multidose vials (Pfizer/BioNTech) ' '(product)"|"Pfizer/BioNTech"|"COVIDBATCH"|"20250101"|"368208007"|"Right upper arm structure (body structure)"|' '"385219009"|"Intramuscular route (qualifier value)"|' - '"0.3"|"258773002"|"Milliliter (qualifier value)"|"Routine"|' + '"0.3"|"258773002"|"Milliliter (qualifier value)"|"443684005"|' '"J82068"|"https://fhir.nhs.uk/Id/ods-organization-code"' ) From 9cbdad35b45b7488158e1393da3967cc2fd5b981 Mon Sep 17 00:00:00 2001 From: Daniel Yip Date: Tue, 30 Dec 2025 10:17:19 +0000 Subject: [PATCH 3/5] Tighten datetime validation --- .../models/immunization_batch_row_model.py | 22 ++++++++++++++--- .../test_common/data_quality/test_checker.py | 24 +++++++++++-------- .../test_common/data_quality/test_reporter.py | 19 ++++++++------- 3 files changed, 43 insertions(+), 22 deletions(-) diff --git a/lambdas/shared/src/common/data_quality/models/immunization_batch_row_model.py b/lambdas/shared/src/common/data_quality/models/immunization_batch_row_model.py index 662ba7a9d..f63b0aa4f 100644 --- a/lambdas/shared/src/common/data_quality/models/immunization_batch_row_model.py +++ b/lambdas/shared/src/common/data_quality/models/immunization_batch_row_model.py @@ -29,7 +29,7 @@ class ImmunizationBatchRowModel(BaseModel): NHS_NUMBER: NhsNumber PERSON_DOB: BatchCsvDate - DATE_AND_TIME: BatchCsvDate + DATE_AND_TIME: datetime.datetime PERSON_POSTCODE: PersonPostcode EXPIRY_DATE: ExpiryDate # TODO - check with DQ team. Should these checks be relative to the occurrence datetime? DOSE_AMOUNT: decimal.Decimal # TODO - check with DQ team. Actual values vary a lot from proposed enum. 
@@ -42,13 +42,29 @@ class ImmunizationBatchRowModel(BaseModel): def parse_csv_date(cls, value: str) -> datetime.date: return parse_csv_date(value) - @validator("PERSON_DOB", "DATE_AND_TIME") - def ensure_past_date(cls, value: datetime.date) -> datetime.date: + @validator("PERSON_DOB") + def is_past_date(cls, value: datetime.date) -> datetime.date: if value >= datetime.date.today(): raise ValueError("Date must be in the past") return value + @validator("DATE_AND_TIME") + def is_past_datetime(cls, value: datetime.datetime) -> datetime.datetime: + if value >= datetime.datetime.now(datetime.timezone.utc): + raise ValueError("Datetime must be in the past") + + return value + + @validator("DATE_AND_TIME") + def is_on_or_after_min_date(cls, value: datetime.datetime) -> datetime.date: + """Checks that the given datetime is on a date on or after the minimum accepted date. Note: we have to do this + manually in Pydantic 1 as it only supports condate, so we cannot use built in ge, le comparators.""" + if value.date() < MIN_ACCEPTED_PAST_DATE: + raise ValueError("Datetime is before the minimum accepted date") + + return value + @validator("DATE_AND_TIME", pre=True) def parse_csv_datetime(cls, value: str) -> datetime.datetime: return parse_csv_datetime(value) diff --git a/lambdas/shared/tests/test_common/data_quality/test_checker.py b/lambdas/shared/tests/test_common/data_quality/test_checker.py index 96ed18884..bc7fde81a 100644 --- a/lambdas/shared/tests/test_common/data_quality/test_checker.py +++ b/lambdas/shared/tests/test_common/data_quality/test_checker.py @@ -9,16 +9,20 @@ class TestDataQualityChecker(unittest.TestCase): def setUp(self): - # Fix date.today() for all validation tests - date_today_patcher = patch("common.data_quality.models.immunization_batch_row_model.datetime", wraps=datetime) - self.mock_date_today = date_today_patcher.start() - self.mock_date_today.date.today.return_value = datetime.date(2024, 5, 20) - - # Fix datetime.now self.mock_fixed_datetime = datetime.datetime(2024, 5, 20, 14, 12, 30, 123, tzinfo=datetime.timezone.utc) - datetime_now_patcher = patch("common.data_quality.checker.datetime", wraps=datetime.datetime) - self.mock_datetime_now = datetime_now_patcher.start() - self.mock_datetime_now.now.return_value = self.mock_fixed_datetime + + # Fix date.today() and datetime.now() for the validation model + validator_datetime_patcher = patch( + "common.data_quality.models.immunization_batch_row_model.datetime", wraps=datetime + ) + mock_validator_datetime = validator_datetime_patcher.start() + mock_validator_datetime.date.today.return_value = datetime.date(2024, 5, 20) + mock_validator_datetime.datetime.now.return_value = self.mock_fixed_datetime + + # Fix datetime.now in the top level checker + dq_checker_datetime_patcher = patch("common.data_quality.checker.datetime", wraps=datetime.datetime) + mock_dq_checker_datetime = dq_checker_datetime_patcher.start() + mock_dq_checker_datetime.now.return_value = self.mock_fixed_datetime self.batch_dq_checker = DataQualityChecker(is_batch_csv=True) self.fhir_json_dq_checker = DataQualityChecker(is_batch_csv=False) @@ -48,7 +52,7 @@ def test_check_validity_returns_list_of_invalid_fields_when_invalid_data_provide ("DATE_AND_TIME", "17000511T120000"), # Prior to min accepted past date ("DATE_AND_TIME", "20241511T120000"), # Invalid datetime ("DATE_AND_TIME", "20241511T120"), # Invalid datetime - ("DATE_AND_TIME", "20240520T120001"), # Past dates only + ("DATE_AND_TIME", "20240520T150001"), # Past datetime only ("PERSON_POSTCODE", "AAA12 
3B"), ("EXPIRY_DATE", "18990101"), # Prior to min accepted past date ("EXPIRY_DATE", "20240137"), # Invalid date diff --git a/lambdas/shared/tests/test_common/data_quality/test_reporter.py b/lambdas/shared/tests/test_common/data_quality/test_reporter.py index bdcb9d6b5..dfb695194 100644 --- a/lambdas/shared/tests/test_common/data_quality/test_reporter.py +++ b/lambdas/shared/tests/test_common/data_quality/test_reporter.py @@ -15,17 +15,18 @@ @mock_aws class TestDataQualityReporter(unittest.TestCase): def setUp(self): - # Fix date.today() in validator model - date_today_patcher = patch("common.data_quality.models.immunization_batch_row_model.datetime", wraps=datetime) - mock_date_today = date_today_patcher.start() - mock_date_today.date.today.return_value = datetime.date(2024, 5, 20) + fixed_datetime_now = datetime.datetime(2024, 5, 20, 14, 12, 30, 123, tzinfo=datetime.timezone.utc) + + # Fix date.today() and datetime.now() in validator model + validator_dt_patcher = patch("common.data_quality.models.immunization_batch_row_model.datetime", wraps=datetime) + mock_validator_dt = validator_dt_patcher.start() + mock_validator_dt.date.today.return_value = datetime.date(2024, 5, 20) + mock_validator_dt.datetime.now.return_value = fixed_datetime_now # Fix datetime.now() in dq checker to fix the report datetime - datetime_now_patcher = patch("common.data_quality.checker.datetime", wraps=datetime.datetime) - mock_datetime_now = datetime_now_patcher.start() - mock_datetime_now.now.return_value = datetime.datetime( - 2024, 5, 20, 14, 12, 30, 123, tzinfo=datetime.timezone.utc - ) + checker_dt_patcher = patch("common.data_quality.checker.datetime", wraps=datetime.datetime) + mock_checker_dt = checker_dt_patcher.start() + mock_checker_dt.now.return_value = fixed_datetime_now # Fix generated UUID self.example_uuid = uuid.UUID("fa711f35-c08b-48c8-b498-3b151e686ddf") From f9b3cfa75b346c837249df6193c4ef6ecbecb1c3 Mon Sep 17 00:00:00 2001 From: Daniel Yip Date: Mon, 5 Jan 2026 09:21:12 +0000 Subject: [PATCH 4/5] Resolve pr comments --- lambdas/backend/src/service/fhir_service.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lambdas/backend/src/service/fhir_service.py b/lambdas/backend/src/service/fhir_service.py index ac12cb311..52b514202 100644 --- a/lambdas/backend/src/service/fhir_service.py +++ b/lambdas/backend/src/service/fhir_service.py @@ -118,11 +118,11 @@ def get_immunization_and_version_by_id(self, imms_id: str, supplier_system: str) return Immunization.parse_obj(resource), str(immunization_metadata.resource_version) def create_immunization(self, immunization: dict, supplier_system: str) -> Id: + self.data_quality_reporter.generate_and_send_report(immunization) + if immunization.get("id") is not None: raise CustomValidationError("id field must not be present for CREATE operation") - self.data_quality_reporter.generate_and_send_report(immunization) - try: self.validator.validate(immunization) except (ValueError, MandatoryError) as error: From d0373723acaa1340ba5986305fe7b59e930e7ce7 Mon Sep 17 00:00:00 2001 From: Daniel Yip Date: Mon, 5 Jan 2026 10:19:08 +0000 Subject: [PATCH 5/5] Further review comment! 
--- lambdas/recordprocessor/tests/test_recordprocessor_main.py | 1 - 1 file changed, 1 deletion(-) diff --git a/lambdas/recordprocessor/tests/test_recordprocessor_main.py b/lambdas/recordprocessor/tests/test_recordprocessor_main.py index dff069c15..bb07b49c4 100644 --- a/lambdas/recordprocessor/tests/test_recordprocessor_main.py +++ b/lambdas/recordprocessor/tests/test_recordprocessor_main.py @@ -126,7 +126,6 @@ def make_data_quality_assertions(self, expected_number_of_files: int) -> None: for report in dq_reports: content = s3_client.get_object(Bucket=BucketNames.DATA_QUALITY, Key=report.get("Key")) dq_report_dict = json.loads(content["Body"].read().decode("utf-8")) - self.maxDiff = None self.assertEqual( dq_report_dict,