Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions infrastructure/instance/ecs_batch_processor_config.tf
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,10 @@ resource "aws_ecs_task_definition" "ecs_task" {
name = "ACK_BUCKET_NAME"
value = aws_s3_bucket.batch_data_destination_bucket.bucket
},
{
name = "DATA_QUALITY_BUCKET_NAME"
value = aws_s3_bucket.data_quality_reports_bucket.bucket
},
{
name = "KINESIS_STREAM_ARN"
value = local.kinesis_arn
Expand Down
7 changes: 4 additions & 3 deletions infrastructure/instance/endpoints.tf
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ locals {
]
imms_table_name = aws_dynamodb_table.events-dynamodb-table.name
imms_lambda_env_vars = {
"DYNAMODB_TABLE_NAME" = local.imms_table_name,
"IMMUNIZATION_ENV" = local.resource_scope,
"IMMUNIZATION_BASE_PATH" = strcontains(var.sub_environment, "pr-") ? "immunisation-fhir-api/FHIR/R4-${var.sub_environment}" : "immunisation-fhir-api/FHIR/R4"
"DATA_QUALITY_BUCKET_NAME" = aws_s3_bucket.data_quality_reports_bucket.bucket,
"DYNAMODB_TABLE_NAME" = local.imms_table_name,
"IMMUNIZATION_ENV" = local.resource_scope,
"IMMUNIZATION_BASE_PATH" = strcontains(var.sub_environment, "pr-") ? "immunisation-fhir-api/FHIR/R4-${var.sub_environment}" : "immunisation-fhir-api/FHIR/R4"
# except for prod and ref, any other env uses PDS int environment
"PDS_ENV" = var.pds_environment
"SPLUNK_FIREHOSE_NAME" = module.splunk.firehose_stream_name
Expand Down
9 changes: 9 additions & 0 deletions lambdas/backend/src/service/fhir_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

from authorisation.api_operation_code import ApiOperationCode
from authorisation.authoriser import Authoriser
from common.data_quality.reporter import DataQualityReporter
from common.models.constants import Constants
from common.models.errors import (
Code,
Expand Down Expand Up @@ -52,8 +53,10 @@

IMMUNIZATION_BASE_PATH = os.getenv("IMMUNIZATION_BASE_PATH")
IMMUNIZATION_ENV = os.getenv("IMMUNIZATION_ENV")
DATA_QUALITY_BUCKET_NAME = os.getenv("DATA_QUALITY_BUCKET_NAME")

AUTHORISER = Authoriser()
DATA_QUALITY_REPORTER = DataQualityReporter(is_batch_csv=False, bucket=DATA_QUALITY_BUCKET_NAME)
IMMUNIZATION_VALIDATOR = ImmunizationValidator()


Expand All @@ -66,9 +69,11 @@ def __init__(
self,
imms_repo: ImmunizationRepository,
authoriser: Authoriser = AUTHORISER,
data_quality_reporter: DataQualityReporter = DATA_QUALITY_REPORTER,
validator: ImmunizationValidator = IMMUNIZATION_VALIDATOR,
):
self.authoriser = authoriser
self.data_quality_reporter = data_quality_reporter
self.immunization_repo = imms_repo
self.validator = validator

Expand Down Expand Up @@ -116,6 +121,8 @@ def create_immunization(self, immunization: dict, supplier_system: str) -> Id:
if immunization.get("id") is not None:
raise CustomValidationError("id field must not be present for CREATE operation")

self.data_quality_reporter.generate_and_send_report(immunization)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be at the top of the function in case we hit the above validation error?


try:
self.validator.validate(immunization)
except (ValueError, MandatoryError) as error:
Expand All @@ -138,6 +145,8 @@ def create_immunization(self, immunization: dict, supplier_system: str) -> Id:
return self.immunization_repo.create_immunization(immunization_fhir_entity, supplier_system)

def update_immunization(self, imms_id: str, immunization: dict, supplier_system: str, resource_version: int) -> int:
self.data_quality_reporter.generate_and_send_report(immunization)

try:
self.validator.validate(immunization)
except (ValueError, MandatoryError) as error:
Expand Down
130 changes: 120 additions & 10 deletions lambdas/backend/tests/service/test_fhir_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@
import os
import unittest
from copy import deepcopy
from unittest.mock import Mock, create_autospec, patch
from unittest.mock import ANY, Mock, create_autospec, patch

from fhir.resources.R4B.bundle import BundleLink
from fhir.resources.R4B.identifier import Identifier
from fhir.resources.R4B.immunization import Immunization

from authorisation.api_operation_code import ApiOperationCode
from authorisation.authoriser import Authoriser
from common.data_quality.reporter import DataQualityReporter
from common.models.errors import (
CustomValidationError,
IdentifierDuplicationError,
Expand Down Expand Up @@ -46,8 +47,8 @@ def setUp(self):
self.mock_redis_getter = self.redis_getter_patcher.start()
self.logger_info_patcher = patch("logging.Logger.info")
self.mock_logger_info = self.logger_info_patcher.start()
self.imms_env_patcher = patch("service.fhir_service.IMMUNIZATION_ENV", "internal-dev")
self.imms_env_patcher.start()
imms_env_patcher = patch("service.fhir_service.IMMUNIZATION_ENV", "internal-dev")
imms_env_patcher.start()

def tearDown(self):
super().tearDown()
Expand All @@ -60,9 +61,10 @@ class TestGetImmunization(TestFhirServiceBase):
def setUp(self):
super().setUp()
self.authoriser = create_autospec(Authoriser)
self.data_quality_reporter = create_autospec(DataQualityReporter)
self.imms_repo = create_autospec(ImmunizationRepository)
self.validator = create_autospec(ImmunizationValidator)
self.fhir_service = FhirService(self.imms_repo, self.authoriser, self.validator)
self.fhir_service = FhirService(self.imms_repo, self.authoriser, self.data_quality_reporter, self.validator)
self.logger_info_patcher = patch("logging.Logger.info")
self.mock_logger_info = self.logger_info_patcher.start()

Expand Down Expand Up @@ -181,8 +183,9 @@ def setUp(self):
super().setUp()
self.authoriser = create_autospec(Authoriser)
self.imms_repo = create_autospec(ImmunizationRepository)
self.data_quality_reporter = create_autospec(DataQualityReporter)
self.validator = create_autospec(ImmunizationValidator)
self.fhir_service = FhirService(self.imms_repo, self.authoriser, self.validator)
self.fhir_service = FhirService(self.imms_repo, self.authoriser, self.data_quality_reporter, self.validator)
self.logger_info_patcher = patch("logging.Logger.info")
self.mock_logger_info = self.logger_info_patcher.start()

Expand Down Expand Up @@ -297,12 +300,14 @@ class TestCreateImmunization(TestFhirServiceBase):
def setUp(self):
super().setUp()
self.authoriser = create_autospec(Authoriser)
self.data_quality_reporter = create_autospec(DataQualityReporter)
self.imms_repo = create_autospec(ImmunizationRepository)
self.validator = create_autospec(ImmunizationValidator)
self.fhir_service = FhirService(self.imms_repo, self.authoriser, self.validator)
self.fhir_service = FhirService(self.imms_repo, self.authoriser, self.data_quality_reporter, self.validator)
self.pre_validate_fhir_service = FhirService(
self.imms_repo,
self.authoriser,
self.data_quality_reporter,
ImmunizationValidator(add_post_validators=False),
)

Expand All @@ -322,6 +327,7 @@ def test_create_immunization(self):

# Then
self.authoriser.authorise.assert_called_once_with("Test", ApiOperationCode.CREATE, {"COVID"})
self.data_quality_reporter.generate_and_send_report.assert_called_once_with(req_imms)
self.imms_repo.check_immunization_identifier_exists.assert_called_once_with(
"https://supplierABC/identifiers/vacc", "ACME-vacc123456"
)
Expand All @@ -330,6 +336,48 @@ def test_create_immunization(self):
self.validator.validate.assert_called_once_with(req_imms)
self.assertEqual(self._MOCK_NEW_UUID, created_id)

@patch("common.data_quality.reporter.get_s3_client")
@patch("common.data_quality.reporter.uuid.uuid4", return_value="2a99d9fd-50b4-44f6-9c17-3f0c81cf9ce8")
def test_create_immunization_submits_data_quality_report(self, _, mock_s3_client):
"""it should create a data quality report when a new immunisation is payload is submitted - case with two
DQ warnings for DOSE_UNIT_CODE"""
self.mock_redis.hget.return_value = "COVID"
self.mock_redis_getter.return_value = self.mock_redis
self.authoriser.authorise.return_value = True
self.imms_repo.check_immunization_identifier_exists.return_value = False
self.imms_repo.create_immunization.return_value = self._MOCK_NEW_UUID

dq_reporter = DataQualityReporter(is_batch_csv=False, bucket="test-dq-bucket-name")
fhir_service_with_dq = FhirService(self.imms_repo, self.authoriser, dq_reporter, self.validator)

nhs_number = VALID_NHS_NUMBER
req_imms = create_covid_immunization_dict_no_id(nhs_number)

# When
created_id = fhir_service_with_dq.create_immunization(req_imms, "Test")

# Then
mock_s3_client.assert_called_once()
mock_s3_client().put_object.assert_called_once_with(
Bucket="test-dq-bucket-name",
Key="2a99d9fd-50b4-44f6-9c17-3f0c81cf9ce8.json",
Body=ANY,
ContentType="application/json",
)
_, kwargs = mock_s3_client().put_object.call_args_list[0]
self.assertEqual(
json.loads(kwargs["Body"]),
{
"data_quality_report_id": "2a99d9fd-50b4-44f6-9c17-3f0c81cf9ce8",
"validation_date": ANY,
"completeness": {"mandatory_fields": [], "optional_fields": [], "required_fields": ["DOSE_UNIT_CODE"]},
"validity": ["DOSE_UNIT_CODE"],
"timeliness_ingested_seconds": ANY,
"timeliness_recorded_days": 0,
},
)
self.assertEqual(self._MOCK_NEW_UUID, created_id)

def test_create_immunization_with_id_throws_error(self):
"""it should throw exception if id present in create Immunization"""
imms = create_covid_immunization_dict("an-id", "9990548609")
Expand Down Expand Up @@ -370,7 +418,7 @@ def test_post_validation_failed_create_invalid_target_disease(self):
+ ".code - ['bad-code'] is not a valid combination of disease codes for this service"
)

fhir_service = FhirService(self.imms_repo)
fhir_service = FhirService(self.imms_repo, self.authoriser, self.data_quality_reporter)

with self.assertRaises(CustomValidationError) as error:
fhir_service.create_immunization(bad_target_disease_imms, "Test")
Expand All @@ -388,7 +436,7 @@ def test_post_validation_failed_create_missing_patient_name(self):
del bad_patient_name_imms["contained"][1]["name"][0]["given"]
bad_patient_name_msg = "contained[?(@.resourceType=='Patient')].name[0].given is a mandatory field"

fhir_service = FhirService(self.imms_repo)
fhir_service = FhirService(self.imms_repo, self.authoriser, self.data_quality_reporter)

with self.assertRaises(CustomValidationError) as error:
fhir_service.create_immunization(bad_patient_name_imms, "Test")
Expand Down Expand Up @@ -464,8 +512,9 @@ class TestUpdateImmunization(TestFhirServiceBase):
def setUp(self):
super().setUp()
self.authoriser = create_autospec(Authoriser)
self.data_quality_reporter = create_autospec(DataQualityReporter)
self.imms_repo = create_autospec(ImmunizationRepository)
self.fhir_service = FhirService(self.imms_repo, self.authoriser)
self.fhir_service = FhirService(self.imms_repo, self.authoriser, self.data_quality_reporter)
self.mock_redis.hget.return_value = "COVID"
self.mock_redis_getter.return_value = self.mock_redis

Expand Down Expand Up @@ -494,6 +543,7 @@ def test_update_immunization(self):

# Then
self.assertEqual(updated_version, 2)
self.data_quality_reporter.generate_and_send_report.assert_called_once_with(updated_immunisation)
self.imms_repo.get_immunization_resource_and_metadata_by_id.assert_called_once_with(
imms_id, include_deleted=True
)
Expand All @@ -502,6 +552,65 @@ def test_update_immunization(self):
)
self.authoriser.authorise.assert_called_once_with("Test", ApiOperationCode.UPDATE, {"COVID"})

@patch("common.data_quality.reporter.get_s3_client")
@patch("common.data_quality.reporter.uuid.uuid4", return_value="2a99d9fd-50b4-44f6-9c17-3f0c81cf9ce8")
def test_update_immunization_submits_data_quality_report(self, _, mock_s3_client):
"""it should create a data quality report for every submitted immunization update - case where there are no
warnings in the report"""
imms_id = "an-id"
millilitres_snomed_code = "258773002"
snomed_coding_system = "http://snomed.info/sct"
original_immunisation = create_covid_immunization_dict(imms_id, VALID_NHS_NUMBER)

# Could fix in future. Example payload passes validation but coding is incorrect
original_immunisation["doseQuantity"]["code"] = millilitres_snomed_code
original_immunisation["doseQuantity"]["system"] = snomed_coding_system
identifier = Identifier(
system=original_immunisation["identifier"][0]["system"],
value=original_immunisation["identifier"][0]["value"],
)
updated_immunisation = create_covid_immunization_dict(imms_id, VALID_NHS_NUMBER, "2021-02-07T13:28:00+00:00")
updated_immunisation["doseQuantity"]["code"] = millilitres_snomed_code
updated_immunisation["doseQuantity"]["system"] = snomed_coding_system
existing_resource_meta = ImmunizationRecordMetadata(
identifier=identifier, resource_version=1, is_deleted=False, is_reinstated=False
)

self.imms_repo.get_immunization_resource_and_metadata_by_id.return_value = (
original_immunisation,
existing_resource_meta,
)
self.imms_repo.update_immunization.return_value = 2
self.authoriser.authorise.return_value = True

dq_reporter = DataQualityReporter(is_batch_csv=False, bucket="test-dq-bucket-name")
fhir_service_with_dq = FhirService(self.imms_repo, self.authoriser, dq_reporter)

# When
updated_version = fhir_service_with_dq.update_immunization(imms_id, updated_immunisation, "Test", 1)

# Then
mock_s3_client.assert_called_once()
mock_s3_client().put_object.assert_called_once_with(
Bucket="test-dq-bucket-name",
Key="2a99d9fd-50b4-44f6-9c17-3f0c81cf9ce8.json",
Body=ANY,
ContentType="application/json",
)
_, kwargs = mock_s3_client().put_object.call_args_list[0]
self.assertEqual(
json.loads(kwargs["Body"]),
{
"data_quality_report_id": "2a99d9fd-50b4-44f6-9c17-3f0c81cf9ce8",
"validation_date": ANY,
"completeness": {"mandatory_fields": [], "optional_fields": [], "required_fields": []},
"validity": [],
"timeliness_ingested_seconds": ANY,
"timeliness_recorded_days": 0,
},
)
self.assertEqual(updated_version, 2)

def test_update_immunization_raises_validation_exception_when_nhs_number_invalid(self):
"""it should raise a CustomValidationError when the patient's NHS number in the payload is invalid"""
imms_id = "an-id"
Expand Down Expand Up @@ -633,9 +742,10 @@ class TestDeleteImmunization(TestFhirServiceBase):
def setUp(self):
super().setUp()
self.authoriser = create_autospec(Authoriser)
self.data_quality_reporter = create_autospec(DataQualityReporter)
self.imms_repo = create_autospec(ImmunizationRepository)
self.validator = create_autospec(ImmunizationValidator)
self.fhir_service = FhirService(self.imms_repo, self.authoriser, self.validator)
self.fhir_service = FhirService(self.imms_repo, self.authoriser, self.data_quality_reporter, self.validator)

def test_delete_immunization(self):
"""it should delete Immunization record"""
Expand Down
8 changes: 8 additions & 0 deletions lambdas/recordprocessor/src/batch_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
from common.aws_s3_utils import move_file
from common.batch.eof_utils import make_batch_eof_message
from common.clients import logger
from common.data_quality.reporter import DataQualityReporter
from constants import (
ARCHIVE_DIR_NAME,
DATA_QUALITY_BUCKET_NAME,
PROCESSING_DIR_NAME,
SOURCE_BUCKET_NAME,
FileNotProcessedReason,
Expand Down Expand Up @@ -122,8 +124,10 @@ def process_rows(
"""
Processes each row in the csv_reader starting from start_row.
"""
data_quality_reporter = DataQualityReporter(is_batch_csv=True, bucket=DATA_QUALITY_BUCKET_NAME)
row_count = 0
start_row = total_rows_processed_count

try:
for row in csv_reader:
row_count += 1
Expand All @@ -135,6 +139,10 @@ def process_rows(
logger.info(f"Process: {total_rows_processed_count + 1}")
if start_row > 0 and row_count <= start_row + 10:
logger.info(f"Restarted Process (log up to first 10): {total_rows_processed_count + 1}")

# Submit data quality report
data_quality_reporter.generate_and_send_report(row)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This hopefully won't be an issue, but we will send a duplicate report if we retry processing with a different file encoding


# Process the row to obtain the details needed for the message_body and ack file
details_from_processing = process_row(target_disease, allowed_operations, row)
# Create the message body for sending
Expand Down
1 change: 1 addition & 0 deletions lambdas/recordprocessor/src/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
SOURCE_BUCKET_NAME = os.getenv("SOURCE_BUCKET_NAME")
ACK_BUCKET_NAME = os.getenv("ACK_BUCKET_NAME")
AUDIT_TABLE_NAME = os.getenv("AUDIT_TABLE_NAME")
DATA_QUALITY_BUCKET_NAME = os.getenv("DATA_QUALITY_BUCKET_NAME")

ARCHIVE_DIR_NAME = "archive"
PROCESSING_DIR_NAME = "processing"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ def setUp(self):
self.mock_set_audit_table_ingestion_start_time = create_patch(
"file_level_validation.set_audit_table_ingestion_start_time"
)
create_patch("batch_processor.DataQualityReporter")

def tearDown(self):
patch.stopall()
Expand Down
Loading