Skip to content

Commit 00e5f5f

Browse files
authored
Http-193 Pass format options to the http response (getindata#199)
Signed-off-by: davidradl <david_radley@uk.ibm.com>
1 parent 0337b74 commit 00e5f5f

File tree

5 files changed

+88
-27
lines changed

5 files changed

+88
-27
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
## [Unreleased]
44

5+
- allow format options to be applied to the http response decoding.
6+
57
## [0.24.0] - 2025-11-26
68

79
- Add UNABLE_TO_DESERIALIZE_RESPONSE http-completion-state. If you have used

README.md

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,9 +153,12 @@ POST, PUT and GET operations. This query creator allows you to issue json reques
153153
your own custom http connector. The mappings from columns to the json request are supplied in the query creator configuration
154154
parameters `gid.connector.http.request.query-param-fields`, `gid.connector.http.request.body-fields` and `gid.connector.http.request.url-map`.
155155

156+
### Format considerations
157+
158+
#### For http requests
156159
In order to use custom format, user has to specify option `'lookup-request.format' = 'customFormatName'`, where `customFormatName` is the identifier of custom format factory.
157160

158-
Additionally, it is possible to pass query format options from table's DDL.
161+
Additionally, it is possible to pass custom query format options from table's DDL.
159162
This can be done by using option like so: `'lookup-request.format.customFormatName.customFormatProperty' = 'propertyValue'`, for example
160163
`'lookup-request.format.customFormatName.fail-on-missing-field' = 'true'`.
161164

@@ -166,6 +169,14 @@ DynamicTableFactory.Context context, ReadableConfig formatOptions)` method in `R
166169
With default configuration, Flink-Json format is used for `GenericGetQueryCreator`, all options defined in [json-format](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/)
167170
can be passed through table DDL. For example `'lookup-request.format.json.fail-on-missing-field' = 'true'`. In this case, format identifier is `json`.
168171

172+
#### For http responses
173+
Specify your format options at the top level. For example:
174+
```roomsql
175+
'format' = 'json',
176+
'json.ignore-parse-errors' = 'true',
177+
```
178+
179+
169180
#### Timeouts
170181
Lookup Source is guarded by two timeout timers. First one is specified by Flink's AsyncIO operator that executes `AsyncTableFunction`.
171182
The default value of this timer is set to 3 minutes and can be changed via `table.exec.async-lookup.timeout` [option](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/config/#table-exec-async-lookup-timeout).

src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactory.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -53,20 +53,26 @@ public DynamicTableSource createDynamicTableSource(Context dynamicTableContext)
5353
FactoryUtil.createTableFactoryHelper(this, dynamicTableContext);
5454

5555
ReadableConfig readable = helper.getOptions();
56-
helper.validateExcept(
57-
// properties coming from org.apache.flink.table.api.config.ExecutionConfigOptions
58-
"table.",
59-
HttpConnectorConfigConstants.GID_CONNECTOR_HTTP,
60-
LOOKUP_REQUEST_FORMAT.key()
61-
);
62-
validateHttpLookupSourceOptions(readable);
6356

57+
// Discover and validate the decoding format first - this validates format-specific options
6458
DecodingFormat<DeserializationSchema<RowData>> decodingFormat =
6559
helper.discoverDecodingFormat(
6660
DeserializationFormatFactory.class,
6761
FactoryUtil.FORMAT
6862
);
6963

64+
// Validate connector options, excluding:
65+
// - "table.*" (Flink execution config options)
66+
// - "gid.connector.http.*" (dynamic connector-specific properties)
67+
// - LOOKUP_REQUEST_FORMAT (custom lookup format option)
68+
// Format options are already validated by discoverDecodingFormat() above
69+
helper.validateExcept(
70+
"table.",
71+
HttpConnectorConfigConstants.GID_CONNECTOR_HTTP,
72+
LOOKUP_REQUEST_FORMAT.key()
73+
);
74+
validateHttpLookupSourceOptions(readable);
75+
7076
HttpLookupConfig lookupConfig = getHttpLookupOptions(dynamicTableContext, readable);
7177

7278
ResolvedSchema resolvedSchema = dynamicTableContext.getCatalogTable().getResolvedSchema();

src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpTableLookupFunction.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public Collection<RowData> lookup(RowData keyRow) {
8181
HttpRowDataWrapper httpRowDataWrapper = client.pull(keyRow);
8282
Collection<RowData> httpCollector = httpRowDataWrapper.getData();
8383

84-
int physicalArity=-1;
84+
int physicalArity = -1;
8585

8686
GenericRowData producedRow = null;
8787
if (httpRowDataWrapper.shouldIgnore()) {
@@ -103,10 +103,15 @@ public Collection<RowData> lookup(RowData keyRow) {
103103
}
104104
}
105105
// if we did not get the physical arity from the http response physical row then get it from the
106-
// producedDataType. which is set when we have metadata
107-
if (physicalArity == -1 && producedDataType != null ) {
108-
List<LogicalType> childrenLogicalTypes=producedDataType.getLogicalType().getChildren();
109-
physicalArity=childrenLogicalTypes.size()-metadataArity;
106+
// producedDataType. which is set when we have metadata or when there's no data
107+
if (physicalArity == -1) {
108+
if (producedDataType != null) {
109+
List<LogicalType> childrenLogicalTypes = producedDataType.getLogicalType().getChildren();
110+
physicalArity = childrenLogicalTypes.size() - metadataArity;
111+
} else {
112+
// If producedDataType is null and we have no data, return the same way as ignore.
113+
return Collections.emptyList();
114+
}
110115
}
111116
// if there was no data, create an empty producedRow
112117
if (producedRow == null) {

src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceITCaseTest.java

Lines changed: 51 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -993,7 +993,11 @@ private void assertResultsForSpec(TestSpec spec, Collection<Row> rows) {
993993
if (spec.badStatus) {
994994
assertEnrichedRowsNoDataBadStatus(rows);
995995
} else if (spec.deserError) {
996-
assertEnrichedRowsDeserException(rows);
996+
if (spec.ignoreParseErrors) {
997+
assertEnrichedRowsNoDataGoodStatus(rows);
998+
} else {
999+
assertEnrichedRowsDeserException(rows);
1000+
}
9971001
} else if (spec.connectionError) {
9981002
assertEnrichedRowsException(rows);
9991003
} else if (spec.useMetadata) {
@@ -1063,6 +1067,33 @@ private void assertEnrichedRowsNoDataBadStatus(Collection<Row> collectedRows ) {
10631067
);
10641068
}
10651069

1070+
private void assertEnrichedRowsNoDataGoodStatus(Collection<Row> collectedRows ) {
1071+
1072+
final int rowArity = 10;
1073+
// validate every row and its column.
1074+
1075+
assertAll(() -> {
1076+
assertThat(collectedRows.size()).isEqualTo(4);
1077+
int intElement = 0;
1078+
for (Row row : collectedRows) {
1079+
intElement++;
1080+
assertThat(row.getArity()).isEqualTo(rowArity);
1081+
// "id" and "id2" columns should be different for every row.
1082+
assertThat(row.getField("id")).isEqualTo(String.valueOf(intElement));
1083+
assertThat(row.getField("id2")).isEqualTo(String.valueOf(intElement + 1));
1084+
assertThat(row.getField("uuid")).isNull();
1085+
assertThat(row.getField("isActive")).isNull();
1086+
assertThat(row.getField("balance")).isNull();
1087+
// metadata
1088+
assertThat(row.getField("errStr")).isNull();
1089+
assertThat(row.getField("headers")).isNotNull();
1090+
assertThat(row.getField("statusCode")).isEqualTo(200);
1091+
assertEquals(row.getField("completionState"), HttpCompletionState.SUCCESS.name());
1092+
}
1093+
}
1094+
);
1095+
}
1096+
10661097
private void assertEnrichedRowsDeserException(Collection<Row> collectedRows ) {
10671098

10681099
final int rowArity = 10;
@@ -1181,7 +1212,8 @@ private void setUpServerBodyStub(
11811212
private void setUpServerBodyStub(
11821213
String methodName,
11831214
WireMockServer wireMockServer,
1184-
List<StringValuePattern> matchingJsonPaths, boolean isDeserErr) {
1215+
List<StringValuePattern> matchingJsonPaths,
1216+
boolean isDeserErr) {
11851217
setUpServerBodyStub(methodName, wireMockServer, matchingJsonPaths, null, null, null, isDeserErr);
11861218
}
11871219

@@ -1235,7 +1267,6 @@ private void setUpServerBodyStub(
12351267
wireMockServer.addStubMapping(stubMapping);
12361268
}
12371269

1238-
// Prototype parameterizedTest
12391270
@ParameterizedTest
12401271
@MethodSource("testSpecProvider")
12411272
void testHttpLookupJoinParameterized(TestSpec spec) throws Exception {
@@ -1316,16 +1347,19 @@ static Collection<TestSpec> testSpecProvider() {
13161347
for (String method : Arrays.asList("GET", "POST", "PUT")) {
13171348
for (boolean asyncFlag : Arrays.asList(false, true)) {
13181349
for (boolean continueOnError : Arrays.asList(false, true)) {
1319-
specs.add(TestSpec.builder()
1320-
.testName("HTTP Lookup Join With Metadata Deserialization Error")
1321-
.methodName(method)
1322-
.useMetadata(true)
1323-
.maxRows(4)
1324-
.useAsync(asyncFlag)
1325-
.deserError(true)
1326-
.continueOnError(continueOnError)
1327-
.build()
1328-
);
1350+
for (boolean ignoreParseErrors : Arrays.asList(false, true)) {
1351+
specs.add(TestSpec.builder()
1352+
.testName("HTTP Lookup Join With Metadata Deserialization Error")
1353+
.methodName(method)
1354+
.useMetadata(true)
1355+
.maxRows(4)
1356+
.useAsync(asyncFlag)
1357+
.deserError(true)
1358+
.ignoreParseErrors(ignoreParseErrors)
1359+
.continueOnError(continueOnError)
1360+
.build()
1361+
);
1362+
}
13291363
}
13301364
}
13311365
}
@@ -1367,6 +1401,7 @@ private static class TestSpec {
13671401
final int maxRows;
13681402
final boolean useAsync;
13691403
final boolean continueOnError;
1404+
final boolean ignoreParseErrors;
13701405

13711406
@Override
13721407
public String toString() {
@@ -1447,7 +1482,9 @@ private String createLookupTableSql(TestSpec spec) {
14471482
sql.append(") WITH (")
14481483
.append("'format' = 'json',")
14491484
.append("'connector' = 'rest-lookup',");
1450-
1485+
if (spec.ignoreParseErrors) {
1486+
sql.append("'json.ignore-parse-errors' = 'true',");
1487+
}
14511488
if (!StringUtils.isNullOrWhitespaceOnly(spec.methodName)) {
14521489
sql.append("'lookup-method' = '").append(spec.methodName).append("',");
14531490
}

0 commit comments

Comments
 (0)