Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
</None>
<None Update="meshServerSideCerts.crt">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Update="int.pem">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Update="local.settings.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public CallDurableDemographicFunc(IHttpClientFunction httpClientFunction, ILogge
/// This method handles posting data, logging, and checking the status of the durable function.
/// Implements retry logic for status checking.
/// </remarks>
public async Task<bool> PostDemographicDataAsync(List<ParticipantDemographic> participants, string DemographicFunctionURI, string fileName)
public async Task<bool> PostDemographicDataAsync(List<ParticipantDemographic> participants, string DemographicFunctionURI, string fileName, List<ParticipantsParquetMap> parquetValuesForRetry)
{
var batchSize = participants.Count;
var responseContent = "";
Expand Down Expand Up @@ -92,7 +92,9 @@ public async Task<bool> PostDemographicDataAsync(List<ParticipantDemographic> pa
_logger.LogError("Check limit reached or demographic function failed for a batch of size: {BatchSize} {FinalStatus}", batchSize, finalStatus);
await _copyFailedBatchToBlob.writeBatchToBlob(
JsonSerializer.Serialize(participants),
new InvalidOperationException("there was an error while adding batch of participants to the demographic table")
new InvalidOperationException("there was an error while adding batch of participants to the demographic table"),
parquetValuesForRetry,
fileName
);

return false;
Expand All @@ -106,7 +108,9 @@ await _copyFailedBatchToBlob.writeBatchToBlob(

await _copyFailedBatchToBlob.writeBatchToBlob(
JsonSerializer.Serialize(participants),
new InvalidOperationException("there was an error while adding batch of participants to the demographic table")
new InvalidOperationException("there was an error while adding batch of participants to the demographic table"),
parquetValuesForRetry,
fileName
);

return false;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
namespace NHS.Screening.ReceiveCaasFile;

using System.Text;
using System;
using System.Collections.Generic;
using System.Text.Json;
using Azure.Storage.Blobs;
using Common;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Model;
using Parquet.Serialization;

public class CopyFailedBatchToBlob : ICopyFailedBatchToBlob
{
Expand All @@ -15,32 +18,131 @@ public class CopyFailedBatchToBlob : ICopyFailedBatchToBlob

private readonly ReceiveCaasFileConfig _config;

public CopyFailedBatchToBlob(ILogger<CopyFailedBatchToBlob> logger, IBlobStorageHelper blobStorageHelper, IExceptionHandler handleException, IOptions<ReceiveCaasFileConfig> config)
private readonly IFailedBatchDict _failedBatchDict;

public CopyFailedBatchToBlob(ILogger<CopyFailedBatchToBlob> logger, IBlobStorageHelper blobStorageHelper, IExceptionHandler handleException, IOptions<ReceiveCaasFileConfig> config, IFailedBatchDict failedBatchDict)
{
_config = config.Value;
_logger = logger;
_blobStorageHelper = blobStorageHelper;
_handleException = handleException;
_failedBatchDict = failedBatchDict;
}

public async Task<bool> writeBatchToBlob(string jsonFromBatch, InvalidOperationException invalidOperationException)
public async Task<bool> writeBatchToBlob(string jsonFromBatch, InvalidOperationException invalidOperationException, List<ParticipantsParquetMap> parquetValuesForRetry, string fileName = "")
{
using (var stream = GenerateStreamFromString(jsonFromBatch))
{
var blobFile = new BlobFile(stream, $"failedBatch-{Guid.NewGuid()}.json");
var copied = await _blobStorageHelper.UploadFileToBlobStorage(_config.caasfolder_STORAGE, "failed-batch", blobFile);

if (copied)
if (!string.IsNullOrEmpty(fileName))
{
_logger.LogInformation("adding failed batch to blob was successful");
await _handleException.CreateSystemExceptionLog(invalidOperationException, new Participant(), "file name unknown but batch was copied to FailedBatch blob store");
return true;
if (_failedBatchDict.ShouldRetryFile(fileName))
{
var fileRetryCount = _failedBatchDict.GetRetryCount(fileName);
upsertRetryValue(fileName, fileRetryCount);
var pathOfFileToRetry = await convertBatchToParquet(parquetValuesForRetry, fileName);
if (!string.IsNullOrEmpty(pathOfFileToRetry))
{
await RetryFailedBatch(pathOfFileToRetry, fileName);
}
}
else
{
var filePath = FileDirectoryPath(fileName);
if (File.Exists(fileName))
{
File.Delete(fileName);
}
}
}

fileName = $"failedBatch-{Guid.NewGuid()}.json";
await AddItemToBlob(stream, fileName);

await _handleException.CreateSystemExceptionLog(invalidOperationException, new Participant(), "file name unknown but batch was copied to FailedBatch blob store");
_logger.LogInformation("adding failed batch to blob was unsuccessful");
return false;
return true;
}
}

private void upsertRetryValue(string filename, int fileRetryCount)
{
fileRetryCount = fileRetryCount + 1;
if (!_failedBatchDict.HasFileFailedBefore(filename))
{

_failedBatchDict.AddFailedBatchDataToDict(filename, fileRetryCount);
}
else
{
_failedBatchDict.UpdateFileFailureCount(filename, fileRetryCount);
}
}

private async Task<string> convertBatchToParquet(List<ParticipantsParquetMap> parquetValuesForRetry, string fileName)
{
try
{
var parquetData = parquetValuesForRetry
.Select(ParticipantsParquetMap.ToParticipantParquet)
.ToList();

var filePath = FileDirectoryPath(fileName);
await ParquetSerializer.SerializeAsync(parquetData, filePath);

return filePath;
}
catch (Exception ex)
{
_logger.LogError(ex, "There was a problem when converting a failed batch to parquet for retry {error}", ex.Message);
return "";
}

}

private async Task<bool> RetryFailedBatch(string localFilePath, string fileName)
{

var copied = false;
using (FileStream fileStream = File.OpenRead(localFilePath))
{
var blobFile = new BlobFile(fileStream, fileName);
copied = await _blobStorageHelper.UploadFileToBlobStorage(_config.caasfolder_STORAGE, "inbound", blobFile, true);
}

if (copied)
{
_logger.LogInformation("Adding failed batch to blob was successful");
return true;
}
_logger.LogError("Adding failed batch to blob was unsuccessful");
return false;
}


private async Task<bool> AddItemToBlob(Stream stream, string fileName)
{
var blobFile = new BlobFile(stream, fileName);
var copied = await _blobStorageHelper.UploadFileToBlobStorage(_config.caasfolder_STORAGE, "failed-batch", blobFile);

if (copied)
{
_logger.LogInformation("Adding failed batch to blob was successful");
return true;
}
return false;
}

private string FileDirectoryPath(string fileName)
{
var currentDirectory = Directory.GetCurrentDirectory();
var filePath = Path.Combine(currentDirectory, fileName);

filePath = filePath.Replace("bin/output/", "");

return filePath;
}


private static Stream GenerateStreamFromString(string s)
{
var stream = new MemoryStream();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
using Common;
using Microsoft.Azure.Functions.Worker;

public class FailedBatchDict : IFailedBatchDict
{
private Dictionary<string, int> RetryDictionary;

private static readonly int RetryCount = 3;
public FailedBatchDict()
{
RetryDictionary = new Dictionary<string, int>();
}

public void AddFailedBatchDataToDict(string FileName, int retryCount)
{
RetryDictionary.Add(FileName, retryCount);
}

public int GetRetryCount(string fileName)
{
if (RetryDictionary.TryGetValue(fileName, out int fileRetryCount))
{
return fileRetryCount;
}
return 0;
}

public bool HasFileFailedBefore(string fileName)
{
return RetryDictionary.ContainsKey(fileName);
}

public void UpdateFileFailureCount(string fileName, int failureCount)
{
RetryDictionary[fileName] = failureCount;
}


public bool ShouldRetryFile(string filename)
{
if (RetryDictionary.Count < 1)
{
return true;
}

if (RetryDictionary.TryGetValue(filename, out int fileRetryCount))
{
if (fileRetryCount < RetryCount)
{
return true;
}
RetryDictionary.Remove(filename);
return false;
}

return true;
}




}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ namespace NHS.Screening.ReceiveCaasFile;

public interface ICallDurableDemographicFunc
{
Task<bool> PostDemographicDataAsync(List<ParticipantDemographic> participants, string DemographicFunctionURI, string fileName);
Task<bool> PostDemographicDataAsync(List<ParticipantDemographic> participants, string DemographicFunctionURI, string fileName, List<ParticipantsParquetMap> values);

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
namespace NHS.Screening.ReceiveCaasFile;

using Model;

public interface ICopyFailedBatchToBlob
{
Task<bool> writeBatchToBlob(string jsonFromBatch, InvalidOperationException invalidOperationException);
Task<bool> writeBatchToBlob(string jsonFromBatch, InvalidOperationException invalidOperationException, List<ParticipantsParquetMap> parquetValuesForRetry, string fileName = "");
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
public interface IFailedBatchDict
{
void AddFailedBatchDataToDict(string FileName, int retryCount);
bool ShouldRetryFile(string filename);
int GetRetryCount(string fileName);
bool HasFileFailedBefore(string fileName);
void UpdateFileFailureCount(string fileName, int failureCount);
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,38 +64,43 @@ IOptions<ReceiveCaasFileConfig> receiveCaasFileConfig
public async Task ProcessRecords(List<ParticipantsParquetMap> values, ParallelOptions options, ScreeningLkp screeningService, string name)
{
var currentBatch = new Batch();

await Parallel.ForEachAsync(values, options, async (rec, cancellationToken) =>
{
var participant = _receiveCaasFileHelper.MapParticipant(rec, screeningService.ScreeningId.ToString(), screeningService.ScreeningName, name);

if (participant == null)
{
values.Remove(rec);
await _exceptionHandler.CreateSystemExceptionLogFromNhsNumber(new Exception($"Could not map participant in file {name}"), rec.NhsNumber.ToString(), name, screeningService.ScreeningName, "");
return;
}

if (!ValidationHelper.ValidateNHSNumber(participant.NhsNumber))
{
values.Remove(rec);
await _exceptionHandler.CreateSystemExceptionLog(new Exception($"Invalid NHS Number was passed in for participant {participant} and file {name}"), participant, name, nameof(ExceptionCategory.CaaS));
return; // skip current participant
}

if (!_validateDates.ValidateAllDates(participant))
{
values.Remove(rec);
await _exceptionHandler.CreateSystemExceptionLog(new Exception($"Invalid effective date found in participant data {participant} and file name {name}"), participant, name);
return; // Skip current participant
}

if (!_recordsProcessTracker.RecordAlreadyProcessed(participant.RecordType, participant.NhsNumber))
{
values.Remove(rec);
await _exceptionHandler.CreateSystemExceptionLog(new Exception($"Duplicate Participant was in the file"), participant, name);
return; // Skip current participant
}

await AddRecordToBatch(participant, currentBatch, name);
});

if (await _callDurableDemographicFunc.PostDemographicDataAsync(currentBatch.DemographicData.ToList(), DemographicURI, name))
if (await _callDurableDemographicFunc.PostDemographicDataAsync(currentBatch.DemographicData.ToList(), DemographicURI, name, values))
{
await AddBatchToQueue(currentBatch, name);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@
services.AddScoped<ICreateBasicParticipantData, CreateBasicParticipantData>();
services.AddScoped<IAddBatchToQueue, AddBatchToQueue>();
services.AddScoped<IRecordsProcessedTracker, RecordsProcessedTracker>(); //Do not change the lifetime of this.
services.AddTransient<IBlobStorageHelper, BlobStorageHelper>();
services.AddTransient<ICopyFailedBatchToBlob, CopyFailedBatchToBlob>();
services.AddSingleton<IFailedBatchDict, FailedBatchDict>();
services.AddSingleton<IBlobStorageHelper, BlobStorageHelper>();
services.AddSingleton<ICopyFailedBatchToBlob, CopyFailedBatchToBlob>();
services.AddScoped<IValidateDates, ValidateDates>();
// Register health checks
services.AddBlobStorageHealthCheck("receiveCaasFile");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
<PackageReference Include="Microsoft.Azure.Functions.Worker.Sdk" Version="2.0.4" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.ApplicationInsights" Version="2.0.0" />
<PackageReference Include="Microsoft.Extensions.Diagnostics.HealthChecks" Version="9.0.2" />
<PackageReference Include="Parquet.Net" Version="5.2.0" />
<PackageReference Include="Polly" Version="8.5.0" />
</ItemGroup>
<ItemGroup>
Expand Down
Loading
Loading