Merged
2 changes: 1 addition & 1 deletion lambdas/ack_backend/src/audit_table.py
@@ -1,8 +1,8 @@
"""Add the filename to the audit table and check for duplicates."""

from common.clients import dynamodb_client, logger
+from common.models.batch_constants import AUDIT_TABLE_NAME, AuditTableKeys, FileStatus
from common.models.errors import UnhandledAuditTableError
-from constants import AUDIT_TABLE_NAME, AuditTableKeys, FileStatus

CONDITION_EXPRESSION = "attribute_exists(message_id)"

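The only change to this module is the import source: `AUDIT_TABLE_NAME`, `AuditTableKeys`, and `FileStatus` now come from the shared `common.models.batch_constants` module rather than the lambda-local `constants`. For context, `CONDITION_EXPRESSION = "attribute_exists(message_id)"` is the standard DynamoDB guard for "only write if the record already exists". A hypothetical sketch of how such a guard is typically wired up (the key and status names come from the constants in this PR; the call shape is an assumption, not the lambda's actual code):

```python
# Sketch only: a conditional update that succeeds only when the audit record
# already exists; boto3 raises ConditionalCheckFailedException otherwise.
dynamodb_client.update_item(
    TableName=AUDIT_TABLE_NAME,
    Key={AuditTableKeys.MESSAGE_ID: {"S": message_id}},
    UpdateExpression="SET #status = :status",
    ExpressionAttributeNames={"#status": AuditTableKeys.STATUS},
    ExpressionAttributeValues={":status": {"S": FileStatus.PROCESSED}},
    ConditionExpression=CONDITION_EXPRESSION,
)
```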
37 changes: 0 additions & 37 deletions lambdas/ack_backend/src/constants.py
@@ -1,48 +1,11 @@
"""Constants for ack lambda"""

import os

AUDIT_TABLE_NAME = os.getenv("AUDIT_TABLE_NAME")

COMPLETED_ACK_DIR = "forwardedFile"
TEMP_ACK_DIR = "TempAck"
BATCH_FILE_PROCESSING_DIR = "processing"
BATCH_FILE_ARCHIVE_DIR = "archive"


def get_source_bucket_name() -> str:
"""Get the SOURCE_BUCKET_NAME environment from environment variables."""
return os.getenv("SOURCE_BUCKET_NAME")


def get_ack_bucket_name() -> str:
"""Get the ACK_BUCKET_NAME environment from environment variables."""
return os.getenv("ACK_BUCKET_NAME")


class FileStatus:
"""File status constants"""

QUEUED = "Queued"
PROCESSING = "Processing"
PROCESSED = "Processed"
DUPLICATE = "Duplicate"


class AuditTableKeys:
"""Audit table keys"""

FILENAME = "filename"
MESSAGE_ID = "message_id"
QUEUE_NAME = "queue_name"
RECORD_COUNT = "record_count"
STATUS = "status"
TIMESTAMP = "timestamp"
INGESTION_END_TIME = "ingestion_end_time"
RECORDS_SUCCEEDED = "records_succeeded"
RECORDS_FAILED = "records_failed"


ACK_HEADERS = [
"MESSAGE_HEADER_ID",
"HEADER_RESPONSE_CODE",
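Everything environment-driven or shared across lambdas leaves this file; only the ack-specific directory names and `ACK_HEADERS` remain. Pieced together from the imports elsewhere in this PR, the consolidated module plausibly looks like the sketch below (the real `common/models/batch_constants.py` is not shown in this diff, so treat member lists and values as assumptions):

```python
# common/models/batch_constants.py -- plausible shape, inferred from the names
# imported across this PR; the actual file may differ.
import os
from enum import StrEnum

AUDIT_TABLE_NAME = os.getenv("AUDIT_TABLE_NAME")
SOURCE_BUCKET_NAME = os.getenv("SOURCE_BUCKET_NAME")
ACK_BUCKET_NAME = os.getenv("ACK_BUCKET_NAME")


class FileStatus(StrEnum):
    """Assumed union of the two deleted FileStatus variants."""

    QUEUED = "Queued"
    PROCESSING = "Processing"
    PREPROCESSED = "Preprocessed"
    PROCESSED = "Processed"
    NOT_PROCESSED = "Not processed"
    FAILED = "Failed"
    DUPLICATE = "Duplicate"


class FileNotProcessedReason(StrEnum):
    """Reasons why a file was not processed."""

    DUPLICATE = "Duplicate"


class AuditTableKeys(StrEnum):
    """Assumed union of the two deleted AuditTableKeys variants."""

    FILENAME = "filename"
    MESSAGE_ID = "message_id"
    QUEUE_NAME = "queue_name"
    RECORD_COUNT = "record_count"
    STATUS = "status"
    TIMESTAMP = "timestamp"
    INGESTION_END_TIME = "ingestion_end_time"
    RECORDS_SUCCEEDED = "records_succeeded"
    RECORDS_FAILED = "records_failed"
    ERROR_DETAILS = "error_details"
```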
16 changes: 6 additions & 10 deletions lambdas/ack_backend/src/update_ack_file.py
@@ -12,14 +12,13 @@
)
from common.aws_s3_utils import move_file
from common.clients import get_s3_client, logger
+from common.models.batch_constants import ACK_BUCKET_NAME, SOURCE_BUCKET_NAME
from constants import (
    ACK_HEADERS,
    BATCH_FILE_ARCHIVE_DIR,
    BATCH_FILE_PROCESSING_DIR,
    COMPLETED_ACK_DIR,
    TEMP_ACK_DIR,
-    get_ack_bucket_name,
-    get_source_bucket_name,
)
from logging_decorators import complete_batch_file_process_logging_decorator

@@ -71,10 +70,8 @@ def complete_batch_file_process(
    the audit table status"""
    ack_filename = f"{file_key.replace('.csv', f'_BusAck_{created_at_formatted_string}.csv')}"

-    move_file(get_ack_bucket_name(), f"{TEMP_ACK_DIR}/{ack_filename}", f"{COMPLETED_ACK_DIR}/{ack_filename}")
-    move_file(
-        get_source_bucket_name(), f"{BATCH_FILE_PROCESSING_DIR}/{file_key}", f"{BATCH_FILE_ARCHIVE_DIR}/{file_key}"
-    )
+    move_file(ACK_BUCKET_NAME, f"{TEMP_ACK_DIR}/{ack_filename}", f"{COMPLETED_ACK_DIR}/{ack_filename}")
+    move_file(SOURCE_BUCKET_NAME, f"{BATCH_FILE_PROCESSING_DIR}/{file_key}", f"{BATCH_FILE_ARCHIVE_DIR}/{file_key}")

    total_ack_rows_processed, total_failures = get_record_count_and_failures_by_message_id(message_id)
    change_audit_table_status_to_processed(file_key, message_id)
@@ -99,7 +96,7 @@ def obtain_current_ack_content(temp_ack_file_key: str) -> StringIO:
    """Returns the current ack file content if the file exists, or else initialises the content with the ack headers."""
    try:
        # If ack file exists in S3 download the contents
-        existing_ack_file = get_s3_client().get_object(Bucket=get_ack_bucket_name(), Key=temp_ack_file_key)
+        existing_ack_file = get_s3_client().get_object(Bucket=ACK_BUCKET_NAME, Key=temp_ack_file_key)
        existing_content = existing_ack_file["Body"].read().decode("utf-8")
    except ClientError as error:
        # If ack file does not exist in S3 create a new file containing the headers only
@@ -132,7 +129,6 @@ def update_ack_file(
        accumulated_csv_content.write(cleaned_row + "\n")

    csv_file_like_object = BytesIO(accumulated_csv_content.getvalue().encode("utf-8"))
-    ack_bucket_name = get_ack_bucket_name()

-    get_s3_client().upload_fileobj(csv_file_like_object, ack_bucket_name, temp_ack_file_key)
-    logger.info("Ack file updated to %s: %s", ack_bucket_name, completed_ack_file_key)
+    get_s3_client().upload_fileobj(csv_file_like_object, ACK_BUCKET_NAME, temp_ack_file_key)
+    logger.info("Ack file updated to %s: %s", ACK_BUCKET_NAME, completed_ack_file_key)
6 changes: 6 additions & 0 deletions lambdas/ack_backend/tests/test_ack_processor.py
@@ -63,6 +63,12 @@ def setUp(self) -> None:
        self.logger_info_patcher = patch("common.log_decorator.logger.info")
        self.mock_logger_info = self.logger_info_patcher.start()

+        self.ack_bucket_patcher = patch("update_ack_file.ACK_BUCKET_NAME", BucketNames.DESTINATION)
+        self.ack_bucket_patcher.start()
+
+        self.source_bucket_patcher = patch("update_ack_file.SOURCE_BUCKET_NAME", BucketNames.SOURCE)
+        self.source_bucket_patcher.start()
+
    def tearDown(self) -> None:
        GenericTearDown(self.s3_client, self.firehose_client, self.dynamodb_client)
        self.mock_logger_info.stop()
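One thing worth noting: the two new patchers are started in `setUp` but, at least in the lines shown, never stopped, and the visible `tearDown` calls `stop()` on `self.mock_logger_info` (the mock object) rather than on the patcher. A common pattern that sidesteps both issues is `addCleanup`, sketched below under the assumption that nothing else in the file depends on the patcher attributes:

```python
# Sketch of the addCleanup pattern (not what this PR does): start a patcher and
# register its stop() in one step, so tearDown needs no patcher bookkeeping.
def setUp(self) -> None:
    ack_bucket_patcher = patch("update_ack_file.ACK_BUCKET_NAME", BucketNames.DESTINATION)
    ack_bucket_patcher.start()
    self.addCleanup(ack_bucket_patcher.stop)
```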
2 changes: 1 addition & 1 deletion lambdas/ack_backend/tests/test_audit_table.py
@@ -2,8 +2,8 @@
from unittest.mock import call, patch

import audit_table
+from common.models.batch_constants import AUDIT_TABLE_NAME, AuditTableKeys, FileStatus
from common.models.errors import UnhandledAuditTableError
-from constants import AUDIT_TABLE_NAME, AuditTableKeys, FileStatus


class TestAuditTable(unittest.TestCase):
4 changes: 4 additions & 0 deletions lambdas/ack_backend/tests/test_logging_decorators.py
@@ -2,6 +2,7 @@
from unittest.mock import patch

import logging_decorators
+from utils.mock_environment_variables import BucketNames


class TestLoggingDecorators(unittest.TestCase):
@@ -12,6 +13,9 @@ def setUp(self):
        self.firehose_patcher = patch("common.log_firehose.firehose_client")
        self.mock_firehose = self.firehose_patcher.start()

+        self.source_bucket_patcher = patch("update_ack_file.SOURCE_BUCKET_NAME", BucketNames.SOURCE)
+        self.source_bucket_patcher.start()
+
    def tearDown(self):
        self.logger_patcher.stop()
        self.firehose_patcher.stop()
3 changes: 3 additions & 0 deletions lambdas/ack_backend/tests/test_splunk_logging.py
@@ -45,6 +45,9 @@ def setUp(self):
            Body=mock_source_file_with_100_rows.getvalue(),
        )

+        self.ack_bucket_patcher = patch("update_ack_file.ACK_BUCKET_NAME", BucketNames.DESTINATION)
+        self.ack_bucket_patcher.start()
+
    def tearDown(self):
        GenericTearDown(self.s3_client)

3 changes: 3 additions & 0 deletions lambdas/ack_backend/tests/test_update_ack_file.py
@@ -57,6 +57,9 @@ def setUp(self) -> None:
        self.logger_patcher = patch("update_ack_file.logger")
        self.mock_logger = self.logger_patcher.start()

+        self.ack_bucket_patcher = patch("update_ack_file.ACK_BUCKET_NAME", BucketNames.DESTINATION)
+        self.ack_bucket_patcher.start()
+
    def tearDown(self) -> None:
        GenericTearDown(self.s3_client)

16 changes: 7 additions & 9 deletions lambdas/ack_backend/tests/test_update_ack_file_flow.py
@@ -5,23 +5,21 @@
from moto import mock_aws

import update_ack_file
+from utils.mock_environment_variables import BucketNames


@mock_aws
class TestUpdateAckFileFlow(unittest.TestCase):
    def setUp(self):
        self.s3_client = boto3.client("s3", region_name="eu-west-2")

-        self.ack_bucket_name = "my-ack-bucket"
-        self.source_bucket_name = "my-source-bucket"
-        self.ack_bucket_patcher = patch("update_ack_file.get_ack_bucket_name", return_value=self.ack_bucket_name)
-        self.mock_get_ack_bucket_name = self.ack_bucket_patcher.start()
+        self.ack_bucket_name = BucketNames.DESTINATION
+        self.source_bucket_name = BucketNames.SOURCE
+        self.ack_bucket_patcher = patch("update_ack_file.ACK_BUCKET_NAME", self.ack_bucket_name)
+        self.ack_bucket_patcher.start()

-        self.source_bucket_patcher = patch(
-            "update_ack_file.get_source_bucket_name",
-            return_value=self.source_bucket_name,
-        )
-        self.mock_get_source_bucket_name = self.source_bucket_patcher.start()
+        self.source_bucket_patcher = patch("update_ack_file.SOURCE_BUCKET_NAME", self.source_bucket_name)
+        self.source_bucket_patcher.start()

        self.s3_client.create_bucket(
            Bucket=self.ack_bucket_name,
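This setUp shows the mechanical difference between the two `unittest.mock.patch` forms most clearly: a callable is stubbed via `return_value`, whereas a module-level constant is replaced outright by passing the new value as the second argument. Side by side (illustrative):

```python
from unittest.mock import patch

# Old style: replace the getter function and control what it returns.
patch("update_ack_file.get_ack_bucket_name", return_value="my-ack-bucket")

# New style: replace the module attribute itself with the desired value.
patch("update_ack_file.ACK_BUCKET_NAME", "my-ack-bucket")
```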
@@ -1,6 +1,6 @@
"""Generic setup and teardown for ACK backend tests"""

-from constants import AuditTableKeys
+from common.models.batch_constants import AuditTableKeys
from tests.utils.mock_environment_variables import AUDIT_TABLE_NAME, REGION_NAME, BucketNames, Firehose


4 changes: 2 additions & 2 deletions lambdas/backend/src/authorisation/authoriser.py
@@ -4,7 +4,7 @@

from authorisation.api_operation_code import ApiOperationCode
from common.clients import logger
-from common.models.constants import SUPPLIER_PERMISSIONS_HASH_KEY
+from common.models.constants import RedisHashKeys
from common.redis_client import get_redis_client


@@ -33,7 +33,7 @@ def _expand_permissions(
        return expanded_permissions

    def _get_supplier_permissions(self, supplier_system: str) -> dict[str, list[ApiOperationCode]]:
-        raw_permissions_data = get_redis_client().hget(SUPPLIER_PERMISSIONS_HASH_KEY, supplier_system)
+        raw_permissions_data = get_redis_client().hget(RedisHashKeys.SUPPLIER_PERMISSIONS_HASH_KEY, supplier_system)
        permissions_data = json.loads(raw_permissions_data) if raw_permissions_data else []

        return self._expand_permissions(permissions_data)
4 changes: 2 additions & 2 deletions lambdas/backend/src/controller/parameter_parser.py
@@ -2,7 +2,7 @@
from dataclasses import dataclass
from typing import Optional

-from common.models.constants import Constants
+from common.models.constants import RedisHashKeys
from common.models.utils.generic_utils import nhs_number_mod11_check
from common.redis_client import get_redis_client
from controller.constants import IdentifierSearchElement, IdentifierSearchParameterName, ImmunizationSearchParameterName
@@ -81,7 +81,7 @@ def process_immunization_target(imms_params: dict[str, list[str]]) -> set[str]:
            f"Search parameter {ImmunizationSearchParameterName.IMMUNIZATION_TARGET} must have one or more values."
        )

-    valid_vaccine_types = get_redis_client().hkeys(Constants.VACCINE_TYPE_TO_DISEASES_HASH_KEY)
+    valid_vaccine_types = get_redis_client().hkeys(RedisHashKeys.VACCINE_TYPE_TO_DISEASES_HASH_KEY)
    if any(x not in valid_vaccine_types for x in vaccine_types):
        raise ParameterExceptionError(
            f"{ImmunizationSearchParameterName.IMMUNIZATION_TARGET} must be one or more of the following: "
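Both backend call sites now read their Redis hash keys from a single `RedisHashKeys` class instead of the loose `SUPPLIER_PERMISSIONS_HASH_KEY` constant and the `Constants` grab-bag. The class itself is not shown in this diff; a plausible shape, inferred from the two attribute names used in this PR (the string values are illustrative guesses):

```python
# Assumed shape of common.models.constants.RedisHashKeys; attribute names come
# from the call sites in this PR, the values are guesses.
class RedisHashKeys:
    SUPPLIER_PERMISSIONS_HASH_KEY = "supplier_permissions"
    VACCINE_TYPE_TO_DISEASES_HASH_KEY = "vaccine_type_to_diseases"
```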
6 changes: 3 additions & 3 deletions lambdas/backend/src/filter.py
@@ -29,7 +29,7 @@ def create_reference_to_patient_resource(patient_full_url: str, patient: dict) -
    Returns a reference to the given patient which includes the patient nhs number identifier (system and value fields
    only) and a reference to patient full url. "Type" field is set to "Patient".
    """
-    patient_nhs_number_identifier = [x for x in patient["identifier"] if x.get("system") == Urls.nhs_number][0]
+    patient_nhs_number_identifier = [x for x in patient["identifier"] if x.get("system") == Urls.NHS_NUMBER][0]

    return {
        "reference": patient_full_url,
@@ -67,9 +67,9 @@ def replace_organization_values(imms: dict) -> dict:
        identifier = performer["actor"].get("identifier", {})
        if identifier.get("value") is not None:
            identifier["value"] = "N2N9I"
-            identifier["system"] = Urls.ods_organization_code
+            identifier["system"] = Urls.ODS_ORGANIZATION_CODE
        if identifier.get("system") is not None:
-            identifier["system"] = Urls.ods_organization_code
+            identifier["system"] = Urls.ODS_ORGANIZATION_CODE

        # Ensure only 'system' and 'value' remain in identifier
        keys = {"system", "value"}
2 changes: 1 addition & 1 deletion lambdas/backend/tests/test_filter.py
@@ -109,7 +109,7 @@ def test_replace_organization_values(self):
        # Prepare the input data
        input_imms = load_json_data("completed_covid_immunization_event.json")
        # Change the input data's organization_identifier_system to be something other than the ods url
-        input_imms["performer"][1]["actor"]["identifier"]["system"] = Urls.urn_school_number
+        input_imms["performer"][1]["actor"]["identifier"]["system"] = Urls.URN_SCHOOL_NUMBER
        # Add organization_display to the input data (note that whilst this field is not one of the expected fields,
        # the validator does not prevent it from being included on a create or update, so the possibility of it
        # existing must be handled)
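The `filter.py` and `test_filter.py` changes are a pure rename of the `Urls` attributes to upper case, the usual Python convention for constants. The class definition is not part of this diff; a sketch of its assumed shape (the NHS number and ODS identifiers are the standard NHS FHIR system URIs, the school URN value is a guess):

```python
# Assumed shape of the renamed Urls constants; treat the values as illustrative.
class Urls:
    NHS_NUMBER = "https://fhir.nhs.uk/Id/nhs-number"
    ODS_ORGANIZATION_CODE = "https://fhir.nhs.uk/Id/ods-organization-code"
    URN_SCHOOL_NUMBER = "https://fhir.hl7.org.uk/Id/urn-school-number"
```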
4 changes: 1 addition & 3 deletions lambdas/batch_processor_filter/src/batch_audit_repository.py
@@ -1,12 +1,10 @@
from boto3.dynamodb.conditions import Key

from common.aws_dynamodb import get_dynamodb_table
+from common.models.batch_constants import AUDIT_TABLE_NAME, AuditTableKeys, FileStatus
from constants import (
    AUDIT_TABLE_FILENAME_GSI,
-    AUDIT_TABLE_NAME,
    AUDIT_TABLE_QUEUE_NAME_GSI,
-    AuditTableKeys,
-    FileStatus,
)


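Here the shared table constants move to `common.models.batch_constants` while the GSI names, which only this lambda uses, stay local. The `Key` import suggests the repository queries those GSIs through the DynamoDB table resource; a hypothetical sketch using the names imported above (the helper shape is an assumption, the boto3 calls are real):

```python
# Hypothetical GSI query sketch; assumes get_dynamodb_table returns a boto3
# Table resource, as its name suggests.
def query_by_filename(filename: str) -> list[dict]:
    table = get_dynamodb_table(AUDIT_TABLE_NAME)
    response = table.query(
        IndexName=AUDIT_TABLE_FILENAME_GSI,
        KeyConditionExpression=Key(AuditTableKeys.FILENAME).eq(filename),
    )
    return response["Items"]
```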
@@ -5,7 +5,7 @@

from batch_file_created_event import BatchFileCreatedEvent
from common.clients import get_s3_client
-from constants import ACK_BUCKET_NAME, SOURCE_BUCKET_NAME
+from common.models.batch_constants import ACK_BUCKET_NAME, SOURCE_BUCKET_NAME


class BatchFileRepository:
@@ -7,7 +7,8 @@

from batch_file_repository import BatchFileRepository
from common.clients import get_sqs_client, logger
from common.log_firehose import send_log_to_firehose
-from constants import QUEUE_URL, SPLUNK_FIREHOSE_STREAM_NAME, FileNotProcessedReason, FileStatus
+from common.models.batch_constants import FileNotProcessedReason, FileStatus
+from constants import QUEUE_URL, SPLUNK_FIREHOSE_STREAM_NAME
from exceptions import EventAlreadyProcessingForSupplierAndVaccTypeError

BATCH_AUDIT_REPOSITORY = BatchAuditRepository()
32 changes: 0 additions & 32 deletions lambdas/batch_processor_filter/src/constants.py
@@ -1,38 +1,6 @@
import os
-from enum import StrEnum

-AUDIT_TABLE_NAME = os.getenv("AUDIT_TABLE_NAME")
AUDIT_TABLE_FILENAME_GSI = os.getenv("FILE_NAME_GSI")
AUDIT_TABLE_QUEUE_NAME_GSI = os.getenv("QUEUE_NAME_GSI")
QUEUE_URL = os.getenv("QUEUE_URL")
SPLUNK_FIREHOSE_STREAM_NAME = os.getenv("SPLUNK_FIREHOSE_NAME")
-SOURCE_BUCKET_NAME = os.getenv("SOURCE_BUCKET_NAME")
-ACK_BUCKET_NAME = os.getenv("ACK_BUCKET_NAME")
-
-
-class FileStatus(StrEnum):
-    """File status constants"""
-
-    QUEUED = "Queued"
-    PROCESSING = "Processing"
-    PREPROCESSED = "Preprocessed"
-    PROCESSED = "Processed"
-    NOT_PROCESSED = "Not processed"
-    FAILED = "Failed"
-
-
-class FileNotProcessedReason(StrEnum):
-    """Reasons why a file was not processed"""
-
-    DUPLICATE = "Duplicate"
-
-
-class AuditTableKeys(StrEnum):
-    """Audit table keys"""
-
-    FILENAME = "filename"
-    MESSAGE_ID = "message_id"
-    QUEUE_NAME = "queue_name"
-    STATUS = "status"
-    TIMESTAMP = "timestamp"
-    ERROR_DETAILS = "error_details"
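As in the ack lambda, only the enums and environment-driven names shared across lambdas leave this file; the GSI, queue, and Firehose settings stay local. Note that the classes deleted here were already `StrEnum`, while the ack lambda's deleted equivalents were plain classes holding strings; a shared `StrEnum` is a drop-in replacement for both because its members compare and format as their string values (Python 3.11+):

```python
from enum import StrEnum

class FileStatus(StrEnum):
    PROCESSED = "Processed"

assert FileStatus.PROCESSED == "Processed"       # equality with plain strings
assert f"{FileStatus.PROCESSED}" == "Processed"  # formats as its value
```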
4 changes: 1 addition & 3 deletions lambdas/batch_processor_filter/tests/test_lambda_handler.py
@@ -22,13 +22,11 @@

with patch.dict("os.environ", MOCK_ENVIRONMENT_DICT):
    from common.clients import REGION_NAME
+    from common.models.batch_constants import AUDIT_TABLE_NAME, AuditTableKeys, FileStatus
    from constants import (
        AUDIT_TABLE_FILENAME_GSI,
-        AUDIT_TABLE_NAME,
        AUDIT_TABLE_QUEUE_NAME_GSI,
        SPLUNK_FIREHOSE_STREAM_NAME,
-        AuditTableKeys,
-        FileStatus,
    )
    from lambda_handler import lambda_handler

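The `with patch.dict("os.environ", MOCK_ENVIRONMENT_DICT):` wrapper around the imports is a direct consequence of the constants being read at import time: `os.getenv` runs when the module is first imported, so the mock environment must already be in place. A sketch of the ordering requirement (the env key and value are illustrative):

```python
from unittest.mock import patch

with patch.dict("os.environ", {"AUDIT_TABLE_NAME": "test-audit-table"}):
    # getenv runs now, during the first import, and sees the mocked value.
    from common.models.batch_constants import AUDIT_TABLE_NAME

assert AUDIT_TABLE_NAME == "test-audit-table"  # only if not imported earlier
```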
2 changes: 1 addition & 1 deletion lambdas/filenameprocessor/src/audit_table.py
@@ -3,8 +3,8 @@
from typing import Optional

from common.clients import dynamodb_client, logger
+from common.models.batch_constants import AUDIT_TABLE_NAME, AuditTableKeys
from common.models.errors import UnhandledAuditTableError
-from constants import AUDIT_TABLE_NAME, AuditTableKeys


def upsert_audit_table(