README.md — 25 changes: 23 additions & 2 deletions
@@ -17,6 +17,7 @@ Table of Contents
* [Configuration Options](#configuration-options)
* [Client Configuration](#client-configuration)
* [Sink Configuration](#sink-configuration)
* [Sink Metrics](#sink-metrics)
* [Limitations](#limitations)
* [Contributing](#contributing)

@@ -200,11 +201,31 @@ Our Sink is built on top of Flink’s `AsyncSinkBase`
| maxTimeInBufferMS | The maximum time a record may stay in the sink before being flushed | N/A |
| maxRecordSizeInBytes | The maximum record size that the sink will accept, records larger than this will be automatically rejected | N/A |

## Limitations
### Sink Metrics

Our Sink exposes additional metrics on top of Flink's existing metrics (the Status column indicates whether a metric is currently available):

| Metric | Description | Type | Status |
|--------|-------------|------|--------|
| numBytesSend | Total number of bytes sent to ClickHouse | Counter | ✅ |
| numRecordSend | Total number of records sent to ClickHouse | Counter | ✅ |
| numRequestSubmitted | Total number of requests sent (actual number of flushes performed) | Counter | ✅ |
| numOfDroppedBatches | Total number of batches dropped due to non-retryable failures | Counter | ✅ |
| numOfDroppedRecords | Total number of records dropped due to non-retryable failures | Counter | ✅ |
| totalBatchRetries | Total number of batch retries due to retryable failures | Counter | ✅ |
| writeLatencyHistogram | Histogram of write latency distribution | Histogram | ✅ |
| writeFailureLatencyHistogram | Histogram of write failure latency distribution | Histogram | ✅ |
| triggeredByMaxBatchSizeCounter | Sink flushes triggered by reaching `maxBatchSize` | Counter | ✅ |
| triggeredByMaxBatchSizeInBytesCounter | Sink flushes triggered by reaching `maxBatchSizeInBytes` | Counter | ✅ |
| triggeredByMaxTimeInBufferMSCounter | Sink flushes triggered by reaching `maxTimeInBufferMS` | Counter | ✅ |
| actualRecordsPerBatchHistogram | Histogram of the actual number of records per flushed batch | Histogram | ✅ |
| actualBytesPerBatchHistogram | Histogram of the actual number of bytes per flushed batch | Histogram | ✅ |
| actualTimeInBufferHistogram | Histogram of the actual time records spend in the buffer before a flush | Histogram | ❌ |

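These metrics are registered per parallel sink subtask on Flink's `SinkWriterMetricGroup`. The following is a minimal, purely illustrative sketch of how two of them are wired up — the metric names come from the table above and the 1000-sample histogram window mirrors the value used in this PR, but the wrapper class itself is not part of the connector:

```java
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;

// Illustrative wrapper only: mirrors how the writer registers its sink metrics.
final class SinkMetricsSketch {
    private final Counter numOfDroppedBatches;
    private final Histogram writeLatencyHistogram;

    SinkMetricsSketch(SinkWriterMetricGroup metricGroup) {
        // Counters are monotonically increasing and reported per parallel subtask.
        this.numOfDroppedBatches = metricGroup.counter("numOfDroppedBatches");
        // DescriptiveStatisticsHistogram keeps a sliding window of the most recent
        // samples (1000 here), from which min/max/mean/percentiles are reported.
        this.writeLatencyHistogram =
                metricGroup.histogram("writeLatencyHistogram", new DescriptiveStatisticsHistogram(1000));
    }

    void recordDroppedBatch() {
        numOfDroppedBatches.inc();
    }

    void recordWriteLatency(long latencyMillis) {
        writeLatencyHistogram.update(latencyMillis);
    }
}
```

All of these metrics appear under the sink operator's metric group and can be collected through whatever metrics reporter the Flink cluster is configured with.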
## Limitations

* Currently the sink does not support exactly-once semantics


## Compatibility

- All projects in this repo are tested with all [active LTS versions](https://github.com/ClickHouse/ClickHouse/pulls?q=is%3Aopen+is%3Apr+label%3Arelease) of ClickHouse.
flink-connector-clickhouse-1.17/build.gradle.kts — 5 changes: 3 additions & 2 deletions
@@ -30,7 +30,8 @@ val flinkVersion = System.getenv("FLINK_VERSION") ?: "1.17.2"
extra.apply {
set("flinkVersion", flinkVersion)
set("log4jVersion","2.17.2")
set("testContainersVersion", "1.21.0")
set("testContainersVersion", "2.0.2")
set("testContainersClickHouseVersion", "1.21.3")
set("byteBuddyVersion", "1.17.5")
}

@@ -74,7 +75,7 @@ dependencies {
testImplementation("org.apache.flink:flink-test-utils:${project.extra["flinkVersion"]}")
//
testImplementation("org.testcontainers:testcontainers:${project.extra["testContainersVersion"]}")
testImplementation("org.testcontainers:clickhouse:${project.extra["testContainersVersion"]}")
testImplementation("org.testcontainers:clickhouse:${project.extra["testContainersClickHouseVersion"]}")
testImplementation("org.scalatest:scalatest_2.13:3.2.19")
testRuntimeOnly("org.scalatestplus:junit-4-13_2.13:3.2.18.0")
}
ClickHouseAsyncWriter.java
@@ -13,8 +13,11 @@
import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
import org.apache.flink.connector.clickhouse.data.ClickHousePayload;
import org.apache.flink.connector.clickhouse.exception.RetriableException;
import org.apache.flink.connector.clickhouse.sink.writer.ExtendedAsyncSinkWriter;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -26,7 +29,7 @@
import java.util.function.Consumer;


public class ClickHouseAsyncWriter<InputT> extends AsyncSinkWriter<InputT, ClickHousePayload> {
public class ClickHouseAsyncWriter<InputT> extends ExtendedAsyncSinkWriter<InputT, ClickHousePayload> {
private static final Logger LOG = LoggerFactory.getLogger(ClickHouseAsyncWriter.class);
private static final int DEFAULT_MAX_RETRIES = 3;

@@ -40,6 +43,9 @@ public class ClickHouseAsyncWriter<InputT> extends AsyncSinkWriter<InputT, Click
private final Counter numOfDroppedBatchesCounter;
private final Counter numOfDroppedRecordsCounter;
private final Counter totalBatchRetriesCounter;
private final Histogram writeLatencyHistogram;
private final Histogram writeFailureLatencyHistogram;


public ClickHouseAsyncWriter(ElementConverter<InputT, ClickHousePayload> elementConverter,
Sink.InitContext context,
@@ -74,6 +80,8 @@ public ClickHouseAsyncWriter(ElementConverter<InputT, ClickHousePayload> element
this.numOfDroppedBatchesCounter = metricGroup.counter("numOfDroppedBatches");
this.numOfDroppedRecordsCounter = metricGroup.counter("numOfDroppedRecords");
this.totalBatchRetriesCounter = metricGroup.counter("totalBatchRetries");
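// DescriptiveStatisticsHistogram keeps a sliding window of the most recent 1000 latency samples.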
this.writeLatencyHistogram = metricGroup.histogram("writeLatencyHistogram", new DescriptiveStatisticsHistogram(1000));
this.writeFailureLatencyHistogram = metricGroup.histogram("writeFailureLatencyHistogram", new DescriptiveStatisticsHistogram(1000));
}


@@ -123,6 +131,7 @@ protected void submitRequestEntries(List<ClickHousePayload> requestEntries, Cons
throw new RuntimeException("ClickHouseFormat was not set ");
}
}
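// Capture the submission time so write latency can be recorded once the insert completes or fails.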
long writeStartTime = System.currentTimeMillis();
try {
CompletableFuture<InsertResponse> response = chClient.insert(tableName, out -> {
for (ClickHousePayload requestEntry : requestEntries) {
@@ -140,9 +149,9 @@
}, format, new InsertSettings().setOption(ClientConfigProperties.ASYNC_OPERATIONS.getKey(), "true"));
response.whenComplete((insertResponse, throwable) -> {
if (throwable != null) {
handleFailedRequest(requestEntries, requestToRetry, throwable);
handleFailedRequest(requestEntries, requestToRetry, throwable, writeStartTime);
} else {
handleSuccessfulRequest(requestToRetry, insertResponse);
handleSuccessfulRequest(requestToRetry, insertResponse, writeStartTime);
}
});
} catch (Exception e) {
@@ -158,21 +167,26 @@ protected long getSizeInBytes(ClickHousePayload clickHousePayload) {


private void handleSuccessfulRequest(
Consumer<List<ClickHousePayload>> requestToRetry, InsertResponse response) {
LOG.info("Successfully completed submitting request. Response [written rows {}, time took to insert {}, written bytes {}, query_id {}]",
Consumer<List<ClickHousePayload>> requestToRetry, InsertResponse response, long writeStartTime) {
long writeEndTime = System.currentTimeMillis();
this.writeLatencyHistogram.update(writeEndTime - writeStartTime);
LOG.info("Successfully completed submitting request. Response [written rows {}, time took to insert {}, written bytes {}, query_id {}, write latency {} ms.]",
response.getWrittenRows(),
response.getServerTime(),
response.getWrittenBytes(),
response.getQueryId()
response.getQueryId(),
writeEndTime - writeStartTime
);
requestToRetry.accept(Collections.emptyList());
}

private void handleFailedRequest(
List<ClickHousePayload> requestEntries,
Consumer<List<ClickHousePayload>> requestToRetry,
Throwable error) {
Throwable error, long writeStartTime) {
// TODO: extract from error if we can retry
long writeEndTime = System.currentTimeMillis();
this.writeFailureLatencyHistogram.update(writeEndTime - writeStartTime);
try {
Utils.handleException(error);
} catch (RetriableException e) {