Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
84 commits
Select commit Hold shift + click to select a range
3c8880c
Separate CLI and application (#207)
philipp94831 Jul 19, 2024
632b7fe
Remove guava dependency (#237)
philipp94831 Jul 23, 2024
be8ffc8
Use KAFKA_ as prefix for environment Kafka config (#209)
philipp94831 Jul 23, 2024
8dc843c
Add HostInfo to ImprovedStreamsConfig (#230)
Jul 23, 2024
9b67bc7
Remove log4j dependency and debug parameter (#238)
philipp94831 Jul 24, 2024
cc19d5e
Add default serialization config to apps (#239)
philipp94831 Jul 25, 2024
1a4b565
Rename streams section to kafka in Helm charts (#241)
philipp94831 Jul 25, 2024
cb4fd69
Remove schema registry support from core module
philipp94831 Jul 26, 2024
9529938
Remove schema registry support from core module
philipp94831 Jul 26, 2024
030985e
Remove schema registry support from core module
philipp94831 Jul 26, 2024
cd0a934
Remove schema registry support from core module
philipp94831 Jul 26, 2024
20cb804
Remove schema registry support from core module
philipp94831 Jul 26, 2024
7619a0c
Remove schema registry support from core module
philipp94831 Jul 26, 2024
76b89a4
Remove schema registry CLI param
philipp94831 Jul 26, 2024
d6c5439
Update
philipp94831 Jul 26, 2024
88c1b8c
Merge branch 'refs/heads/feature/remove-schema-registry-2' into featu…
philipp94831 Jul 26, 2024
6e68149
Rename to labeled
philipp94831 Jul 26, 2024
3a4af0b
Rename extra topics to labeled topics (#240)
philipp94831 Jul 26, 2024
ecf41c8
Rename `--brokers` to `--bootstrap-servers` (#242)
philipp94831 Jul 26, 2024
97a3803
Merge remote-tracking branch 'origin/v3' into feature/remove-schema-r…
philipp94831 Jul 26, 2024
a043cf0
Update
philipp94831 Jul 26, 2024
878e22b
Add CLI parameter to specify streams application ID (#243)
philipp94831 Jul 29, 2024
c347337
Remove unnecessary schema registry configurations in tests (#248)
philipp94831 Jul 29, 2024
957e4c1
Replace Guava usages (#246)
philipp94831 Jul 29, 2024
a88a343
Rename TestTopologyFactory (#249)
philipp94831 Jul 29, 2024
aaf6a68
Merge remote-tracking branch 'origin/v3' into feature/remove-schema-r…
philipp94831 Jul 29, 2024
caaf0a0
Update
philipp94831 Jul 29, 2024
d1bfa89
Make CleanUpRunner closeable (#247)
philipp94831 Jul 29, 2024
363e9c8
Merge remote-tracking branch 'origin/v3' into feature/remove-schema-r…
philipp94831 Jul 29, 2024
e1ad94d
Update
philipp94831 Jul 29, 2024
6c91a5e
Update
philipp94831 Jul 29, 2024
3641de5
Use topic hook for schema registry reset
philipp94831 Jul 29, 2024
e007c1d
Merge branch 'refs/heads/feature/schema-registry-topic-hook' into fea…
philipp94831 Jul 29, 2024
a180ea4
Use topic hook for schema registry reset
philipp94831 Jul 29, 2024
89c9315
Use topic hook for schema registry reset
philipp94831 Jul 29, 2024
362093c
Merge branch 'refs/heads/feature/schema-registry-topic-hook' into fea…
philipp94831 Jul 29, 2024
7bb9bf5
Use topic hook for schema registry reset
philipp94831 Jul 29, 2024
8b34869
Rename Streams section in Helm chart docs (#252)
philipp94831 Aug 5, 2024
935b61d
Fix Sonarqube issues (#253)
philipp94831 Aug 7, 2024
6c1db53
Validate autoscaling mandatory fields when it is enabled (#254)
Aug 8, 2024
8a75197
Validate persistence mandatory chart values (persistence.size) (#255)
Aug 8, 2024
49e7122
Add hook to prepare running of app (#256)
philipp94831 Aug 20, 2024
a41aa8a
Pre-bump version to 3.0.1-SNAPSHOT (#257)
philipp94831 Aug 20, 2024
4679459
Merge branch 'v3' into feature/schema-registry-topic-hook
philipp94831 Jul 10, 2025
9836970
Merge branch 'tmp/v3-master' into feature/schema-registry-topic-hook
philipp94831 Jul 10, 2025
bc239cf
Merge remote-tracking branch 'origin/master' into feature/schema-regi…
philipp94831 Jul 10, 2025
822940c
Update
philipp94831 Jul 10, 2025
63d597a
Update
philipp94831 Jul 10, 2025
74176fd
Merge remote-tracking branch 'origin/master' into feature/schema-regi…
philipp94831 Jul 10, 2025
5317718
Update
philipp94831 Jul 10, 2025
4ed5e24
Add doc
philipp94831 Jul 10, 2025
437052b
Merge branch 'v3' into feature/remove-schema-registry
philipp94831 Jul 10, 2025
5536027
Merge branch 'tmp/v3-master' into feature/remove-schema-registry
philipp94831 Jul 10, 2025
260a3eb
Merge branch 'feature/schema-registry-topic-hook' into feature/remove…
philipp94831 Jul 10, 2025
f505bbe
Update
philipp94831 Jul 10, 2025
9d96097
Update
philipp94831 Jul 11, 2025
e517c10
Update
philipp94831 Jul 11, 2025
ac75f85
Update
philipp94831 Jul 11, 2025
c439416
Update
philipp94831 Jul 11, 2025
7ecb61c
Update
philipp94831 Jul 11, 2025
90eaecd
Merge branch 'feature/schema-registry-topic-hook' into feature/remove…
philipp94831 Jul 11, 2025
0bb1749
Update
philipp94831 Jul 11, 2025
90cb255
Merge remote-tracking branch 'origin/master' into feature/schema-regi…
philipp94831 Jul 11, 2025
a2fc435
Merge branch 'feature/schema-registry-topic-hook' into feature/remove…
philipp94831 Jul 11, 2025
b2e123a
Reorganize tests
philipp94831 Jul 11, 2025
f27234f
Reorganize tests
philipp94831 Jul 11, 2025
ebf3611
Reorganize tests
philipp94831 Jul 11, 2025
da32a39
Reorganize tests
philipp94831 Jul 11, 2025
4490c20
Reorganize tests
philipp94831 Jul 11, 2025
649a43a
Reduce avro usage in tests
philipp94831 Jul 11, 2025
ee3acef
Reduce avro usage in tests
philipp94831 Jul 11, 2025
b3979bf
Reduce avro usage in tests
philipp94831 Jul 11, 2025
c59c9cb
Merge branch 'feature/reduce-avro-usage' into feature/remove-schema-r…
philipp94831 Jul 11, 2025
c672b2b
Merge remote-tracking branch 'origin/master' into feature/schema-regi…
philipp94831 Jul 16, 2025
09ebab4
Merge remote-tracking branch 'origin/master' into feature/remove-sche…
philipp94831 Jul 16, 2025
bb28e8c
Merge branch 'feature/schema-registry-topic-hook' into feature/remove…
philipp94831 Jul 16, 2025
5cbf450
Merge branch 'master' into feature/schema-registry-topic-hook
philipp94831 Jul 18, 2025
8cfc373
Merge remote-tracking branch 'origin/master' into feature/schema-regi…
philipp94831 Jul 18, 2025
a982d29
Merge remote-tracking branch 'origin/feature/schema-registry-topic-ho…
philipp94831 Jul 18, 2025
446b196
Merge branch 'feature/schema-registry-topic-hook' into feature/remove…
philipp94831 Jul 18, 2025
441fa81
Merge remote-tracking branch 'origin/master' into feature/remove-sche…
philipp94831 Jul 21, 2025
4398beb
Merge remote-tracking branch 'origin/master' into feature/remove-sche…
philipp94831 Jul 22, 2025
eaba52b
Merge remote-tracking branch 'origin/master' into feature/remove-sche…
philipp94831 Jul 22, 2025
1fc8c8a
Update
philipp94831 Jul 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,6 @@ The following configuration options are available:

- `--bootstrap-servers`, `--bootstrap-server`: List of Kafka bootstrap servers (comma-separated) (**required**)

- `--schema-registry-url`: The URL of the Schema Registry

- `--kafka-config`: Kafka Streams configuration (`<String=String>[,<String=String>...]`)

- `--input-topics`: List of input topics (comma-separated)
Expand Down Expand Up @@ -195,8 +193,6 @@ The following configuration options are available:

- `--bootstrap-servers`, `--bootstrap-server`: List of Kafka bootstrap servers (comma-separated) (**required**)

- `--schema-registry-url`: The URL of the Schema Registry

- `--kafka-config`: Kafka producer configuration (`<String=String>[,<String=String>...]`)

- `--output-topic`: The output topic
Expand Down
4 changes: 0 additions & 4 deletions charts/producer-app-cleanup-job/templates/job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,6 @@ spec:
- name: "{{ .Values.configurationEnvPrefix }}_BOOTSTRAP_SERVERS"
value: {{ .Values.kafka.bootstrapServers | quote }}
{{- end }}
{{- if hasKey .Values.kafka "schemaRegistryUrl" }}
- name: "{{ .Values.configurationEnvPrefix }}_SCHEMA_REGISTRY_URL"
value: {{ .Values.kafka.schemaRegistryUrl | quote }}
{{- end }}
{{- if hasKey .Values.kafka "outputTopic" }}
- name: "{{ .Values.configurationEnvPrefix }}_OUTPUT_TOPIC"
value: {{ .Values.kafka.outputTopic | quote }}
Expand Down
1 change: 0 additions & 1 deletion charts/producer-app-cleanup-job/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ files: {}

kafka:
# bootstrapServers: "test:9092"
# schemaRegistryUrl: "url:1234"
config: {}
# max.poll.records: 500
# Note that YAML may convert large integers to scientific notation. Use Strings to avoid this.
Expand Down
1 change: 0 additions & 1 deletion charts/producer-app/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ Alternatively, a YAML file that specifies the values for the parameters can be p
| Parameter | Description | Default |
|-----------------------------|--------------------------------------------------------------------------------------------------------------|---------|
| `kafka.bootstrapServers` | Comma separated list of Kafka bootstrap servers to connect to. | |
| `kafka.schemaRegistryUrl` | URL of Schema Registry to connect to. | `null` |
| `kafka.config` | Configurations for your [Kafka producer app](https://kafka.apache.org/documentation/#producerconfigs). | `{}` |
| `kafka.outputTopic` | Output topic for your producer application. | |
| `kafka.labeledOutputTopics` | Map of additional labeled output topics if you need to specify multiple topics with different message types. | `{}` |
Expand Down
4 changes: 0 additions & 4 deletions charts/producer-app/templates/_pod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,6 @@ spec:
- name: "{{ .Values.configurationEnvPrefix }}_BOOTSTRAP_SERVERS"
value: {{ .Values.kafka.bootstrapServers | quote }}
{{- end }}
{{- if hasKey .Values.kafka "schemaRegistryUrl" }}
- name: "{{ .Values.configurationEnvPrefix }}_SCHEMA_REGISTRY_URL"
value: {{ .Values.kafka.schemaRegistryUrl | quote }}
{{- end }}
{{- if hasKey .Values.kafka "outputTopic" }}
- name: "{{ .Values.configurationEnvPrefix }}_OUTPUT_TOPIC"
value: {{ .Values.kafka.outputTopic | quote }}
Expand Down
1 change: 0 additions & 1 deletion charts/producer-app/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ resources:

kafka:
# bootstrapServers: "test:9092"
# schemaRegistryUrl: "url:1234"
config: {}
# max.poll.records: 500
# Note that YAML may convert large integers to scientific notation. Use Strings to avoid this.
Expand Down
4 changes: 0 additions & 4 deletions charts/streams-app-cleanup-job/templates/job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,6 @@ spec:
- name: "{{ .Values.configurationEnvPrefix }}_BOOTSTRAP_SERVERS"
value: {{ .Values.kafka.bootstrapServers | quote }}
{{- end }}
{{- if hasKey .Values.kafka "schemaRegistryUrl" }}
- name: "{{ .Values.configurationEnvPrefix }}_SCHEMA_REGISTRY_URL"
value: {{ .Values.kafka.schemaRegistryUrl | quote }}
{{- end }}
{{- if and (hasKey .Values.kafka "inputTopics") (.Values.kafka.inputTopics) }}
- name: "{{ .Values.configurationEnvPrefix }}_INPUT_TOPICS"
value: {{ .Values.kafka.inputTopics | join "," | quote }}
Expand Down
1 change: 0 additions & 1 deletion charts/streams-app-cleanup-job/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ files: {}

kafka:
# bootstrapServers: "test:9092"
# schemaRegistryUrl: "url:1234"
config: {}
# max.poll.records: 500
# Note that YAML may convert large integers to scientific notation. Use Strings to avoid this.
Expand Down
1 change: 0 additions & 1 deletion charts/streams-app/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ Alternatively, a YAML file that specifies the values for the parameters can be p
| Parameter | Description | Default |
|------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------|
| `kafka.bootstrapServers` | Comma separated list of Kafka bootstrap servers to connect to. | |
| `kafka.schemaRegistryUrl` | URL of Schema Registry to connect to. | `null` |
| `kafka.staticMembership` | Whether to use [Kafka Static Group Membership](https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances). | `false` |
| `kafka.config` | Configurations for your [Kafka Streams app](https://kafka.apache.org/documentation/#streamsconfigs). | `{}` |
| `kafka.inputTopics` | List of input topics for your streams application. | `[]` |
Expand Down
4 changes: 0 additions & 4 deletions charts/streams-app/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,6 @@ spec:
- name: "{{ .Values.configurationEnvPrefix }}_BOOTSTRAP_SERVERS"
value: {{ .Values.kafka.bootstrapServers | quote }}
{{- end }}
{{- if hasKey .Values.kafka "schemaRegistryUrl" }}
- name: "{{ .Values.configurationEnvPrefix }}_SCHEMA_REGISTRY_URL"
value: {{ .Values.kafka.schemaRegistryUrl | quote }}
{{- end }}
{{- if and (hasKey .Values.kafka "inputTopics") (.Values.kafka.inputTopics) }}
- name: "{{ .Values.configurationEnvPrefix }}_INPUT_TOPICS"
value: {{ .Values.kafka.inputTopics | join "," | quote }}
Expand Down
1 change: 0 additions & 1 deletion charts/streams-app/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ resources:

kafka:
# bootstrapServers: "test:9092"
# schemaRegistryUrl: "url:1234"
staticMembership: false
config: {}
# max.poll.records: 500
Expand Down
1 change: 1 addition & 0 deletions settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ include(
":streams-bootstrap-core",
":streams-bootstrap-test",
":streams-bootstrap-large-messages",
":streams-bootstrap-schema-registry",
":streams-bootstrap-cli",
":streams-bootstrap-cli-test",
":streams-bootstrap-bom",
Expand Down
1 change: 1 addition & 0 deletions streams-bootstrap-bom/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ dependencies {
api(project(":streams-bootstrap-core"))
api(project(":streams-bootstrap-cli"))
api(project(":streams-bootstrap-large-messages"))
api(project(":streams-bootstrap-schema-registry"))
api(project(":streams-bootstrap-test"))
api(project(":streams-bootstrap-cli-test"))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,11 +235,16 @@ public KafkaTestClient newTestClient() {
*/
public void configure(final KafkaApplication<?, ?, ?, ?, ?, ?, ?, ?> app) {
app.setBootstrapServers(this.bootstrapServers);
final Map<String, String> mergedConfig = merge(app.getKafkaConfig(), this.kafkaConfig);
final Map<String, String> localConfig = this.getLocalConfig();
final Map<String, String> mergedConfig = merge(app.getKafkaConfig(), localConfig);
app.setKafkaConfig(mergedConfig);
if (this.schemaRegistry != null) {
app.setSchemaRegistryUrl(this.schemaRegistry.getSchemaRegistryUrl());
}

private Map<String, String> getLocalConfig() {
if (this.schemaRegistry == null) {
return this.kafkaConfig;
}
return merge(this.kafkaConfig, this.schemaRegistry.getKafkaProperties());
}

private void prepareExecution(final KafkaApplication<?, ?, ?, ?, ?, ?, ?, ?> app) {
Expand All @@ -248,10 +253,8 @@ private void prepareExecution(final KafkaApplication<?, ?, ?, ?, ?, ?, ?, ?> app
}

private RuntimeConfiguration createRuntimeConfiguration() {
final RuntimeConfiguration configuration = RuntimeConfiguration.create(this.bootstrapServers)
.with(this.kafkaConfig);
return this.schemaRegistry == null ? configuration
: configuration.withSchemaRegistryUrl(this.schemaRegistry.getSchemaRegistryUrl());
return RuntimeConfiguration.create(this.bootstrapServers)
.with(this.getLocalConfig());
}

private String[] setupArgs(final String[] args, final Iterable<String> command) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
* <li>{@link #bootstrapServers}</li>
* <li>{@link #outputTopic}</li>
* <li>{@link #labeledOutputTopics}</li>
* <li>{@link #schemaRegistryUrl}</li>
* <li>{@link #kafkaConfig}</li>
* </ul>
* To implement your Kafka application inherit from this class and add your custom options. Run it by creating an
Expand Down Expand Up @@ -90,8 +89,6 @@ public abstract class KafkaApplication<R extends Runner, CR extends CleanUpRunne
@CommandLine.Option(names = {"--bootstrap-servers", "--bootstrap-server"}, required = true,
description = "Kafka bootstrap servers to connect to")
private String bootstrapServers;
@CommandLine.Option(names = "--schema-registry-url", description = "URL of Schema Registry")
private String schemaRegistryUrl;
@CommandLine.Option(names = "--kafka-config", split = ",", description = "Additional Kafka properties")
private Map<String, String> kafkaConfig = emptyMap();

Expand Down Expand Up @@ -191,10 +188,8 @@ public void run() {
}

public RuntimeConfiguration getRuntimeConfiguration() {
final RuntimeConfiguration configuration = RuntimeConfiguration.create(this.bootstrapServers)
return RuntimeConfiguration.create(this.bootstrapServers)
.with(this.kafkaConfig);
return this.schemaRegistryUrl == null ? configuration
: configuration.withSchemaRegistryUrl(this.schemaRegistryUrl);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,6 @@ public void run() {
}) {
app.startApplicationWithoutExit(new String[]{
"--bootstrap-server", "bootstrap-servers",
"--schema-registry-url", "schema-registry",
"--input-topics", "input1,input2",
"--labeled-input-topics", "label1=input3,label2=input4;input5",
"--input-pattern", ".*",
Expand All @@ -375,7 +374,6 @@ public void run() {
"--kafka-config", "foo=1,bar=2",
});
assertThat(app.getBootstrapServers()).isEqualTo("bootstrap-servers");
assertThat(app.getSchemaRegistryUrl()).isEqualTo("schema-registry");
assertThat(app.getInputTopics()).containsExactly("input1", "input2");
assertThat(app.getLabeledInputTopics())
.hasSize(2)
Expand Down
9 changes: 1 addition & 8 deletions streams-bootstrap-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,6 @@ dependencies {

api(libs.kafka.streams)
api(libs.kafka.clients)
implementation(libs.kafka.schema.serializer) {
exclude(group = "org.apache.kafka", module = "kafka-clients") // force usage of OSS kafka-clients
exclude(group = "org.slf4j", module = "slf4j-api") // Conflict with 2.x when used as dependency
}
api(libs.kafka.schema.registry.client) {
exclude(group = "org.apache.kafka", module = "kafka-clients") // force usage of OSS kafka-clients
exclude(group = "org.slf4j", module = "slf4j-api") // Conflict with 2.x when used as dependency
}
implementation(libs.slf4j)
implementation(libs.jool)
implementation(libs.resilience4j.retry)
Expand All @@ -34,6 +26,7 @@ dependencies {
testImplementation(libs.mockito.junit)

testImplementation(testFixtures(project(":streams-bootstrap-test")))
testImplementation(project(":streams-bootstrap-schema-registry"))
testImplementation(libs.kafka.streams.avro.serde) {
exclude(group = "org.apache.kafka", module = "kafka-clients") // force usage of OSS kafka-clients
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

package com.bakdata.kafka;

import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Collection;
Expand All @@ -40,13 +39,12 @@
import org.apache.kafka.streams.StreamsConfig;

/**
* Runtime configuration to connect to Kafka infrastructure, e.g., bootstrap servers and schema registry.
* Runtime configuration to connect to Kafka infrastructure, e.g., bootstrap servers.
*/
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
public final class RuntimeConfiguration {
private static final Set<String> PROVIDED_PROPERTIES = Set.of(
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG
);
private final @NonNull Map<String, Object> properties;

Expand All @@ -71,16 +69,6 @@ public RuntimeConfiguration with(final Map<String, ?> newProperties) {
return this.withInternal(newProperties);
}

/**
* Configure a schema registry for (de-)serialization.
*
* @param schemaRegistryUrl schema registry url
* @return a copy of this runtime configuration with configured schema registry
*/
public RuntimeConfiguration withSchemaRegistryUrl(final String schemaRegistryUrl) {
return this.withInternal(Map.of(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl));
}

/**
* Configure {@link StreamsConfig#STATE_DIR_CONFIG} for Kafka Streams. Useful for testing
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,8 @@

package com.bakdata.kafka.admin;

import static com.bakdata.kafka.SchemaRegistryAppUtils.createSchemaRegistryClient;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import lombok.AccessLevel;
import lombok.Builder;
import lombok.NonNull;
Expand All @@ -49,7 +43,6 @@ public final class AdminClientX implements AutoCloseable {

private static final Duration ADMIN_TIMEOUT = Duration.ofSeconds(10L);
private final @NonNull Admin adminClient;
private final SchemaRegistryClient schemaRegistryClient;
private final @NonNull Timeout timeout;

/**
Expand All @@ -74,10 +67,8 @@ public static AdminClientX create(@NonNull final Map<String, Object> properties,
String.format("%s must be specified in properties", AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG));
}
final Admin adminClient = AdminClient.create(properties);
final SchemaRegistryClient schemaRegistryClient = createSchemaRegistryClient(properties).orElse(null);
return builder()
.adminClient(adminClient)
.schemaRegistryClient(schemaRegistryClient)
.timeout(new Timeout(timeout))
.build();
}
Expand All @@ -91,15 +82,6 @@ public Admin admin() {
return new PooledAdmin(this.adminClient);
}

/**
* Create a {@link SchemaRegistryClient} if schema registry is configured.
* @return schema registry client
*/
public Optional<SchemaRegistryClient> schemaRegistry() {
return Optional.ofNullable(this.schemaRegistryClient)
.map(PooledSchemaRegistryClient::new);
}

/**
* Create a {@link TopicsClient} to perform topic-related administrative actions.
* @return topic client
Expand All @@ -119,13 +101,6 @@ public ConsumerGroupsClient consumerGroups() {
@Override
public void close() {
this.adminClient.close();
if (this.schemaRegistryClient != null) {
try {
this.schemaRegistryClient.close();
} catch (final IOException e) {
throw new UncheckedIOException("Error closing schema registry client", e);
}
}
}

@RequiredArgsConstructor
Expand All @@ -143,15 +118,4 @@ public void close() {
// do nothing
}
}

@RequiredArgsConstructor
private static class PooledSchemaRegistryClient implements SchemaRegistryClient {
@Delegate(excludes = AutoCloseable.class)
private final @NonNull SchemaRegistryClient schemaRegistryClient;

@Override
public void close() {
// do nothing
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
package com.bakdata.kafka.producer;

import com.bakdata.kafka.CleanUpRunner;
import com.bakdata.kafka.SchemaRegistryAppUtils;
import com.bakdata.kafka.admin.AdminClientX;
import java.util.Map;
import lombok.AccessLevel;
Expand Down Expand Up @@ -68,8 +67,6 @@ public static ProducerCleanUpRunner create(@NonNull final ProducerTopicConfig to
public static ProducerCleanUpRunner create(@NonNull final ProducerTopicConfig topics,
@NonNull final Map<String, Object> kafkaProperties,
@NonNull final ProducerCleanUpConfiguration configuration) {
SchemaRegistryAppUtils.createTopicHook(kafkaProperties)
.ifPresent(configuration::registerTopicHook);
return new ProducerCleanUpRunner(topics, kafkaProperties, configuration);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

import com.bakdata.kafka.CleanUpException;
import com.bakdata.kafka.CleanUpRunner;
import com.bakdata.kafka.SchemaRegistryAppUtils;
import com.bakdata.kafka.admin.AdminClientX;
import com.bakdata.kafka.admin.ConsumerGroupsClient;
import com.bakdata.kafka.util.TopologyInformation;
Expand Down Expand Up @@ -85,8 +84,6 @@ public static StreamsCleanUpRunner create(final @NonNull Topology topology,
final @NonNull StreamsConfig streamsConfig, final @NonNull StreamsCleanUpConfiguration configuration) {
final StreamsConfigX config = new StreamsConfigX(streamsConfig);
final TopologyInformation topologyInformation = new TopologyInformation(topology, config.getAppId());
SchemaRegistryAppUtils.createTopicHook(config.getKafkaProperties())
.ifPresent(configuration::registerTopicHook);
return new StreamsCleanUpRunner(topologyInformation, topology, config, configuration);
}

Expand Down
Loading
Loading