Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions conf/topic_dlchange.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
"type": "number",
"description": "Timestamp of the event in epoch milliseconds"
},
"country": {
"type": "string",
"description": "The country the data is related to."
},
"catalog_id": {
"type": "string",
"description": "Identifier for the data definition (Glue/Hive) database and table name for example "
Expand Down
4 changes: 4 additions & 0 deletions conf/topic_runs.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@
"type": "string",
"description": "Identifier for the data definition (Glue/Hive) database and table name for example"
},
"country": {
"type": "string",
"description": "The country the data is related to."
},
"status": {
"type": "string",
"enum": ["succeeded", "failed", "killed", "skipped"],
Expand Down
6 changes: 6 additions & 0 deletions src/writers/writer_postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ def postgres_edla_write(cursor, table: str, message: Dict[str, Any]) -> None:
source_app_version,
environment,
timestamp_event,
country,
catalog_id,
operation,
"location",
Expand All @@ -116,6 +117,7 @@ def postgres_edla_write(cursor, table: str, message: Dict[str, Any]) -> None:
%s,
%s,
%s,
%s,
%s
)""",
(
Expand All @@ -125,6 +127,7 @@ def postgres_edla_write(cursor, table: str, message: Dict[str, Any]) -> None:
message["source_app_version"],
message["environment"],
message["timestamp_event"],
message.get("country", ""),
message["catalog_id"],
message["operation"],
message.get("location"),
Expand Down Expand Up @@ -188,6 +191,7 @@ def postgres_run_write(cursor, table_runs: str, table_jobs: str, message: Dict[s
(
event_id,
catalog_id,
country,
status,
timestamp_start,
timestamp_end,
Expand All @@ -202,11 +206,13 @@ def postgres_run_write(cursor, table_runs: str, table_jobs: str, message: Dict[s
%s,
%s,
%s,
%s,
%s
)""",
(
message["event_id"],
job["catalog_id"],
job.get("country", ""),
job["status"],
job["timestamp_start"],
job["timestamp_end"],
Expand Down
25 changes: 14 additions & 11 deletions tests/writers/test_writer_postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,13 @@ def test_postgres_edla_write_with_optional_fields():
writer_postgres.postgres_edla_write(cur, "table_a", message)
assert len(cur.executions) == 1
_sql, params = cur.executions[0]
assert len(params) == 12
assert len(params) == 13
assert params[0] == "e1"
assert params[8] == "s3://bucket/path"
assert params[9] == "parquet"
assert json.loads(params[10]) == {"compression": "snappy"}
assert json.loads(params[11]) == {"foo": "bar"}
assert params[6] == "" # country (default empty string)
assert params[9] == "s3://bucket/path"
assert params[10] == "parquet"
assert json.loads(params[11]) == {"compression": "snappy"}
assert json.loads(params[12]) == {"foo": "bar"}


def test_postgres_edla_write_missing_optional():
Expand All @@ -80,10 +81,11 @@ def test_postgres_edla_write_missing_optional():
}
writer_postgres.postgres_edla_write(cur, "table_a", message)
_sql, params = cur.executions[0]
assert params[8] is None
assert params[9] == "delta"
assert params[10] is None
assert params[11] is None
assert params[6] == "" # country (default empty string)
assert params[9] is None # location
assert params[10] == "delta"
assert params[11] is None # format_options
assert params[12] is None # additional_info


def test_postgres_run_write():
Expand Down Expand Up @@ -115,8 +117,9 @@ def test_postgres_run_write():
assert "source_app_version" in run_sql
assert run_params[3] == "runapp"
_job2_sql, job2_params = cur.executions[2]
assert job2_params[5] == "err"
assert json.loads(job2_params[6]) == {"k": "v"}
assert job2_params[2] == "" # country (default empty string)
assert job2_params[6] == "err"
assert json.loads(job2_params[7]) == {"k": "v"}


def test_postgres_test_write():
Expand Down
Loading