From 519ea81e91fa894685e8a738b9a432fa4e45cfb6 Mon Sep 17 00:00:00 2001 From: arvindksi274-ksolves Date: Mon, 22 Dec 2025 13:07:25 +0530 Subject: [PATCH] CASSANDRA-20081: Add compaction type and strategy properties to compaction history --- .../apache/cassandra/db/SystemKeyspace.java | 9 ++++-- .../db/SystemKeyspaceMigrator41.java | 6 ++-- .../CompactionHistoryTabularData.java | 21 +++++++++----- .../db/compaction/CompactionTask.java | 15 ++++++---- .../stats/CompactionHistoryHolder.java | 8 +++-- .../stats/CompactionHistoryPrinter.java | 3 +- ...mpactionHistorySystemTableUpgradeTest.java | 5 ++-- .../db/SystemKeyspaceMigrator41Test.java | 9 ++++-- .../cassandra/db/SystemKeyspaceTest.java | 29 +++++++++++++++++++ .../db/compaction/CompactionTaskTest.java | 10 ++++++- .../tools/nodetool/CompactionHistoryTest.java | 20 ++++++++----- 11 files changed, 99 insertions(+), 36 deletions(-) diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index a15b46665f44..8dc5776692fd 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -373,6 +373,7 @@ private SystemKeyspace() + "keyspace_name text," + "rows_merged map," + "compaction_properties frozen>," + + "compaction_type text," + "PRIMARY KEY ((id)))") .defaultTimeToLive((int) TimeUnit.DAYS.toSeconds(7)) .build(); @@ -718,12 +719,13 @@ public static void updateCompactionHistory(TimeUUID taskId, long bytesIn, long bytesOut, Map rowsMerged, - Map compactionProperties) + Map compactionProperties, + String compactionType) { // don't write anything when the history table itself is compacted, since that would in turn cause new compactions if (ksname.equals("system") && cfname.equals(COMPACTION_HISTORY)) return; - String req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, compacted_at, bytes_in, bytes_out, rows_merged, compaction_properties) VALUES (?, ?, ?, ?, ?, ?, ?, ?)"; + String req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, compacted_at, bytes_in, bytes_out, rows_merged, compaction_properties, compaction_type) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"; executeInternal(format(req, COMPACTION_HISTORY), taskId, ksname, @@ -732,7 +734,8 @@ public static void updateCompactionHistory(TimeUUID taskId, bytesIn, bytesOut, rowsMerged, - compactionProperties); + compactionProperties, + compactionType); } public static TabularData getCompactionHistory() throws OpenDataException diff --git a/src/java/org/apache/cassandra/db/SystemKeyspaceMigrator41.java b/src/java/org/apache/cassandra/db/SystemKeyspaceMigrator41.java index ab9f01f94500..aa4b69592098 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspaceMigrator41.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspaceMigrator41.java @@ -176,7 +176,8 @@ static void migrateCompactionHistory() "compacted_at", "keyspace_name", "rows_merged", - "compaction_properties" }, + "compaction_properties", + "compaction_type" }, row -> Collections.singletonList(new Object[]{ row.getTimeUUID("id") , row.has("bytes_in") ? row.getLong("bytes_in") : null, row.has("bytes_out") ? row.getLong("bytes_out") : null, @@ -184,7 +185,8 @@ static void migrateCompactionHistory() row.has("compacted_at") ? row.getTimestamp("compacted_at") : null, row.has("keyspace_name") ? row.getString("keyspace_name") : null, row.has("rows_merged") ? row.getMap("rows_merged", Int32Type.instance, LongType.instance) : null, - row.has("compaction_properties") ? row.getMap("compaction_properties", UTF8Type.instance, UTF8Type.instance) : ImmutableMap.of() }) + row.has("compaction_properties") ? row.getMap("compaction_properties", UTF8Type.instance, UTF8Type.instance) : ImmutableMap.of(), + row.has("compaction_type") ? row.getString("compaction_type") : null }) ); } diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionHistoryTabularData.java b/src/java/org/apache/cassandra/db/compaction/CompactionHistoryTabularData.java index 182f34896a04..0a667edad02d 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionHistoryTabularData.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionHistoryTabularData.java @@ -30,11 +30,13 @@ public class CompactionHistoryTabularData { private static final String[] ITEM_NAMES = new String[]{ "id", "keyspace_name", "columnfamily_name", "compacted_at", - "bytes_in", "bytes_out", "rows_merged", "compaction_properties" }; + "bytes_in", "bytes_out", "rows_merged", "compaction_properties", + "compaction_type" }; private static final String[] ITEM_DESCS = new String[]{ "time uuid", "keyspace name", "column family name", "compaction finished at", - "total bytes in", "total bytes out", "total rows merged", "compaction properties" }; + "total bytes in", "total bytes out", "total rows merged", + "compaction properties", "compaction type" }; private static final String TYPE_NAME = "CompactionHistory"; @@ -45,15 +47,14 @@ public class CompactionHistoryTabularData private static final CompositeType COMPOSITE_TYPE; private static final TabularType TABULAR_TYPE; - - public static final String COMPACTION_TYPE_PROPERTY = "compaction_type"; - - static + + static { try { ITEM_TYPES = new OpenType[]{ SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.LONG, - SimpleType.LONG, SimpleType.LONG, SimpleType.STRING, SimpleType.STRING }; + SimpleType.LONG, SimpleType.LONG, SimpleType.STRING, SimpleType.STRING, + SimpleType.STRING }; COMPOSITE_TYPE = new CompositeType(TYPE_NAME, ROW_DESC, ITEM_NAMES, ITEM_DESCS, ITEM_TYPES); @@ -78,10 +79,14 @@ public static TabularData from(UntypedResultSet resultSet) throws OpenDataExcept long bytesOut = row.getLong(ITEM_NAMES[5]); Map rowMerged = row.getMap(ITEM_NAMES[6], Int32Type.instance, LongType.instance); Map compactionProperties = row.getMap(ITEM_NAMES[7], UTF8Type.instance, UTF8Type.instance); + + String compactionType = row.has(ITEM_NAMES[8]) ? row.getString(ITEM_NAMES[8]) : "UNKNOWN"; + result.put(new CompositeDataSupport(COMPOSITE_TYPE, ITEM_NAMES, new Object[]{ id.toString(), ksName, cfName, compactedAt, bytesIn, bytesOut, '{' + FBUtilities.toString(rowMerged) + '}', - '{' + FBUtilities.toString(compactionProperties) + '}' })); + '{' + FBUtilities.toString(compactionProperties) + '}', + compactionType })); } return result; } diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index c9af97fe95bf..84f29179f783 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -28,7 +28,6 @@ import com.google.common.base.Predicate; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.util.concurrent.RateLimiter; @@ -64,7 +63,6 @@ import org.apache.cassandra.utils.TimeUUID; import org.apache.cassandra.utils.concurrent.Refs; -import static org.apache.cassandra.db.compaction.CompactionHistoryTabularData.COMPACTION_TYPE_PROPERTY; import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis; import static org.apache.cassandra.utils.Clock.Global.nanoTime; @@ -314,8 +312,15 @@ public boolean apply(SSTableReader sstable) for (int i = 0; i < mergedRowCounts.length; i++) totalSourceRows += mergedRowCounts[i] * (i + 1); + Map props = new HashMap<>(); + props.put("strategy", cfs.getCompactionStrategyManager().getCompactionParams().klass().getSimpleName()); + + if (getLevel() > 0) + props.put("level", Integer.toString(getLevel())); + String mergeSummary = updateCompactionHistory(taskId, cfs.getKeyspaceName(), cfs.getTableName(), mergedRowCounts, startsize, endsize, - ImmutableMap.of(COMPACTION_TYPE_PROPERTY, compactionType.type)); + props, + compactionType.type); logger.info(String.format("Compacted (%s) %d sstables to [%s] to level=%d. %s to %s (~%d%% of original) in %,dms. Read Throughput = %s, Write Throughput = %s, Row Throughput = ~%,d/s. %,d total partitions merged to %,d. Partition merge counts were {%s}. Time spent writing keys = %,dms", transaction.opIdString(), @@ -353,7 +358,7 @@ public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, return new DefaultCompactionWriter(cfs, directories, transaction, nonExpiredSSTables, keepOriginals, getLevel()); } - public static String updateCompactionHistory(TimeUUID taskId, String keyspaceName, String columnFamilyName, long[] mergedRowCounts, long startSize, long endSize, Map compactionProperties) + public static String updateCompactionHistory(TimeUUID taskId, String keyspaceName, String columnFamilyName, long[] mergedRowCounts, long startSize, long endSize, Map compactionProperties, String compactionType) { StringBuilder mergeSummary = new StringBuilder(mergedRowCounts.length * 10); Map mergedRows = new HashMap<>(); @@ -367,7 +372,7 @@ public static String updateCompactionHistory(TimeUUID taskId, String keyspaceNam mergeSummary.append(String.format("%d:%d, ", rows, count)); mergedRows.put(rows, count); } - SystemKeyspace.updateCompactionHistory(taskId, keyspaceName, columnFamilyName, currentTimeMillis(), startSize, endSize, mergedRows, compactionProperties); + SystemKeyspace.updateCompactionHistory(taskId, keyspaceName, columnFamilyName, currentTimeMillis(), startSize, endSize, mergedRows, compactionProperties, compactionType); return mergeSummary.toString(); } diff --git a/src/java/org/apache/cassandra/tools/nodetool/stats/CompactionHistoryHolder.java b/src/java/org/apache/cassandra/tools/nodetool/stats/CompactionHistoryHolder.java index 189500bc9e76..a1610aff2e71 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/stats/CompactionHistoryHolder.java +++ b/src/java/org/apache/cassandra/tools/nodetool/stats/CompactionHistoryHolder.java @@ -58,8 +58,9 @@ private static class CompactionHistoryRow implements Comparable getAllAsMap(boolean humanReadable) compaction.put("bytes_out", FileUtils.stringifyFileSize(this.bytesOut, humanReadable)); compaction.put("rows_merged", this.rowMerged); compaction.put("compaction_properties", this.compactionProperties); + compaction.put("compaction_type", this.compactionType); return compaction; } } @@ -117,7 +120,8 @@ public Map convert2Map() (Long)value.get(4), (Long)value.get(5), (String)value.get(6), - (String)value.get(7) + (String)value.get(7), + value.size() > 8 ? (String)value.get(8) : "UNKNOWN" ); chrList.add(chr); } diff --git a/src/java/org/apache/cassandra/tools/nodetool/stats/CompactionHistoryPrinter.java b/src/java/org/apache/cassandra/tools/nodetool/stats/CompactionHistoryPrinter.java index c2b62ee030d5..d4384d1aa61f 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/stats/CompactionHistoryPrinter.java +++ b/src/java/org/apache/cassandra/tools/nodetool/stats/CompactionHistoryPrinter.java @@ -65,7 +65,7 @@ public void print(CompactionHistoryHolder data, PrintStream out) for (Object chr : compactionHistories) { Map value = chr instanceof Map ? (Map)chr : Collections.emptyMap(); - String[] obj = new String[8]; + String[] obj = new String[9]; obj[0] = (String)value.get("id"); obj[1] = (String)value.get("keyspace_name"); obj[2] = (String)value.get("columnfamily_name"); @@ -74,6 +74,7 @@ public void print(CompactionHistoryHolder data, PrintStream out) obj[5] = value.get("bytes_out").toString(); obj[6] = (String)value.get("rows_merged"); obj[7] = (String)value.get("compaction_properties"); + obj[8] = (String)value.get("compaction_type"); table.add(obj); } table.printTo(out); diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/CompactionHistorySystemTableUpgradeTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/CompactionHistorySystemTableUpgradeTest.java index bd62bf7cf29b..2368879c90e9 100644 --- a/test/distributed/org/apache/cassandra/distributed/upgrade/CompactionHistorySystemTableUpgradeTest.java +++ b/test/distributed/org/apache/cassandra/distributed/upgrade/CompactionHistorySystemTableUpgradeTest.java @@ -26,7 +26,6 @@ import org.apache.cassandra.distributed.api.Feature; import org.apache.cassandra.tools.ToolRunner; -import static org.apache.cassandra.db.compaction.CompactionHistoryTabularData.COMPACTION_TYPE_PROPERTY; import static org.apache.cassandra.tools.ToolRunner.invokeNodetoolJvmDtest; import static org.apache.cassandra.tools.nodetool.CompactionHistoryTest.assertCompactionHistoryOutPut; @@ -66,13 +65,13 @@ public void compactionHistorySystemTableTest() throws Throwable ToolRunner.ToolResult toolHistory = invokeNodetoolJvmDtest(cluster.get(1), "compactionhistory"); toolHistory.assertOnCleanExit(); // upgraded system.compaction_history data verify - assertCompactionHistoryOutPut(toolHistory, KEYSPACE, "tb", ImmutableMap.of()); + assertCompactionHistoryOutPut(toolHistory, KEYSPACE, "tb", ImmutableMap.of(), "Compaction"); // force compact cluster.stream().forEach(node -> node.nodetool("compact")); toolHistory = invokeNodetoolJvmDtest(cluster.get(1), "compactionhistory"); toolHistory.assertOnCleanExit(); - assertCompactionHistoryOutPut(toolHistory, KEYSPACE, "tb", ImmutableMap.of(COMPACTION_TYPE_PROPERTY, OperationType.MAJOR_COMPACTION.type)); + assertCompactionHistoryOutPut(toolHistory, KEYSPACE, "tb", ImmutableMap.of(), OperationType.MAJOR_COMPACTION.type); }) .run(); } diff --git a/test/unit/org/apache/cassandra/db/SystemKeyspaceMigrator41Test.java b/test/unit/org/apache/cassandra/db/SystemKeyspaceMigrator41Test.java index f9a09608a42c..c0c809487f03 100644 --- a/test/unit/org/apache/cassandra/db/SystemKeyspaceMigrator41Test.java +++ b/test/unit/org/apache/cassandra/db/SystemKeyspaceMigrator41Test.java @@ -283,8 +283,9 @@ public void testMigrateCompactionHistory() throws Throwable + "columnfamily_name, " + "compacted_at, " + "keyspace_name, " - + "rows_merged) " - + " values ( ?, ?, ?, ?, ?, ?, ? )", + + "rows_merged, " + + "compaction_type) " + + " values ( ?, ?, ?, ?, ?, ?, ?, ? )", table); TimeUUID compactionId = TimeUUID.Generator.atUnixMillis(currentTimeMillis()); Date compactAt = Date.from(now()); @@ -296,7 +297,8 @@ public void testMigrateCompactionHistory() throws Throwable "table", compactAt, "keyspace", - rowsMerged); + rowsMerged, + "Major"); SystemKeyspaceMigrator41.migrateCompactionHistory(); int rowCount = 0; @@ -311,6 +313,7 @@ public void testMigrateCompactionHistory() throws Throwable assertEquals("keyspace", row.getString("keyspace_name")); assertEquals(rowsMerged, row.getMap("rows_merged", Int32Type.instance, LongType.instance)); assertEquals(ImmutableMap.of(), row.getMap("compaction_properties", UTF8Type.instance, UTF8Type.instance)); + assertEquals("Major", row.getString("compaction_type")); } assertEquals(1, rowCount); Keyspace.all().forEach(ks -> { diff --git a/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java b/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java index 7eb8f9c578db..dd3cb0c179aa 100644 --- a/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java +++ b/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java @@ -40,6 +40,7 @@ import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.CassandraVersion; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.TimeUUID; import static java.lang.String.format; import static org.apache.cassandra.cql3.QueryProcessor.executeInternal; @@ -158,6 +159,34 @@ public void testPersistLocalMetadata() assertEquals(DatabaseDescriptor.getStoragePort(), row.getInt("listen_port")); } + @Test + public void testCompactionHistory() + { + String ks = "test_ks"; + String cf = "test_cf"; + long now = System.currentTimeMillis(); + Map rowsMerged = Collections.singletonMap(1, 100L); + Map props = Collections.singletonMap("strategy", "STCS"); + String compactionType = "TestMajor"; + + SystemKeyspace.updateCompactionHistory( + TimeUUID.Generator.nextTimeUUID(), + ks, + cf, + now, + 1000, + 500, + rowsMerged, + props, + compactionType + ); + + UntypedResultSet result = executeInternal("SELECT compaction_type FROM system.compaction_history WHERE keyspace_name=? AND columnfamily_name=? ALLOW FILTERING", ks, cf); + + assertNotNull(result); + assertEquals(compactionType, result.one().getString("compaction_type")); + } + private String getOlderVersionString() { String version = FBUtilities.getReleaseVersionString(); diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java index 8109e40dfe75..f7c07c1f5954 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Set; import org.junit.Assert; @@ -37,6 +38,7 @@ import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.schema.Schema; @@ -92,7 +94,7 @@ public void testTaskIdIsPersistedInCompactionHistory() task.execute(CompactionManager.instance.active); } - UntypedResultSet rows = QueryProcessor.executeInternal(format("SELECT id FROM system.%s where id = %s", + UntypedResultSet rows = QueryProcessor.executeInternal(format("SELECT id, compaction_type, compaction_properties FROM system.%s where id = %s", SystemKeyspace.COMPACTION_HISTORY, id.toString())); @@ -103,6 +105,12 @@ public void testTaskIdIsPersistedInCompactionHistory() TimeUUID persistedId = one.getTimeUUID("id"); Assert.assertEquals(id, persistedId); + + String type = one.getString("compaction_type"); + Assert.assertEquals("Compaction", type); + + Map properties = one.getMap("compaction_properties", UTF8Type.instance, UTF8Type.instance); + Assert.assertTrue("Strategy missing in properties", properties.containsKey("strategy")); } @Test diff --git a/test/unit/org/apache/cassandra/tools/nodetool/CompactionHistoryTest.java b/test/unit/org/apache/cassandra/tools/nodetool/CompactionHistoryTest.java index c2ee30f9bcc7..5a12b3fb6303 100644 --- a/test/unit/org/apache/cassandra/tools/nodetool/CompactionHistoryTest.java +++ b/test/unit/org/apache/cassandra/tools/nodetool/CompactionHistoryTest.java @@ -42,7 +42,6 @@ import org.apache.cassandra.tools.ToolRunner.ToolResult; import org.apache.cassandra.utils.FBUtilities; -import static org.apache.cassandra.db.compaction.CompactionHistoryTabularData.COMPACTION_TYPE_PROPERTY; import static org.apache.cassandra.tools.ToolRunner.invokeNodetool; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertTrue; @@ -101,35 +100,40 @@ public void testCompactionProperties() throws Throwable ImmutableList.Builder builder = ImmutableList.builder(); List cmds = builder.addAll(cmd).add(keyspace()).add(currentTable()).build(); - compactionHistoryResultVerify(keyspace(), currentTable(), ImmutableMap.of(COMPACTION_TYPE_PROPERTY, compactionType), cmds); - String cql = "select keyspace_name,columnfamily_name,compaction_properties from system." + SystemKeyspace.COMPACTION_HISTORY + + Map expectedProperties = ImmutableMap.of("strategy", "SizeTieredCompactionStrategy"); + + compactionHistoryResultVerify(keyspace(), currentTable(), expectedProperties, compactionType, cmds); + + String cql = "select keyspace_name,columnfamily_name,compaction_properties,compaction_type from system." + SystemKeyspace.COMPACTION_HISTORY + " where keyspace_name = '" + keyspace() + "' AND columnfamily_name = '" + currentTable() + "' ALLOW FILTERING"; + Object[][] objects = new Object[systemTableRecord][]; for (int i = 0; i != systemTableRecord; ++i) { - objects[i] = row(keyspace(), currentTable(), ImmutableMap.of(COMPACTION_TYPE_PROPERTY, compactionType)); + objects[i] = row(keyspace(), currentTable(), expectedProperties, compactionType); } assertRows(execute(cql), objects); } - private void compactionHistoryResultVerify(String keyspace, String table, Map properties, List cmds) + private void compactionHistoryResultVerify(String keyspace, String table, Map properties, String compType, List cmds) { ToolResult toolCompact = invokeNodetool(cmds); toolCompact.assertOnCleanExit(); ToolResult toolHistory = invokeNodetool("compactionhistory"); toolHistory.assertOnCleanExit(); - assertCompactionHistoryOutPut(toolHistory, keyspace, table, properties); + assertCompactionHistoryOutPut(toolHistory, keyspace, table, properties, compType); } - public static void assertCompactionHistoryOutPut(ToolResult toolHistory, String keyspace, String table, Map properties) + public static void assertCompactionHistoryOutPut(ToolResult toolHistory, String keyspace, String table, Map properties, String compType) { String stdout = toolHistory.getStdout(); String[] resultArray = stdout.split(System.lineSeparator()); assertTrue(Arrays.stream(resultArray) .anyMatch(result -> result.contains('{' + FBUtilities.toString(properties) + '}') && result.contains(keyspace) - && result.contains(table))); + && result.contains(table) + && result.contains(compType))); } }