diff --git a/CHANGELOG.md b/CHANGELOG.md index 0dd3674c..4b6d5799 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ All notable changes to this project will be documented in this file. ### Changed - Refactor: move server configuration properties from the command line to configuration files. ([#911]). +- Add support for ZooKeeper to KRaft migration ([#923]). - Bump testing-tools to `0.3.0-stackable0.0.0-dev` ([#925]). ### Removed @@ -26,6 +27,7 @@ All notable changes to this project will be documented in this file. [#911]: https://github.com/stackabletech/kafka-operator/pull/911 [#914]: https://github.com/stackabletech/kafka-operator/pull/914 [#915]: https://github.com/stackabletech/kafka-operator/pull/915 +[#923]: https://github.com/stackabletech/kafka-operator/pull/923 [#925]: https://github.com/stackabletech/kafka-operator/pull/925 [#927]: https://github.com/stackabletech/kafka-operator/pull/927 [#928]: https://github.com/stackabletech/kafka-operator/pull/928 diff --git a/deploy/helm/kafka-operator/crds/crds.yaml b/deploy/helm/kafka-operator/crds/crds.yaml index 0e975901..cc471755 100644 --- a/deploy/helm/kafka-operator/crds/crds.yaml +++ b/deploy/helm/kafka-operator/crds/crds.yaml @@ -787,6 +787,61 @@ spec: - configMapName type: object type: object + brokerIdPodConfigMapName: + description: |- + Enable users to manually assign Kafka broker ids. + + Name of a ConfigMap containing a mapping of broker IDs to pod names. + The ConfigMap must contain a key for every broker pod in the cluster with the following format: + `: ` + + Example: + ``` + --- + apiVersion: v1 + kind: ConfigMap + metadata: + name: brokeridmapping + data: + simple-kafka-broker-default-0: "2000" + simple-kafka-broker-default-1: "2001" + simple-kafka-broker-default-2: "2002" + ``` + This is necessary when migrating from ZooKeeper to Kraft mode to retain existing broker IDs + because previously broker ids were generated by Kafka and not the operator. + nullable: true + type: string + metadataManager: + description: |- + Metadata manager to use for the Kafka cluster. + + IMPORTANT: This property will be removed as soon as Kafka 3.x support is dropped. + + Possible values are `zookeeper` and `kraft`. + + If not set, defaults to: + + - `zookeeper` for Kafka versions below `4.0.0`. + - `kraft` for Kafka versions `4.0.0` and higher. + + Using `zookeeper` for Kafka versions `4.0.0` and higher is not supported. + + When set to `kraft`, the operator will perform the following actions: + + * Generate the Kafka cluster id. + * Assign broker roles and configure controller quorum voters in the `broker.properties` files. + * Format storage before (re)starting Kafka brokers. + * Remove ZooKeeper related configuration options from the `broker.properties` files. + + Some of them cannot be performed with the ZooKeeper mode. + + This property is also useful when migrating from ZooKeeper to Kraft mode because it permits the operator + to reconcile controllers while still using ZooKeeper for brokers. + enum: + - zookeeper + - kraft + nullable: true + type: string tls: default: internalSecretClass: tls @@ -830,7 +885,7 @@ spec: Provide the name of the ZooKeeper [discovery ConfigMap](https://docs.stackable.tech/home/nightly/concepts/service_discovery) here. When using the [Stackable operator for Apache ZooKeeper](https://docs.stackable.tech/home/nightly/zookeeper/) to deploy a ZooKeeper cluster, this will simply be the name of your ZookeeperCluster resource. - This can only be used up to Kafka version 3.9.x. 
Since Kafka 4.0.0, ZooKeeper suppport was dropped. + This can only be used up to Kafka version 3.9.x. Since Kafka 4.0.0, ZooKeeper support was dropped. Please use the 'controller' role instead. nullable: true type: string diff --git a/docs/modules/kafka/examples/kraft_migration/01-setup.yaml b/docs/modules/kafka/examples/kraft_migration/01-setup.yaml new file mode 100644 index 00000000..bb765307 --- /dev/null +++ b/docs/modules/kafka/examples/kraft_migration/01-setup.yaml @@ -0,0 +1,98 @@ +--- +apiVersion: v1 +kind: Namespace +metadata: + labels: + stackable.tech/vendor: Stackable + name: kraft-migration +--- +apiVersion: zookeeper.stackable.tech/v1alpha1 +kind: ZookeeperCluster +metadata: + name: simple-zk + namespace: kraft-migration +spec: + image: + productVersion: 3.9.4 + pullPolicy: IfNotPresent + servers: + roleGroups: + default: + replicas: 1 +--- +apiVersion: zookeeper.stackable.tech/v1alpha1 +kind: ZookeeperZnode +metadata: + name: simple-kafka-znode + namespace: kraft-migration +spec: + clusterRef: + name: simple-zk +--- +apiVersion: secrets.stackable.tech/v1alpha1 +kind: SecretClass +metadata: + name: kafka-internal-tls +spec: + backend: + autoTls: + ca: + secret: + name: secret-provisioner-kafka-internal-tls-ca + namespace: kraft-migration + autoGenerate: true +--- +apiVersion: authentication.stackable.tech/v1alpha1 +kind: AuthenticationClass +metadata: + name: kafka-client-auth-tls +spec: + provider: + tls: + clientCertSecretClass: kafka-client-auth-secret +--- +apiVersion: secrets.stackable.tech/v1alpha1 +kind: SecretClass +metadata: + name: kafka-client-auth-secret +spec: + backend: + autoTls: + ca: + secret: + name: secret-provisioner-tls-kafka-client-ca + namespace: kraft-migration + autoGenerate: true +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: broker-ids + namespace: kraft-migration +data: + simple-kafka-broker-default-0: "2000" + simple-kafka-broker-default-1: "2001" + simple-kafka-broker-default-2: "2002" +--- +apiVersion: kafka.stackable.tech/v1alpha1 +kind: KafkaCluster +metadata: + name: simple-kafka + namespace: kraft-migration +spec: + image: + productVersion: 3.9.1 + pullPolicy: IfNotPresent + clusterConfig: + metadataManager: zookeeper + brokerIdPodConfigMapName: broker-ids + authentication: + - authenticationClass: kafka-client-auth-tls + tls: + internalSecretClass: kafka-internal-tls + serverSecretClass: tls + zookeeperConfigMapName: simple-kafka-znode + brokers: + roleGroups: + default: + replicas: 3 diff --git a/docs/modules/kafka/examples/kraft_migration/02-start-controllers.yaml b/docs/modules/kafka/examples/kraft_migration/02-start-controllers.yaml new file mode 100644 index 00000000..5fce10af --- /dev/null +++ b/docs/modules/kafka/examples/kraft_migration/02-start-controllers.yaml @@ -0,0 +1,34 @@ +--- +apiVersion: kafka.stackable.tech/v1alpha1 +kind: KafkaCluster +metadata: + name: simple-kafka + namespace: kraft-migration +spec: + image: + productVersion: 3.9.1 + pullPolicy: IfNotPresent + clusterConfig: + metadataManager: zookeeper + authentication: + - authenticationClass: kafka-client-auth-tls + tls: + internalSecretClass: kafka-internal-tls + serverSecretClass: tls + zookeeperConfigMapName: simple-kafka-znode + brokerIdPodConfigMapName: broker-ids + brokers: + envOverrides: + KAFKA_CLUSTER_ID: "MyCya7hbTD-Hay8PgCsCYA" + roleGroups: + default: + replicas: 3 + controllers: + roleGroups: + default: + replicas: 3 + envOverrides: + KAFKA_CLUSTER_ID: "MyCya7hbTD-Hay8PgCsCYA" + configOverrides: + controller.properties: + 
zookeeper.metadata.migration.enable: "true" # Enable migration mode so the controller can read metadata from ZooKeeper. diff --git a/docs/modules/kafka/examples/kraft_migration/03-migrate-metadata.yaml b/docs/modules/kafka/examples/kraft_migration/03-migrate-metadata.yaml new file mode 100644 index 00000000..87304945 --- /dev/null +++ b/docs/modules/kafka/examples/kraft_migration/03-migrate-metadata.yaml @@ -0,0 +1,40 @@ +--- +apiVersion: kafka.stackable.tech/v1alpha1 +kind: KafkaCluster +metadata: + name: simple-kafka + namespace: kraft-migration +spec: + image: + productVersion: 3.9.1 + pullPolicy: IfNotPresent + clusterConfig: + metadataManager: zookeeper + authentication: + - authenticationClass: kafka-client-auth-tls + tls: + internalSecretClass: kafka-internal-tls + serverSecretClass: tls + zookeeperConfigMapName: simple-kafka-znode + brokerIdPodConfigMapName: broker-ids + brokers: + envOverrides: + KAFKA_CLUSTER_ID: "MyCya7hbTD-Hay8PgCsCYA" + roleGroups: + default: + replicas: 3 + configOverrides: + broker.properties: + inter.broker.protocol.version: "3.9" # - Latest value known to Kafka 3.9.1 + zookeeper.metadata.migration.enable: "true" # - Enable migration mode so the broker can participate in metadata migration. + controller.listener.names: "CONTROLLER" + controller.quorum.bootstrap.servers: "simple-kafka-controller-default-0.simple-kafka-controller-default-headless.kraft-migration.svc.cluster.local:9093,simple-kafka-controller-default-1.simple-kafka-controller-default-headless.kraft-migration.svc.cluster.local:9093,simple-kafka-controller-default-2.simple-kafka-controller-default-headless.kraft-migration.svc.cluster.local:9093" + controllers: + roleGroups: + default: + replicas: 3 + envOverrides: + KAFKA_CLUSTER_ID: "MyCya7hbTD-Hay8PgCsCYA" + configOverrides: + controller.properties: + zookeeper.metadata.migration.enable: "true" # Enable migration mode so the controller can read metadata from ZooKeeper. 
diff --git a/docs/modules/kafka/examples/kraft_migration/04-migrate-brokers.yaml b/docs/modules/kafka/examples/kraft_migration/04-migrate-brokers.yaml new file mode 100644 index 00000000..5e4a61cd --- /dev/null +++ b/docs/modules/kafka/examples/kraft_migration/04-migrate-brokers.yaml @@ -0,0 +1,40 @@ +--- +apiVersion: kafka.stackable.tech/v1alpha1 +kind: KafkaCluster +metadata: + name: simple-kafka + namespace: kraft-migration +spec: + image: + productVersion: 3.9.1 + pullPolicy: IfNotPresent + clusterConfig: + metadataManager: zookeeper + authentication: + - authenticationClass: kafka-client-auth-tls + tls: + internalSecretClass: kafka-internal-tls + serverSecretClass: tls + zookeeperConfigMapName: simple-kafka-znode + brokerIdPodConfigMapName: broker-ids + brokers: + envOverrides: + KAFKA_CLUSTER_ID: "MyCya7hbTD-Hay8PgCsCYA" + roleGroups: + default: + replicas: 3 + configOverrides: + broker.properties: + controller.listener.names: "CONTROLLER" + controller.quorum.bootstrap.servers: "simple-kafka-controller-default-0.simple-kafka-controller-default-headless.kraft-migration.svc.cluster.local:9093,simple-kafka-controller-default-1.simple-kafka-controller-default-headless.kraft-migration.svc.cluster.local:9093,simple-kafka-controller-default-2.simple-kafka-controller-default-headless.kraft-migration.svc.cluster.local:9093" + process.roles: "broker" + node.id: "${env:REPLICA_ID}" + controllers: + roleGroups: + default: + replicas: 3 + envOverrides: + KAFKA_CLUSTER_ID: "MyCya7hbTD-Hay8PgCsCYA" + configOverrides: + controller.properties: + zookeeper.metadata.migration.enable: "true" # Enable migration mode so the controller can read metadata from ZooKeeper. diff --git a/docs/modules/kafka/examples/kraft_migration/05-kraft-mode.yaml b/docs/modules/kafka/examples/kraft_migration/05-kraft-mode.yaml new file mode 100644 index 00000000..16921d1f --- /dev/null +++ b/docs/modules/kafka/examples/kraft_migration/05-kraft-mode.yaml @@ -0,0 +1,33 @@ +--- +apiVersion: kafka.stackable.tech/v1alpha1 +kind: KafkaCluster +metadata: + name: simple-kafka + namespace: kraft-migration +spec: + image: + productVersion: 3.9.1 + pullPolicy: IfNotPresent + clusterConfig: + metadataManager: kraft + authentication: + - authenticationClass: kafka-client-auth-tls + tls: + internalSecretClass: kafka-internal-tls + serverSecretClass: tls + brokerIdPodConfigMapName: broker-ids + brokers: + envOverrides: + KAFKA_CLUSTER_ID: "MyCya7hbTD-Hay8PgCsCYA" + roleGroups: + default: + replicas: 3 + configOverrides: + broker.properties: + controller.listener.names: "CONTROLLER" + controllers: + roleGroups: + default: + replicas: 3 + envOverrides: + KAFKA_CLUSTER_ID: "MyCya7hbTD-Hay8PgCsCYA" diff --git a/docs/modules/kafka/examples/mirror_maker/01-setup-source.yaml b/docs/modules/kafka/examples/mirror_maker/01-setup-source.yaml new file mode 100644 index 00000000..4d95005a --- /dev/null +++ b/docs/modules/kafka/examples/mirror_maker/01-setup-source.yaml @@ -0,0 +1,90 @@ +--- +apiVersion: v1 +kind: Namespace +metadata: + labels: + stackable.tech/vendor: Stackable + name: mm-migration +--- +apiVersion: zookeeper.stackable.tech/v1alpha1 +kind: ZookeeperCluster +metadata: + name: zookeeper + namespace: mm-migration +spec: + image: + productVersion: 3.9.4 + pullPolicy: IfNotPresent + servers: + roleGroups: + default: + replicas: 1 +--- +apiVersion: zookeeper.stackable.tech/v1alpha1 +kind: ZookeeperZnode +metadata: + name: source-znode + namespace: mm-migration +spec: + clusterRef: + name: zookeeper +--- +apiVersion: 
secrets.stackable.tech/v1alpha1 +kind: SecretClass +metadata: + name: source-internal-tls +spec: + backend: + autoTls: + ca: + secret: + name: secret-provisioner-source-internal-tls-ca + namespace: mm-migration + autoGenerate: true +--- +apiVersion: authentication.stackable.tech/v1alpha1 +kind: AuthenticationClass +metadata: + name: source-client-auth +spec: + provider: + tls: + clientCertSecretClass: source-client-auth-secret +--- +apiVersion: secrets.stackable.tech/v1alpha1 +kind: SecretClass +metadata: + name: source-client-auth-secret +spec: + backend: + autoTls: + ca: + secret: + name: secret-provisioner-tls-source-client-ca + namespace: mm-migration + autoGenerate: true +--- +apiVersion: kafka.stackable.tech/v1alpha1 +kind: KafkaCluster +metadata: + name: source + namespace: mm-migration +spec: + image: + productVersion: 3.9.1 + pullPolicy: IfNotPresent + clusterConfig: + metadataManager: zookeeper + authentication: + - authenticationClass: source-client-auth + tls: + internalSecretClass: source-internal-tls + serverSecretClass: tls + zookeeperConfigMapName: source-znode + brokers: + roleGroups: + default: + replicas: 1 + configOverrides: + broker.properties: + offsets.topic.replication.factor: "1" # https://github.com/stackabletech/kafka-operator/issues/587 diff --git a/docs/modules/kafka/examples/mirror_maker/02-setup-target.yaml b/docs/modules/kafka/examples/mirror_maker/02-setup-target.yaml new file mode 100644 index 00000000..a69421fa --- /dev/null +++ b/docs/modules/kafka/examples/mirror_maker/02-setup-target.yaml @@ -0,0 +1,63 @@ +--- +apiVersion: secrets.stackable.tech/v1alpha1 +kind: SecretClass +metadata: + name: target-internal-tls +spec: + backend: + autoTls: + ca: + secret: + name: secret-provisioner-target-internal-tls-ca + namespace: mm-migration + autoGenerate: true +--- +apiVersion: authentication.stackable.tech/v1alpha1 +kind: AuthenticationClass +metadata: + name: target-client-auth +spec: + provider: + tls: + clientCertSecretClass: target-client-auth-secret +--- +apiVersion: secrets.stackable.tech/v1alpha1 +kind: SecretClass +metadata: + name: target-client-auth-secret +spec: + backend: + autoTls: + ca: + secret: + name: secret-provisioner-tls-target-client-ca + namespace: mm-migration + autoGenerate: true +--- +apiVersion: kafka.stackable.tech/v1alpha1 +kind: KafkaCluster +metadata: + name: target + namespace: mm-migration +spec: + image: + productVersion: 3.9.1 + pullPolicy: IfNotPresent + clusterConfig: + metadataManager: kraft + authentication: + - authenticationClass: target-client-auth + tls: + internalSecretClass: target-internal-tls + serverSecretClass: tls + brokers: + roleGroups: + default: + replicas: 1 + configOverrides: + broker.properties: + offsets.topic.replication.factor: "1" # https://github.com/stackabletech/kafka-operator/issues/587 + controllers: + roleGroups: + default: + replicas: 1 diff --git a/docs/modules/kafka/examples/mirror_maker/README.md b/docs/modules/kafka/examples/mirror_maker/README.md new file mode 100644 index 00000000..3cb4adf0 --- /dev/null +++ b/docs/modules/kafka/examples/mirror_maker/README.md @@ -0,0 +1,50 @@ +## Description + +This is an internal protocol of what I've done to get MM2 running to help any future efforts. +There is no user facing documentation for it. 
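+
+### Overview
+
+A condensed sketch of the flow described in the sections below (all namespaces, resource names and file paths are the ones used by the manifests in this directory; details and the copy steps are in "Setup"):
+
+```bash
+# Deploy the ZooKeeper-based source cluster and the KRaft-based target cluster.
+kubectl create --save-config -f docs/modules/kafka/examples/mirror_maker/01-setup-source.yaml
+kubectl create --save-config -f docs/modules/kafka/examples/mirror_maker/02-setup-target.yaml
+
+# Copy the target TLS stores and mm.properties into the source broker pod (see "Setup"),
+# then run MirrorMaker 2 from inside that pod; the mirrored topic shows up on the target
+# cluster as `source.test` (see "Run MirrorMaker" and "Verify the topic is mirrored").
+EXTRA_ARGS="" /stackable/kafka/bin/connect-mirror-maker.sh /stackable/mm.properties
+```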
+ +### Setup + +kubectl create --save-config -f docs/modules/kafka/examples/mirror_maker/01-setup-source.yaml +kubectl create --save-config -f docs/modules/kafka/examples/mirror_maker/02-setup-target.yaml + +kubectl cp -n mm-migration -c kafka target-broker-default-0:/stackable/tls-kafka-server/keystore.p12 docs/modules/kafka/examples/mirror_maker/keystore.p12 +kubectl cp -n mm-migration -c kafka target-broker-default-0:/stackable/tls-kafka-server/truststore.p12 docs/modules/kafka/examples/mirror_maker/truststore.p12 + +kubectl cp -n mm-migration -c kafka docs/modules/kafka/examples/mirror_maker/truststore.p12 source-broker-default-0:/stackable/truststore.p12 +kubectl cp -n mm-migration -c kafka docs/modules/kafka/examples/mirror_maker/keystore.p12 source-broker-default-0:/stackable/keystore.p12 + +kubectl cp -n mm-migration -c kafka docs/modules/kafka/examples/mirror_maker/mm.properties source-broker-default-0:/stackable/mm.properties + +### Create a topic and publish some data + +/stackable/kafka/bin/kafka-topics.sh --create --topic test --partitions 1 --bootstrap-server source-broker-default-bootstrap.mm-migration.svc.cluster.local:9093 --command-config /stackable/config/client.properties + +/stackable/kafka/bin/kafka-producer-perf-test.sh --producer-props bootstrap.servers=source-broker-default-bootstrap.mm-migration.svc.cluster.local:9093 --payload-monotonic --throughput 1 --num-records 100 --producer.config /stackable/config/client.properties --topic test + +/stackable/kafka/bin/kafka-console-consumer.sh --bootstrap-server source-broker-default-bootstrap.mm-migration.svc.cluster.local:9093 --consumer.config /stackable/config/client.properties --topic test --offset earliest --partition 0 --timeout-ms 10000 + +### Run MirrorMaker + +EXTRA_ARGS="" /stackable/kafka/bin/connect-mirror-maker.sh /stackable/mm.properties + +### Verify the topic is mirrored + +/stackable/kafka/bin/kafka-topics.sh --list --bootstrap-server target-broker-default-bootstrap.mm-migration.svc.cluster.local:9093 --command-config /stackable/config/client.properties + +/stackable/kafka/bin/kafka-console-consumer.sh --bootstrap-server target-broker-default-bootstrap.mm-migration.svc.cluster.local:9093 --consumer.config /stackable/config/client.properties --topic source.test --offset earliest --partition 0 --timeout-ms 10000 + +### Cleanup + +kubectl delete -n mm-migration kafkaclusters source +kubectl delete -n mm-migration kafkaclusters target +kubectl delete -n mm-migration zookeeperznodes source-znode +kubectl delete -n mm-migration zookeeperclusters zookeeper +kubectl delete -n mm-migration secretclasses source-internal-tls +kubectl delete -n mm-migration secretclasses source-client-auth-secret +kubectl delete -n mm-migration secretclasses target-internal-tls +kubectl delete -n mm-migration secretclasses target-client-auth-secret +kubectl delete -n mm-migration authenticationclasses target-client-auth +kubectl delete -n mm-migration authenticationclasses source-client-auth +kubectl delete -n mm-migration persistentvolumeclaims --all +kubectl delete ns mm-migration diff --git a/docs/modules/kafka/examples/mirror_maker/mm.properties b/docs/modules/kafka/examples/mirror_maker/mm.properties new file mode 100644 index 00000000..b8360527 --- /dev/null +++ b/docs/modules/kafka/examples/mirror_maker/mm.properties @@ -0,0 +1,38 @@ +# specify any number of cluster aliases +clusters = source, target + +# connection information for each cluster +# This is a comma separated host:port pairs for each cluster +# for example. 
"A_host1:9092, A_host2:9092, A_host3:9092" and you can see the exact host name on Ambari > Hosts +source.bootstrap.servers = source-broker-default-bootstrap.mm-migration.svc.cluster.local:9093 +target.bootstrap.servers = target-broker-default-bootstrap.mm-migration.svc.cluster.local:9093 + +# enable and configure individual replication flows +source->target.enabled = true + +# regex which defines which topics gets replicated. For eg "foo-.*" +source->target.topics = test + +# Needed for mm2 internal topics if there is only one broker running per cluster +offset.storage.replication.factor=1 +config.storage.replication.factor=1 +status.storage.replication.factor=1 + +# SSL configuration +target.security.protocol=SSL +target.ssl.truststore.password= +target.ssl.truststore.location=/stackable/truststore.p12 +target.ssl.truststore.type=PKCS12 +#keystore location in case client.auth is set to required +target.ssl.keystore.password= +target.ssl.keystore.location=/stackable/keystore.p12 +target.ssl.keystore.type=PKCS12 + +source.security.protocol=SSL +source.ssl.truststore.password= +source.ssl.truststore.location=/stackable/tls-kafka-server/truststore.p12 +source.ssl.truststore.type=PKCS12 +#keystore location in case client.auth is set to required +source.ssl.keystore.password= +source.ssl.keystore.location=/stackable/tls-kafka-server/keystore.p12 +source.ssl.keystore.type=PKCS12 diff --git a/docs/modules/kafka/pages/usage-guide/kraft-controller.adoc b/docs/modules/kafka/pages/usage-guide/kraft-controller.adoc index ea5c4946..f444a411 100644 --- a/docs/modules/kafka/pages/usage-guide/kraft-controller.adoc +++ b/docs/modules/kafka/pages/usage-guide/kraft-controller.adoc @@ -83,7 +83,7 @@ KRaft mode requires major configuration changes compared to ZooKeeper: * `cluster-id`: This is set to the `metadata.name` of the KafkaCluster resource during initial formatting * `node.id`: This is a calculated integer, hashed from the `role` and `rolegroup` and added `replica` id. * `process.roles`: Will always only be `broker` or `controller`. Mixed `broker,controller` servers are not supported. -* The operator configures a static voter list containing the controller pods. Controllers are not dynamicaly managed. +* The operator configures a static voter list containing the controller pods. Controllers are not dynamically managed. == Known Issues @@ -110,3 +110,203 @@ The Stackable Kafka operator currently does not support the migration. The https://developers.redhat.com/articles/2024/11/27/dynamic-kafka-controller-quorum[Dynamic scaling] is only supported from Kafka version 3.9.0. If you are using older versions, automatic scaling may not work properly (e.g. adding or removing controller replicas). + +== Kraft migration guide + +The operator version `26.3.0` adds support for migrating Kafka clusters from ZooKeeper to KRaft mode. + +This guide describes the steps required to migrate an existing Kafka cluster managed by the Stackable Kafka operator from ZooKeeper to KRaft mode. + +NOTE: Before starting the migration we recommend to reduce producer/consumer operations to a minimum or even pause them completely if possible to reduce the risk of data loss during the migration. + +To make the migration step as clear as possible, we'll use a complete working example throughout this guide. +The example cluster will be kept minimal without any additional configuration. + +We'll use Kafka version `3.9.1` for this purpose. 
It is the last version of the 3.x Kafka series that runs in ZooKeeper mode and is supported by the SDP. + +We'll also assign broker ids manually from the beginning to simplify this guide. In a real-world scenario you do not have this option at this step because your cluster is already running. +Instead, you'll have to collect the existing ids and configure the manual assignment at the second step of the migration. + +We start by creating a dedicated namespace to work in and deploy the Kafka cluster including ZooKeeper and credentials. + +[source,yaml] +---- +include::example$kraft_migration/01-setup.yaml[] +---- + +Next, from one of the broker pods, we will create a topic called `kraft-migration-topic` with 3 partitions to verify the migration later. + +[source,bash] +---- +$ /stackable/kafka/bin/kafka-topics.sh \ +--create \ +--command-config /stackable/config/client.properties \ +--bootstrap-server simple-kafka-broker-default-0-listener-broker.kraft-migration.svc.cluster.local:9093 \ +--partitions 3 \ +--topic kraft-migration-topic +---- + +And - also from one of the broker pods - publish some test messages to it: + +[source,bash] +---- +$ /stackable/kafka/bin/kafka-producer-perf-test.sh \ +--producer.config /stackable/config/client.properties \ +--producer-props bootstrap.servers=simple-kafka-broker-default-0-listener-broker.kraft-migration.svc.cluster.local:9093 \ +--topic kraft-migration-topic \ +--payload-monotonic \ +--throughput 1 \ +--num-records 100 +---- + +We now have a working Kafka cluster with ZooKeeper and some test data. + +=== 1. Start Kraft controllers + +In this step we will perform the following actions: + +1. Retrieve the current `cluster.id` as generated by Kafka. +2. Retrieve and store the current broker ids. +3. Update the `KafkaCluster` resource to add the `spec.controllers` property. +4. Configure the controllers to run in migration mode. +5. Apply the changes and wait for all cluster pods to become ready. + +We can obtain the current `cluster.id` either by inspecting the ZooKeeper data or from the `meta.properties` file on one of the brokers. + +[source,bash] +---- +$ kubectl -n kraft-migration exec -c kafka simple-kafka-broker-default-0 -- cat /stackable/data/topicdata/meta.properties | grep cluster.id +cluster.id=MyCya7hbTD-Hay8PgCsCYA +---- + +We add this value to the `KAFKA_CLUSTER_ID` environment variable for both brokers and controllers. + +To be able to migrate the existing brokers, we need to preserve their broker ids. +Similarly to the cluster id, we can obtain the broker ids from the `meta.properties` file on each broker pod. + +[source,bash] +---- +$ kubectl -n kraft-migration exec -c kafka simple-kafka-broker-default-0 -- cat /stackable/data/topicdata/meta.properties | grep broker.id +broker.id=2000 +---- + +We then need to inform the operator to use these ids instead of generating new ones. +This is done by creating a ConfigMap containing the id mapping and pointing the `spec.clusterConfig.brokerIdPodConfigMapName` property of the `KafkaCluster` resource to it. + +These two settings must be preserved for the rest of the migration process and the lifetime of the cluster. + +The complete example `KafkaCluster` resource after applying the required changes looks as follows: + +[source,yaml] +---- +include::example$kraft_migration/02-start-controllers.yaml[] +---- + +We `kubectl apply` the updated resource and wait for the brokers and controllers to become ready. + +=== 2. 
Migrate metadata + +In this step we will perform the following actions: + +1. Configure the controller quorum on the brokers. +2. Enable metadata migration mode on the brokers. +3. Apply the changes and restart the broker pods. + +To start the metadata migration, we need to add `zookeeper.metadata.migration.enable: "true"` and the controller quorum configuration to the broker configuration. + +For this step, the complete example `KafkaCluster` resource looks as follows: + +[source,yaml] +---- +include::example$kraft_migration/03-migrate-metadata.yaml[] +---- + +The brokers are restarted automatically because of the configuration changes. + +Finally, we check that the metadata migration was successful: + +[source,bash] +---- +kubectl logs -n kraft-migration simple-kafka-controller-default-0 | grep -i 'completed migration' \ +|| kubectl logs -n kraft-migration simple-kafka-controller-default-1 | grep -i 'completed migration' \ +|| kubectl logs -n kraft-migration simple-kafka-controller-default-2 | grep -i 'completed migration' + +... +[2025-12-22 09:23:53,372] INFO [KRaftMigrationDriver id=2110489705] Completed migration of metadata from ZooKeeper to KRaft. 0 records were generated in 102 ms across 0 batches. The average time spent waiting on a batch was -1.00 ms. The record types were {}. The current metadata offset is now 280 with an epoch of 3. Saw 0 brokers in the migrated metadata []. (org.apache.kafka.metadata.migration.KRaftMigrationDriver) +---- + +=== 3. Migrate brokers + + +NOTE: This is the last step before fully switching to KRaft mode. In case of unforeseen issues, it is the last point at which we can roll back to ZooKeeper mode. + +In this step we will perform the following actions: + +1. Remove the migration properties added in the previous step from the brokers. +2. Assign the Kraft role properties to the brokers. +3. Apply the changes and restart the broker pods. + +We need to preserve the quorum configuration added in the previous step. + +For this step, the complete example `KafkaCluster` resource looks as follows: + + +[source,yaml] +---- +include::example$kraft_migration/04-migrate-brokers.yaml[] +---- + +=== 4. Enable Kraft mode + +After this step, the cluster will be fully running in KRaft mode and can no longer be rolled back to ZooKeeper mode. + +In this step we will perform the following actions: + +1. Put the cluster in Kraft mode by updating the `spec.clusterConfig.metadataManager` property. +2. Remove the Kraft quorum configuration from the broker pods. +3. Remove the ZooKeeper migration flag from the controllers. +4. Apply the changes and restart all pods. + +We need to preserve the `KAFKA_CLUSTER_ID` environment variable for the rest of the lifetime of this cluster. + +The complete example `KafkaCluster` resource after applying the required changes looks as follows: + +[source,yaml] +---- +include::example$kraft_migration/05-kraft-mode.yaml[] +---- + +Verify that the cluster is healthy and consumer/producer operations work as expected. + +We can consume the previously produced messages by running the command below on one of the broker pods: + +[source,bash] +---- +/stackable/kafka/bin/kafka-console-consumer.sh \ +--consumer.config /stackable/config/client.properties \ +--bootstrap-server simple-kafka-broker-default-0-listener-broker.kraft-migration.svc.cluster.local:9093 \ +--topic kraft-migration-topic \ +--offset earliest \ +--partition 0 \ +--timeout-ms 10000 +---- + +=== 5. 
Cleanup + +Before proceeding with this step please ensure that the Kafka cluster is fully operational in KRaft mode. + +In this step we remove the now unused ZooKeeper cluster and related resources. + +If the ZooKeeper cluster is also serving other use cases than Kafka you can skip this step. + +In our example we can remove the ZooKeeper cluster and the Znode resource as follows: + +[source,bash] +---- +kubectl delete -n kraft-migration zookeeperznodes simple-kafka-znode +kubectl delete -n kraft-migration zookeeperclusters simple-zk +---- + +=== 6. Next steps + +After successfully migrating to Kraft mode, consider updating to Kafka version 4 to benefit from the latest features and improvements in KRaft mode. diff --git a/rust/operator-binary/src/config/command.rs b/rust/operator-binary/src/config/command.rs index 252d28c0..a4540001 100644 --- a/rust/operator-binary/src/config/command.rs +++ b/rust/operator-binary/src/config/command.rs @@ -11,15 +11,13 @@ use crate::{ KafkaPodDescriptor, STACKABLE_CONFIG_DIR, STACKABLE_KERBEROS_KRB5_PATH, role::{broker::BROKER_PROPERTIES_FILE, controller::CONTROLLER_PROPERTIES_FILE}, security::KafkaTlsSecurity, - v1alpha1, }, - product_logging::STACKABLE_LOG_DIR, + product_logging::{BROKER_ID_POD_MAP_DIR, STACKABLE_LOG_DIR}, }; /// Returns the commands to start the main Kafka container pub fn broker_kafka_container_commands( - kafka: &v1alpha1::KafkaCluster, - cluster_id: &str, + kraft_mode: bool, controller_descriptors: Vec, kafka_security: &KafkaTlsSecurity, product_version: &str, @@ -45,44 +43,49 @@ pub fn broker_kafka_container_commands( false => "".to_string(), }, import_opa_tls_cert = kafka_security.copy_opa_tls_cert_command(), - broker_start_command = broker_start_command(kafka, cluster_id, controller_descriptors, product_version), + broker_start_command = broker_start_command(kraft_mode, controller_descriptors, product_version), } } fn broker_start_command( - kafka: &v1alpha1::KafkaCluster, - cluster_id: &str, + kraft_mode: bool, controller_descriptors: Vec, product_version: &str, ) -> String { - if kafka.is_controller_configured() { - formatdoc! {" - POD_INDEX=$(echo \"$POD_NAME\" | grep -oE '[0-9]+$') + let common_command = formatdoc! {" + export POD_INDEX=$(echo \"$POD_NAME\" | grep -oE '[0-9]+$') export REPLICA_ID=$((POD_INDEX+NODE_ID_OFFSET)) + if [ -f \"{broker_id_pod_map_dir}/$POD_NAME\" ]; then + REPLICA_ID=$(cat \"{broker_id_pod_map_dir}/$POD_NAME\") + fi + cp {config_dir}/{properties_file} /tmp/{properties_file} config-utils template /tmp/{properties_file} cp {config_dir}/jaas.properties /tmp/jaas.properties config-utils template /tmp/jaas.properties + ", + broker_id_pod_map_dir = BROKER_ID_POD_MAP_DIR, + config_dir = STACKABLE_CONFIG_DIR, + properties_file = BROKER_PROPERTIES_FILE, + }; - bin/kafka-storage.sh format --cluster-id {cluster_id} --config /tmp/{properties_file} --ignore-formatted {initial_controller_command} + if kraft_mode { + formatdoc! {" + {common_command} + + bin/kafka-storage.sh format --cluster-id \"$KAFKA_CLUSTER_ID\" --config /tmp/{properties_file} --ignore-formatted {initial_controller_command} bin/kafka-server-start.sh /tmp/{properties_file} & ", - config_dir = STACKABLE_CONFIG_DIR, properties_file = BROKER_PROPERTIES_FILE, initial_controller_command = initial_controllers_command(&controller_descriptors, product_version), } } else { formatdoc! 
{" - cp {config_dir}/{properties_file} /tmp/{properties_file} - config-utils template /tmp/{properties_file} - - cp {config_dir}/jaas.properties /tmp/jaas.properties - config-utils template /tmp/jaas.properties + {common_command} bin/kafka-server-start.sh /tmp/{properties_file} &", - config_dir = STACKABLE_CONFIG_DIR, properties_file = BROKER_PROPERTIES_FILE, } } @@ -131,7 +134,6 @@ wait_for_termination() "#; pub fn controller_kafka_container_command( - cluster_id: &str, controller_descriptors: Vec, product_version: &str, ) -> String { @@ -148,7 +150,7 @@ pub fn controller_kafka_container_command( config-utils template /tmp/{properties_file} - bin/kafka-storage.sh format --cluster-id {cluster_id} --config /tmp/{properties_file} --ignore-formatted {initial_controller_command} + bin/kafka-storage.sh format --cluster-id \"$KAFKA_CLUSTER_ID\" --config /tmp/{properties_file} --ignore-formatted {initial_controller_command} bin/kafka-server-start.sh /tmp/{properties_file} & wait_for_termination $! diff --git a/rust/operator-binary/src/crd/listener.rs b/rust/operator-binary/src/crd/listener.rs index 1f0b8c56..41d7796c 100644 --- a/rust/operator-binary/src/crd/listener.rs +++ b/rust/operator-binary/src/crd/listener.rs @@ -162,14 +162,17 @@ impl KafkaListenerConfig { .join(",") } - /// Returns the `listener.security.protocol.map` for the Kafka `broker.properties` config. - pub fn listener_security_protocol_map_for_listener( - &self, - listener_name: &KafkaListenerName, - ) -> Option { + /// Returns the `listener.security.protocol.map` for the Kraft controller. + /// This map must include the internal broker listener too. + pub fn listener_security_protocol_map_for_controller(&self) -> String { self.listener_security_protocol_map - .get(listener_name) - .map(|protocol| format!("{listener_name}:{protocol}")) + .iter() + .filter(|(name, _)| { + *name == &KafkaListenerName::Internal || *name == &KafkaListenerName::Controller + }) + .map(|(name, protocol)| format!("{name}:{protocol}")) + .collect::>() + .join(",") } } diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs index dbc6bef8..fcfc50b1 100644 --- a/rust/operator-binary/src/crd/mod.rs +++ b/rust/operator-binary/src/crd/mod.rs @@ -24,6 +24,7 @@ use stackable_operator::{ utils::cluster_info::KubernetesClusterInfo, versioned::versioned, }; +use strum::{Display, EnumIter, EnumString}; use crate::{ config::node_id_hasher::node_id_hash32_offset, @@ -59,6 +60,11 @@ pub const STACKABLE_KERBEROS_KRB5_PATH: &str = "/stackable/kerberos/krb5.conf"; #[derive(Snafu, Debug)] pub enum Error { + #[snafu(display( + "The ZooKeeper metadata manager is not supported for Kafka version 4 and higher" + ))] + Kafka4RequiresKraftMetadataManager, + #[snafu(display("The Kafka role [{role}] is missing from spec"))] MissingRole { role: String }, @@ -163,9 +169,60 @@ pub mod versioned { /// Provide the name of the ZooKeeper [discovery ConfigMap](DOCS_BASE_URL_PLACEHOLDER/concepts/service_discovery) /// here. When using the [Stackable operator for Apache ZooKeeper](DOCS_BASE_URL_PLACEHOLDER/zookeeper/) /// to deploy a ZooKeeper cluster, this will simply be the name of your ZookeeperCluster resource. - /// This can only be used up to Kafka version 3.9.x. Since Kafka 4.0.0, ZooKeeper suppport was dropped. + /// This can only be used up to Kafka version 3.9.x. Since Kafka 4.0.0, ZooKeeper support was dropped. /// Please use the 'controller' role instead. pub zookeeper_config_map_name: Option, + + /// Metadata manager to use for the Kafka cluster. 
+ /// + /// IMPORTANT: This property will be removed as soon as Kafka 3.x support is dropped. + /// + /// Possible values are `zookeeper` and `kraft`. + /// + /// If not set, defaults to: + /// + /// - `zookeeper` for Kafka versions below `4.0.0`. + /// - `kraft` for Kafka versions `4.0.0` and higher. + /// + /// Using `zookeeper` for Kafka versions `4.0.0` and higher is not supported. + /// + /// When set to `kraft`, the operator will perform the following actions: + /// + /// * Generate the Kafka cluster id. + /// * Assign broker roles and configure controller quorum voters in the `broker.properties` files. + /// * Format storage before (re)starting Kafka brokers. + /// * Remove ZooKeeper-related configuration options from the `broker.properties` files. + /// + /// Some of these actions cannot be performed in ZooKeeper mode. + /// + /// This property is also useful when migrating from ZooKeeper to Kraft mode because it permits the operator + /// to reconcile controllers while still using ZooKeeper for brokers. + #[serde(skip_serializing_if = "Option::is_none")] + pub metadata_manager: Option<MetadataManager>, + + /// Enable users to manually assign Kafka broker ids. + /// + /// Name of a ConfigMap containing a mapping of pod names to broker IDs. + /// The ConfigMap must contain a key for every broker pod in the cluster with the following format: + /// `<pod name>: <broker id>` + /// + /// Example: + /// ``` + /// --- + /// apiVersion: v1 + /// kind: ConfigMap + /// metadata: + /// name: brokeridmapping + /// data: + /// simple-kafka-broker-default-0: "2000" + /// simple-kafka-broker-default-1: "2001" + /// simple-kafka-broker-default-2: "2002" + /// ``` + /// This is necessary when migrating from ZooKeeper to Kraft mode to retain existing broker IDs + /// because previously broker ids were generated by Kafka and not by the operator. 
+ /// + #[serde(skip_serializing_if = "Option::is_none")] + pub broker_id_pod_config_map_name: Option<String>, + } } @@ -177,6 +234,8 @@ impl Default for v1alpha1::KafkaClusterConfig { tls: tls::default_kafka_tls(), vector_aggregator_config_map_name: None, zookeeper_config_map_name: None, + metadata_manager: None, + broker_id_pod_config_map_name: None, } } } @@ -191,30 +250,46 @@ impl HasStatusCondition for v1alpha1::KafkaCluster { } impl v1alpha1::KafkaCluster { - /// Supporting Kraft alongside Zookeeper requires a couple of CRD checks - /// - If Kafka 4 and higher is used, no zookeeper config map ref has to be provided - /// - Configuring the controller role means no zookeeper config map ref has to be provided - pub fn check_kraft_vs_zookeeper(&self, product_version: &str) -> Result<(), Error> { - if product_version.starts_with("4.") && self.spec.controllers.is_none() { - return Err(Error::Kafka4RequiresKraft); - } - - if self.spec.controllers.is_some() - && self.spec.cluster_config.zookeeper_config_map_name.is_some() - { - return Err(Error::KraftAndZookeeperConfigured); + pub fn effective_metadata_manager(&self) -> Result<MetadataManager, Error> { + match &self.spec.cluster_config.metadata_manager { + Some(manager) => match manager.clone() { + MetadataManager::ZooKeeper => { + if self.spec.image.product_version().starts_with("4.") { + Err(Error::Kafka4RequiresKraftMetadataManager) + } else { + Ok(MetadataManager::ZooKeeper) + } + } + _ => Ok(MetadataManager::KRaft), + }, + None => { + if self.spec.image.product_version().starts_with("4.") { + Ok(MetadataManager::KRaft) + } else { + Ok(MetadataManager::ZooKeeper) + } + } } - - Ok(()) - } - - pub fn is_controller_configured(&self) -> bool { - self.spec.controllers.is_some() } - // The cluster-id for Kafka + /// The Kafka cluster id when running in Kraft mode. + /// + /// In ZooKeeper mode the cluster id is a UUID generated by Kafka itself and users typically + /// do not need to deal with it. + /// + /// When in Kraft mode, the cluster id is passed on as the environment variable `KAFKA_CLUSTER_ID`. + /// + /// When migrating to Kraft mode, users *must* set this variable via `envOverrides` to the value + /// found in the `cluster/id` ZooKeeper node or in the `meta.properties` file. + /// + /// For freshly installed clusters, users do not need to deal with the cluster id. pub fn cluster_id(&self) -> Option<&str> { - self.metadata.name.as_deref() + self.effective_metadata_manager() + .ok() + .and_then(|manager| match manager { + MetadataManager::KRaft => self.metadata.name.as_deref(), + _ => None, + }) } /// The name of the load-balanced Kubernetes Service providing the bootstrap address. 
Kafka clients will use this @@ -412,6 +487,25 @@ pub struct KafkaClusterStatus { pub conditions: Vec, } +#[derive( + Clone, + Debug, + Deserialize, + Display, + EnumIter, + Eq, + Hash, + JsonSchema, + PartialEq, + Serialize, + EnumString, +)] +#[serde(rename_all = "lowercase")] +pub enum MetadataManager { + ZooKeeper, + KRaft, +} + #[cfg(test)] mod tests { use super::*; diff --git a/rust/operator-binary/src/crd/role/broker.rs b/rust/operator-binary/src/crd/role/broker.rs index 00d614b9..70ac85d0 100644 --- a/rust/operator-binary/src/crd/role/broker.rs +++ b/rust/operator-binary/src/crd/role/broker.rs @@ -107,11 +107,15 @@ impl Configuration for BrokerConfigFragment { fn compute_env( &self, - _resource: &Self::Configurable, + resource: &Self::Configurable, _role_name: &str, ) -> Result>, stackable_operator::product_config_utils::Error> { - Ok(BTreeMap::new()) + let mut result = BTreeMap::new(); + if let Some(cluster_id) = resource.cluster_id() { + result.insert("KAFKA_CLUSTER_ID".to_string(), Some(cluster_id.to_string())); + } + Ok(result) } fn compute_cli( diff --git a/rust/operator-binary/src/crd/role/controller.rs b/rust/operator-binary/src/crd/role/controller.rs index 5b9513a5..bf1468b6 100644 --- a/rust/operator-binary/src/crd/role/controller.rs +++ b/rust/operator-binary/src/crd/role/controller.rs @@ -97,11 +97,15 @@ impl Configuration for ControllerConfigFragment { fn compute_env( &self, - _resource: &Self::Configurable, + resource: &Self::Configurable, _role_name: &str, ) -> Result>, stackable_operator::product_config_utils::Error> { - Ok(BTreeMap::new()) + let mut result = BTreeMap::new(); + if let Some(cluster_id) = resource.cluster_id() { + result.insert("KAFKA_CLUSTER_ID".to_string(), Some(cluster_id.to_string())); + } + Ok(result) } fn compute_cli( diff --git a/rust/operator-binary/src/crd/role/mod.rs b/rust/operator-binary/src/crd/role/mod.rs index 47210ea4..bc3f4df2 100644 --- a/rust/operator-binary/src/crd/role/mod.rs +++ b/rust/operator-binary/src/crd/role/mod.rs @@ -33,6 +33,11 @@ use crate::{ /// Env var pub const KAFKA_NODE_ID_OFFSET: &str = "NODE_ID_OFFSET"; +/// Past versions of the operator didn't set this explicitly and allowed Kafka to generate random ids. +/// To support Kraft migration, this must be carried over to `KAFKA_NODE_ID` so the operator needs +/// to know it's value for each broker Pod. +pub const KAFKA_BROKER_ID: &str = "broker.id"; + // See: https://kafka.apache.org/documentation/#brokerconfigs /// The node ID associated with the roles this process is playing when process.roles is non-empty. /// This is required configuration when running in KRaft mode. @@ -66,10 +71,6 @@ pub const KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: &str = "listener.security.protoc /// For example: localhost:9092,localhost:9093,localhost:9094. pub const KAFKA_CONTROLLER_QUORUM_BOOTSTRAP_SERVERS: &str = "controller.quorum.bootstrap.servers"; -/// Map of id/endpoint information for the set of voters in a comma-separated list of {id}@{host}:{port} entries. 
-/// For example: 1@localhost:9092,2@localhost:9093,3@localhost:9094 -pub const KAFKA_CONTROLLER_QUORUM_VOTERS: &str = "controller.quorum.voters"; - #[derive(Snafu, Debug)] pub enum Error { #[snafu(display("fragment validation failure"))] diff --git a/rust/operator-binary/src/crd/security.rs b/rust/operator-binary/src/crd/security.rs index dd5b4358..9a205b5f 100644 --- a/rust/operator-binary/src/crd/security.rs +++ b/rust/operator-binary/src/crd/security.rs @@ -768,6 +768,33 @@ impl KafkaTlsSecurity { KafkaListenerName::Controller.listener_ssl_truststore_type(), "PKCS12".to_string(), ); + + // The TLS properties for the internal broker listener are needed by the Kraft controllers + // too during metadata migration from ZooKeeper to Kraft mode. + config.insert( + KafkaListenerName::Internal.listener_ssl_keystore_location(), + format!("{}/keystore.p12", Self::STACKABLE_TLS_KAFKA_INTERNAL_DIR), + ); + config.insert( + KafkaListenerName::Internal.listener_ssl_keystore_password(), + Self::SSL_STORE_PASSWORD.to_string(), + ); + config.insert( + KafkaListenerName::Internal.listener_ssl_keystore_type(), + "PKCS12".to_string(), + ); + config.insert( + KafkaListenerName::Internal.listener_ssl_truststore_location(), + format!("{}/truststore.p12", Self::STACKABLE_TLS_KAFKA_INTERNAL_DIR), + ); + config.insert( + KafkaListenerName::Internal.listener_ssl_truststore_password(), + Self::SSL_STORE_PASSWORD.to_string(), + ); + config.insert( + KafkaListenerName::Internal.listener_ssl_truststore_type(), + "PKCS12".to_string(), + ); // We set either client tls with authentication or client tls without authentication // If authentication is explicitly required we do not want to have any other CAs to // be trusted. diff --git a/rust/operator-binary/src/kafka_controller.rs b/rust/operator-binary/src/kafka_controller.rs index 4a4ac599..d6566b9d 100644 --- a/rust/operator-binary/src/kafka_controller.rs +++ b/rust/operator-binary/src/kafka_controller.rs @@ -274,11 +274,6 @@ pub async fn reconcile_kafka( .resolve(DOCKER_IMAGE_BASE_NAME, crate::built_info::PKG_VERSION) .context(ResolveProductImageSnafu)?; - // check Kraft vs ZooKeeper and fail if misconfigured - kafka - .check_kraft_vs_zookeeper(&resolved_product_image.product_version) - .context(MisconfiguredKafkaClusterSnafu)?; - let mut cluster_resources = ClusterResources::new( APP_NAME, OPERATOR_NAME, @@ -571,7 +566,8 @@ fn validated_product_config( ), ); - if kafka.is_controller_configured() { + // TODO: need this if because controller_role() raises an error + if kafka.spec.controllers.is_some() { roles.insert( KafkaRole::Controller.to_string(), ( diff --git a/rust/operator-binary/src/product_logging.rs b/rust/operator-binary/src/product_logging.rs index b7990be6..8336f5f7 100644 --- a/rust/operator-binary/src/product_logging.rs +++ b/rust/operator-binary/src/product_logging.rs @@ -15,6 +15,7 @@ use crate::crd::{ v1alpha1, }; +pub const BROKER_ID_POD_MAP_DIR: &str = "/stackable/broker-id-pod-map"; pub const STACKABLE_LOG_CONFIG_DIR: &str = "/stackable/log_config"; pub const STACKABLE_LOG_DIR: &str = "/stackable/log"; // log4j diff --git a/rust/operator-binary/src/resource/configmap.rs b/rust/operator-binary/src/resource/configmap.rs index ded83c59..b423727c 100644 --- a/rust/operator-binary/src/resource/configmap.rs +++ b/rust/operator-binary/src/resource/configmap.rs @@ -15,13 +15,13 @@ use stackable_operator::{ use crate::{ crd::{ - JVM_SECURITY_PROPERTIES_FILE, KafkaPodDescriptor, STACKABLE_LISTENER_BOOTSTRAP_DIR, - STACKABLE_LISTENER_BROKER_DIR, + 
JVM_SECURITY_PROPERTIES_FILE, KafkaPodDescriptor, MetadataManager, + STACKABLE_LISTENER_BOOTSTRAP_DIR, STACKABLE_LISTENER_BROKER_DIR, listener::{KafkaListenerConfig, KafkaListenerName, node_address_cmd}, role::{ - AnyConfig, KAFKA_ADVERTISED_LISTENERS, KAFKA_CONTROLLER_QUORUM_BOOTSTRAP_SERVERS, - KAFKA_CONTROLLER_QUORUM_VOTERS, KAFKA_LISTENER_SECURITY_PROTOCOL_MAP, KAFKA_LISTENERS, - KAFKA_LOG_DIRS, KAFKA_NODE_ID, KAFKA_PROCESS_ROLES, KafkaRole, + AnyConfig, KAFKA_ADVERTISED_LISTENERS, KAFKA_BROKER_ID, + KAFKA_CONTROLLER_QUORUM_BOOTSTRAP_SERVERS, KAFKA_LISTENER_SECURITY_PROTOCOL_MAP, + KAFKA_LISTENERS, KAFKA_LOG_DIRS, KAFKA_NODE_ID, KAFKA_PROCESS_ROLES, KafkaRole, }, security::KafkaTlsSecurity, v1alpha1, @@ -34,6 +34,9 @@ use crate::{ #[derive(Snafu, Debug)] pub enum Error { + #[snafu(display("invalid metadata manager"))] + InvalidMetadataManager { source: crate::crd::Error }, + #[snafu(display("failed to build ConfigMap for {}", rolegroup))] BuildRoleGroupConfig { source: stackable_operator::builder::configmap::Error, @@ -44,7 +47,7 @@ pub enum Error { "failed to serialize [{JVM_SECURITY_PROPERTIES_FILE}] for {}", rolegroup ))] - JvmSecurityPoperties { + JvmSecurityProperties { source: product_config::writer::PropertiesWriterError, rolegroup: String, }, @@ -93,13 +96,21 @@ pub fn build_rolegroup_config_map( ) -> Result { let kafka_config_file_name = merged_config.config_file_name(); + let metadata_manager = kafka + .effective_metadata_manager() + .context(InvalidMetadataManagerSnafu)?; + let mut kafka_config = server_properties_file( - kafka.is_controller_configured(), + metadata_manager == MetadataManager::KRaft, &rolegroup.role, pod_descriptors, listener_config, opa_connect_string, - resolved_product_image.product_version.starts_with("3.7"), // needs_quorum_voters + kafka + .spec + .cluster_config + .broker_id_pod_config_map_name + .is_some(), )?; match merged_config { @@ -163,7 +174,7 @@ pub fn build_rolegroup_config_map( .add_data( JVM_SECURITY_PROPERTIES_FILE, to_java_properties_string(jvm_sec_props.iter()).with_context(|_| { - JvmSecurityPopertiesSnafu { + JvmSecurityPropertiesSnafu { rolegroup: rolegroup.role_group.clone(), } })?, @@ -176,7 +187,7 @@ pub fn build_rolegroup_config_map( .iter() .map(|(k, v)| (k, v)), ) - .with_context(|_| JvmSecurityPopertiesSnafu { + .with_context(|_| JvmSecurityPropertiesSnafu { rolegroup: rolegroup.role_group.clone(), })?, ) @@ -213,7 +224,7 @@ fn server_properties_file( pod_descriptors: &[KafkaPodDescriptor], listener_config: &KafkaListenerConfig, opa_connect_string: Option<&str>, - needs_quorum_voters: bool, + disable_broker_id_generation: bool, ) -> Result, Error> { let kraft_controllers = kraft_controllers(pod_descriptors); @@ -250,17 +261,22 @@ fn server_properties_file( ( KAFKA_LISTENER_SECURITY_PROTOCOL_MAP.to_string(), listener_config - .listener_security_protocol_map_for_listener(&KafkaListenerName::Controller) - .unwrap_or("".to_string())), + .listener_security_protocol_map_for_controller()), ]); - if needs_quorum_voters { - let kraft_voters = - kraft_voters(pod_descriptors).context(NoKraftControllersFoundSnafu)?; + result.insert( + "inter.broker.listener.name".to_string(), + KafkaListenerName::Internal.to_string(), + ); - result.extend([(KAFKA_CONTROLLER_QUORUM_VOTERS.to_string(), kraft_voters)]); + // The ZooKeeper connection is needed for migration from ZooKeeper to KRaft mode. + // It is not needed once the controller is fully running in KRaft mode. 
+ if !kraft_mode { + result.insert( + "zookeeper.connect".to_string(), + "${env:ZOOKEEPER}".to_string(), + ); } - Ok(result) } KafkaRole::Broker => { @@ -278,6 +294,10 @@ fn server_properties_file( KAFKA_LISTENER_SECURITY_PROTOCOL_MAP.to_string(), listener_config.listener_security_protocol_map(), ), + ( + "inter.broker.listener.name".to_string(), + KafkaListenerName::Internal.to_string(), + ), ]); if kraft_mode { @@ -285,6 +305,10 @@ fn server_properties_file( // Running in KRaft mode result.extend([ + ( + "broker.id.generation.enable".to_string(), + "false".to_string(), + ), (KAFKA_NODE_ID.to_string(), "${env:REPLICA_ID}".to_string()), ( KAFKA_PROCESS_ROLES.to_string(), @@ -299,19 +323,25 @@ fn server_properties_file( kraft_controllers.clone(), ), ]); - - if needs_quorum_voters { - let kraft_voters = - kraft_voters(pod_descriptors).context(NoKraftControllersFoundSnafu)?; - - result.extend([(KAFKA_CONTROLLER_QUORUM_VOTERS.to_string(), kraft_voters)]); - } } else { // Running with ZooKeeper enabled result.extend([( "zookeeper.connect".to_string(), "${env:ZOOKEEPER}".to_string(), )]); + // We are in zookeeper mode and the user has defined a broker id mapping + // so we disable automatic id generation. + // This check ensures that existing clusters running in ZooKeeper mode do not + // suddenly break after the introduction of this change. + if disable_broker_id_generation { + result.extend([ + ( + "broker.id.generation.enable".to_string(), + "false".to_string(), + ), + (KAFKA_BROKER_ID.to_string(), "${env:REPLICA_ID}".to_string()), + ]); + } } // Enable OPA authorization @@ -358,21 +388,6 @@ fn kraft_controllers(pod_descriptors: &[KafkaPodDescriptor]) -> Option { } } -fn kraft_voters(pod_descriptors: &[KafkaPodDescriptor]) -> Option { - let result = pod_descriptors - .iter() - .filter(|pd| pd.role == KafkaRole::Controller.to_string()) - .map(|desc| desc.as_quorum_voter()) - .collect::>() - .join(","); - - if result.is_empty() { - None - } else { - Some(result) - } -} - // Generate JAAS configuration file for Kerberos authentication // or an empty string if Kerberos is not enabled. 
// See https://docs.oracle.com/javase/8/docs/technotes/guides/security/jgss/tutorials/LoginConfigFile.html diff --git a/rust/operator-binary/src/resource/statefulset.rs b/rust/operator-binary/src/resource/statefulset.rs index 10ae0366..5b9acaef 100644 --- a/rust/operator-binary/src/resource/statefulset.rs +++ b/rust/operator-binary/src/resource/statefulset.rs @@ -51,8 +51,8 @@ use crate::{ crd::{ self, APP_NAME, KAFKA_HEAP_OPTS, LISTENER_BOOTSTRAP_VOLUME_NAME, LISTENER_BROKER_VOLUME_NAME, LOG_DIRS_VOLUME_NAME, METRICS_PORT, METRICS_PORT_NAME, - STACKABLE_CONFIG_DIR, STACKABLE_DATA_DIR, STACKABLE_LISTENER_BOOTSTRAP_DIR, - STACKABLE_LISTENER_BROKER_DIR, + MetadataManager, STACKABLE_CONFIG_DIR, STACKABLE_DATA_DIR, + STACKABLE_LISTENER_BOOTSTRAP_DIR, STACKABLE_LISTENER_BROKER_DIR, role::{ AnyConfig, KAFKA_NODE_ID_OFFSET, KafkaRole, broker::BrokerContainer, controller::ControllerContainer, @@ -64,14 +64,17 @@ use crate::{ kerberos::add_kerberos_pod_config, operations::graceful_shutdown::add_graceful_shutdown_config, product_logging::{ - MAX_KAFKA_LOG_FILES_SIZE, STACKABLE_LOG_CONFIG_DIR, STACKABLE_LOG_DIR, kafka_log_opts, - kafka_log_opts_env_var, + BROKER_ID_POD_MAP_DIR, MAX_KAFKA_LOG_FILES_SIZE, STACKABLE_LOG_CONFIG_DIR, + STACKABLE_LOG_DIR, kafka_log_opts, kafka_log_opts_env_var, }, utils::build_recommended_labels, }; #[derive(Snafu, Debug)] pub enum Error { + #[snafu(display("invalid metadata manager"))] + InvalidMetadataManager { source: crate::crd::Error }, + #[snafu(display("failed to add kerberos config"))] AddKerberosConfig { source: crate::kerberos::Error }, @@ -284,7 +287,9 @@ pub fn build_broker_rolegroup_statefulset( ..EnvVar::default() }); - let cluster_id = kafka.cluster_id().context(ClusterIdMissingSnafu)?; + let metadata_manager = kafka + .effective_metadata_manager() + .context(InvalidMetadataManagerSnafu)?; cb_kafka .image_from_product_image(resolved_product_image) @@ -296,8 +301,7 @@ pub fn build_broker_rolegroup_statefulset( "-c".to_string(), ]) .args(vec![broker_kafka_container_commands( - kafka, - cluster_id, + metadata_manager == MetadataManager::KRaft, // we need controller pods kafka .pod_descriptors( @@ -438,6 +442,22 @@ pub fn build_broker_rolegroup_statefulset( ) .context(AddListenerVolumeSnafu)?; } + + if let Some(broker_id_config_map_name) = + &kafka.spec.cluster_config.broker_id_pod_config_map_name + { + pod_builder + .add_volume( + VolumeBuilder::new("broker-id-pod-map-dir") + .with_config_map(broker_id_config_map_name) + .build(), + ) + .context(AddVolumeSnafu)?; + cb_kafka + .add_volume_mount("broker-id-pod-map-dir", BROKER_ID_POD_MAP_DIR) + .context(AddVolumeMountSnafu)?; + } + pod_builder .metadata(metadata) .image_pull_secrets_from_product_image(resolved_product_image) @@ -636,6 +656,22 @@ pub fn build_controller_rolegroup_statefulset( ..EnvVar::default() }); + // Controllers need the ZooKeeper connection string for migration + if let Some(zookeeper_config_map_name) = &kafka.spec.cluster_config.zookeeper_config_map_name { + env.push(EnvVar { + name: "ZOOKEEPER".to_string(), + value_from: Some(EnvVarSource { + config_map_key_ref: Some(ConfigMapKeySelector { + name: zookeeper_config_map_name.to_string(), + key: "ZOOKEEPER".to_string(), + ..ConfigMapKeySelector::default() + }), + ..EnvVarSource::default() + }), + ..EnvVar::default() + }) + }; + cb_kafka .image_from_product_image(resolved_product_image) .command(vec![ @@ -646,7 +682,6 @@ pub fn build_controller_rolegroup_statefulset( "-c".to_string(), ]) .args(vec![controller_kafka_container_command( - 
kafka.cluster_id().context(ClusterIdMissingSnafu)?, kafka .pod_descriptors(Some(kafka_role), cluster_info, kafka_security.client_port()) .context(BuildPodDescriptorsSnafu)?, diff --git a/tests/templates/kuttl/operations-kraft/20-install-kafka.yaml.j2 b/tests/templates/kuttl/operations-kraft/20-install-kafka.yaml.j2 index fd95c8ef..704cacaa 100644 --- a/tests/templates/kuttl/operations-kraft/20-install-kafka.yaml.j2 +++ b/tests/templates/kuttl/operations-kraft/20-install-kafka.yaml.j2 @@ -16,8 +16,9 @@ spec: productVersion: "{{ test_scenario['values']['kafka-kraft'] }}" {% endif %} pullPolicy: IfNotPresent -{% if lookup('env', 'VECTOR_AGGREGATOR') %} clusterConfig: + metadataManager: kraft +{% if lookup('env', 'VECTOR_AGGREGATOR') %} vectorAggregatorConfigMapName: vector-aggregator-discovery {% endif %} controllers: diff --git a/tests/templates/kuttl/operations-kraft/25-pause-kafka.yaml.j2 b/tests/templates/kuttl/operations-kraft/25-pause-kafka.yaml.j2 index 2be3f573..563e72a5 100644 --- a/tests/templates/kuttl/operations-kraft/25-pause-kafka.yaml.j2 +++ b/tests/templates/kuttl/operations-kraft/25-pause-kafka.yaml.j2 @@ -16,8 +16,9 @@ spec: productVersion: "{{ test_scenario['values']['kafka-kraft'] }}" {% endif %} pullPolicy: IfNotPresent -{% if lookup('env', 'VECTOR_AGGREGATOR') %} clusterConfig: + metadataManager: kraft +{% if lookup('env', 'VECTOR_AGGREGATOR') %} vectorAggregatorConfigMapName: vector-aggregator-discovery {% endif %} controllers: diff --git a/tests/templates/kuttl/operations-kraft/30-stop-kafka.yaml.j2 b/tests/templates/kuttl/operations-kraft/30-stop-kafka.yaml.j2 index b11dd670..cafaf9ba 100644 --- a/tests/templates/kuttl/operations-kraft/30-stop-kafka.yaml.j2 +++ b/tests/templates/kuttl/operations-kraft/30-stop-kafka.yaml.j2 @@ -16,8 +16,9 @@ spec: productVersion: "{{ test_scenario['values']['kafka-kraft'] }}" {% endif %} pullPolicy: IfNotPresent -{% if lookup('env', 'VECTOR_AGGREGATOR') %} clusterConfig: + metadataManager: kraft +{% if lookup('env', 'VECTOR_AGGREGATOR') %} vectorAggregatorConfigMapName: vector-aggregator-discovery {% endif %} brokers: diff --git a/tests/templates/kuttl/operations-kraft/50-restart-kafka.yaml.j2 b/tests/templates/kuttl/operations-kraft/50-restart-kafka.yaml.j2 index 13a19572..a6ad4ec2 100644 --- a/tests/templates/kuttl/operations-kraft/50-restart-kafka.yaml.j2 +++ b/tests/templates/kuttl/operations-kraft/50-restart-kafka.yaml.j2 @@ -15,8 +15,9 @@ spec: {% else %} productVersion: "{{ test_scenario['values']['kafka-kraft'] }}" {% endif %} -{% if lookup('env', 'VECTOR_AGGREGATOR') %} clusterConfig: + metadataManager: kraft +{% if lookup('env', 'VECTOR_AGGREGATOR') %} vectorAggregatorConfigMapName: vector-aggregator-discovery {% endif %} controllers: diff --git a/tests/templates/kuttl/operations-kraft/60-scale-controller-up.yaml.j2 b/tests/templates/kuttl/operations-kraft/60-scale-controller-up.yaml.j2 index c1b64e7e..3fdc5c4d 100644 --- a/tests/templates/kuttl/operations-kraft/60-scale-controller-up.yaml.j2 +++ b/tests/templates/kuttl/operations-kraft/60-scale-controller-up.yaml.j2 @@ -16,8 +16,9 @@ spec: {% else %} productVersion: "{{ test_scenario['values']['kafka-kraft'] }}" {% endif %} -{% if lookup('env', 'VECTOR_AGGREGATOR') %} clusterConfig: + metadataManager: kraft +{% if lookup('env', 'VECTOR_AGGREGATOR') %} vectorAggregatorConfigMapName: vector-aggregator-discovery {% endif %} controllers: diff --git a/tests/templates/kuttl/operations-kraft/70-scale-controller-down.yaml.j2 
b/tests/templates/kuttl/operations-kraft/70-scale-controller-down.yaml.j2 index ba70a1b5..a077213b 100644 --- a/tests/templates/kuttl/operations-kraft/70-scale-controller-down.yaml.j2 +++ b/tests/templates/kuttl/operations-kraft/70-scale-controller-down.yaml.j2 @@ -16,8 +16,9 @@ spec: {% else %} productVersion: "{{ test_scenario['values']['kafka-kraft'] }}" {% endif %} -{% if lookup('env', 'VECTOR_AGGREGATOR') %} clusterConfig: + metadataManager: kraft +{% if lookup('env', 'VECTOR_AGGREGATOR') %} vectorAggregatorConfigMapName: vector-aggregator-discovery {% endif %} controllers: diff --git a/tests/templates/kuttl/operations-kraft/80-scale-broker-down.yaml.j2 b/tests/templates/kuttl/operations-kraft/80-scale-broker-down.yaml.j2 index d532d273..d788a9c9 100644 --- a/tests/templates/kuttl/operations-kraft/80-scale-broker-down.yaml.j2 +++ b/tests/templates/kuttl/operations-kraft/80-scale-broker-down.yaml.j2 @@ -24,6 +24,7 @@ spec: {% endif %} {% if lookup('env', 'VECTOR_AGGREGATOR') %} clusterConfig: + metadataManager: kraft vectorAggregatorConfigMapName: vector-aggregator-discovery {% endif %} controllers: diff --git a/tests/templates/kuttl/smoke-kraft/30-install-kafka.yaml.j2 b/tests/templates/kuttl/smoke-kraft/30-install-kafka.yaml.j2 index 282686e9..95d85da6 100644 --- a/tests/templates/kuttl/smoke-kraft/30-install-kafka.yaml.j2 +++ b/tests/templates/kuttl/smoke-kraft/30-install-kafka.yaml.j2 @@ -69,6 +69,7 @@ spec: {% endif %} pullPolicy: IfNotPresent clusterConfig: + metadataManager: kraft authentication: - authenticationClass: test-kafka-client-auth-tls tls: diff --git a/tests/templates/kuttl/upgrade/02-install-kafka.yaml.j2 b/tests/templates/kuttl/upgrade/02-install-kafka.yaml.j2 index e88820f0..93e1d415 100644 --- a/tests/templates/kuttl/upgrade/02-install-kafka.yaml.j2 +++ b/tests/templates/kuttl/upgrade/02-install-kafka.yaml.j2 @@ -32,6 +32,9 @@ spec: productVersion: "{{ test_scenario['values']['upgrade_old'] }}" pullPolicy: IfNotPresent clusterConfig: + # Need to set this explicitly because the default would be zookeeper for 3.9.1 + # but we don't want to test zookeeper -> kraft migration here + metadataManager: kraft {% if test_scenario['values']['use-client-auth-tls'] == 'true' %} authentication: - authenticationClass: test-kafka-client-auth-tls