diff --git a/conf/topic_dlchange.json b/conf/topic_dlchange.json index 7cd7c22..dbf75f7 100644 --- a/conf/topic_dlchange.json +++ b/conf/topic_dlchange.json @@ -25,6 +25,10 @@ "type": "number", "description": "Timestamp of the event in epoch milliseconds" }, + "country": { + "type": "string", + "description": "The country the data is related to." + }, "catalog_id": { "type": "string", "description": "Identifier for the data definition (Glue/Hive) database and table name for example " diff --git a/conf/topic_runs.json b/conf/topic_runs.json index 6239cd8..c84c436 100644 --- a/conf/topic_runs.json +++ b/conf/topic_runs.json @@ -43,6 +43,10 @@ "type": "string", "description": "Identifier for the data definition (Glue/Hive) database and table name for example" }, + "country": { + "type": "string", + "description": "The country the data is related to." + }, "status": { "type": "string", "enum": ["succeeded", "failed", "killed", "skipped"], diff --git a/src/writers/writer_postgres.py b/src/writers/writer_postgres.py index ecce377..9fa8674 100644 --- a/src/writers/writer_postgres.py +++ b/src/writers/writer_postgres.py @@ -96,6 +96,7 @@ def postgres_edla_write(cursor, table: str, message: Dict[str, Any]) -> None: source_app_version, environment, timestamp_event, + country, catalog_id, operation, "location", @@ -116,6 +117,7 @@ def postgres_edla_write(cursor, table: str, message: Dict[str, Any]) -> None: %s, %s, %s, + %s, %s )""", ( @@ -125,6 +127,7 @@ def postgres_edla_write(cursor, table: str, message: Dict[str, Any]) -> None: message["source_app_version"], message["environment"], message["timestamp_event"], + message.get("country", ""), message["catalog_id"], message["operation"], message.get("location"), @@ -188,6 +191,7 @@ def postgres_run_write(cursor, table_runs: str, table_jobs: str, message: Dict[s ( event_id, catalog_id, + country, status, timestamp_start, timestamp_end, @@ -202,11 +206,13 @@ def postgres_run_write(cursor, table_runs: str, table_jobs: str, message: Dict[s %s, %s, %s, + %s, %s )""", ( message["event_id"], job["catalog_id"], + job.get("country", ""), job["status"], job["timestamp_start"], job["timestamp_end"], diff --git a/tests/writers/test_writer_postgres.py b/tests/writers/test_writer_postgres.py index 678b664..69dafaa 100644 --- a/tests/writers/test_writer_postgres.py +++ b/tests/writers/test_writer_postgres.py @@ -57,12 +57,13 @@ def test_postgres_edla_write_with_optional_fields(): writer_postgres.postgres_edla_write(cur, "table_a", message) assert len(cur.executions) == 1 _sql, params = cur.executions[0] - assert len(params) == 12 + assert len(params) == 13 assert params[0] == "e1" - assert params[8] == "s3://bucket/path" - assert params[9] == "parquet" - assert json.loads(params[10]) == {"compression": "snappy"} - assert json.loads(params[11]) == {"foo": "bar"} + assert params[6] == "" # country (default empty string) + assert params[9] == "s3://bucket/path" + assert params[10] == "parquet" + assert json.loads(params[11]) == {"compression": "snappy"} + assert json.loads(params[12]) == {"foo": "bar"} def test_postgres_edla_write_missing_optional(): @@ -80,10 +81,11 @@ def test_postgres_edla_write_missing_optional(): } writer_postgres.postgres_edla_write(cur, "table_a", message) _sql, params = cur.executions[0] - assert params[8] is None - assert params[9] == "delta" - assert params[10] is None - assert params[11] is None + assert params[6] == "" # country (default empty string) + assert params[9] is None # location + assert params[10] == "delta" + assert params[11] is None # format_options + assert params[12] is None # additional_info def test_postgres_run_write(): @@ -115,8 +117,9 @@ def test_postgres_run_write(): assert "source_app_version" in run_sql assert run_params[3] == "runapp" _job2_sql, job2_params = cur.executions[2] - assert job2_params[5] == "err" - assert json.loads(job2_params[6]) == {"k": "v"} + assert job2_params[2] == "" # country (default empty string) + assert job2_params[6] == "err" + assert json.loads(job2_params[7]) == {"k": "v"} def test_postgres_test_write():