From 60c323fceeec720b664a4fac1c9f740b541a8e95 Mon Sep 17 00:00:00 2001 From: Yuqi Yan Date: Thu, 20 Nov 2025 14:40:58 -0800 Subject: [PATCH] Option to schedule Paxos cleanup on topology change by keyspace patch by Yuqi Yan; reviewed by for CASSANDRA-20801 --- CHANGES.txt | 1 + .../config/CassandraRelevantProperties.java | 1 + .../org/apache/cassandra/service/StorageService.java | 10 ++++++++-- .../cluster/OnInstanceTopologyChangePaxosRepair.java | 12 +++++++++++- 4 files changed, 21 insertions(+), 3 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 06b33e984657..5d6aae0d3896 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 5.1 + * Option to schedule Paxos cleanup on topology change by keyspace (CASSANDRA-20801) * Introducing comments and security labels for schema elements (CASSANDRA-20943) * Extend nodetool tablestats for dictionary memory usage (CASSANDRA-20940) * Introduce separate GCInspector thresholds for concurrent GC events (CASSANDRA-20980) diff --git a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java index 2bcf423c7ef3..77b1a5f90df1 100644 --- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java +++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java @@ -434,6 +434,7 @@ public enum CassandraRelevantProperties PAXOS_LOG_TTL_LINEARIZABILITY_VIOLATIONS("cassandra.paxos.log_ttl_linearizability_violations", "true"), PAXOS_MODERN_RELEASE("cassandra.paxos.modern_release", "4.1"), PAXOS_REPAIR_ALLOW_MULTIPLE_PENDING_UNSAFE("cassandra.paxos_repair_allow_multiple_pending_unsafe"), + PAXOS_REPAIR_ON_TOPOLOGY_CHANGE_BY_KEYSPACE("cassandra.paxos_repair_on_topology_change_by_keyspace", "false"), PAXOS_REPAIR_ON_TOPOLOGY_CHANGE_RETRIES("cassandra.paxos_repair_on_topology_change_retries", "10"), PAXOS_REPAIR_ON_TOPOLOGY_CHANGE_RETRY_DELAY_SECONDS("cassandra.paxos_repair_on_topology_change_retry_delay_seconds", "10"), 
PAXOS_REPAIR_RETRY_TIMEOUT_IN_MS("cassandra.paxos_repair_retry_timeout_millis", "60000"), diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 84a61f6b2646..c083ceeddadd 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -248,6 +248,7 @@ import static org.apache.cassandra.config.CassandraRelevantProperties.CONSISTENT_RANGE_MOVEMENT; import static org.apache.cassandra.config.CassandraRelevantProperties.DRAIN_EXECUTOR_TIMEOUT_MS; import static org.apache.cassandra.config.CassandraRelevantProperties.JOIN_RING; +import static org.apache.cassandra.config.CassandraRelevantProperties.PAXOS_REPAIR_ON_TOPOLOGY_CHANGE_BY_KEYSPACE; import static org.apache.cassandra.config.CassandraRelevantProperties.PAXOS_REPAIR_ON_TOPOLOGY_CHANGE_RETRIES; import static org.apache.cassandra.config.CassandraRelevantProperties.PAXOS_REPAIR_ON_TOPOLOGY_CHANGE_RETRY_DELAY_SECONDS; import static org.apache.cassandra.config.CassandraRelevantProperties.REPLACE_ADDRESS_FIRST_BOOT; @@ -3226,9 +3227,10 @@ public void repairPaxosForTopologyChange(String reason) } @VisibleForTesting - public Future startRepairPaxosForTopologyChange(String reason) + public Future startRepairPaxosForTopologyChange(String reason) throws ExecutionException, InterruptedException { logger.info("repairing paxos for {}", reason); + boolean scheduleByKeyspace = PAXOS_REPAIR_ON_TOPOLOGY_CHANGE_BY_KEYSPACE.getBoolean(); List> futures = new ArrayList<>(); @@ -3242,7 +3244,11 @@ public Future startRepairPaxosForTopologyChange(String reason) continue; Collection> ranges = getLocalAndPendingRanges(ksName); - futures.add(ActiveRepairService.instance().repairPaxosForTopologyChange(ksName, ranges, reason)); + if (scheduleByKeyspace) + // when scheduling by keyspace, block until this keyspace's repair completes before scheduling the next, to avoid overwhelming messaging with many repairs at once +
ActiveRepairService.instance().repairPaxosForTopologyChange(ksName, ranges, reason).get(); + else + futures.add(ActiveRepairService.instance().repairPaxosForTopologyChange(ksName, ranges, reason)); } return FutureCombiner.allOf(futures); diff --git a/test/simulator/main/org/apache/cassandra/simulator/cluster/OnInstanceTopologyChangePaxosRepair.java b/test/simulator/main/org/apache/cassandra/simulator/cluster/OnInstanceTopologyChangePaxosRepair.java index daaa4744d216..754ef0ec0b3e 100644 --- a/test/simulator/main/org/apache/cassandra/simulator/cluster/OnInstanceTopologyChangePaxosRepair.java +++ b/test/simulator/main/org/apache/cassandra/simulator/cluster/OnInstanceTopologyChangePaxosRepair.java @@ -18,6 +18,8 @@ package org.apache.cassandra.simulator.cluster; +import java.util.concurrent.ExecutionException; + import org.apache.cassandra.distributed.api.IIsolatedExecutor.SerializableRunnable; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.concurrent.Condition; @@ -63,7 +65,15 @@ protected static SerializableRunnable invokableTopologyChangeRepair(String reaso { return () -> { Condition condition = newOneTimeCondition(); - Future future = StorageService.instance.startRepairPaxosForTopologyChange(reason); + Future future; + try + { + future = StorageService.instance.startRepairPaxosForTopologyChange(reason); + } + catch (ExecutionException | InterruptedException e) + { + throw new RuntimeException(e); + } future.addListener(condition::signal); // add listener so we don't use Futures.addAllAsList condition.awaitThrowUncheckedOnInterrupt(); };