Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,16 @@
import java.sql.SQLType;
import java.sql.Statement;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;

public class DatabaseMetaDataImpl implements java.sql.DatabaseMetaData, JdbcV2Wrapper {
private static final Logger log = LoggerFactory.getLogger(DatabaseMetaDataImpl.class);
Expand All @@ -38,12 +43,13 @@ public class DatabaseMetaDataImpl implements java.sql.DatabaseMetaData, JdbcV2Wr
public enum TableType {
DICTIONARY("DICTIONARY"),
LOG_TABLE("LOG TABLE"),
MATERIALIZED_VIEW("MATERIALIZED VIEW"),
MEMORY_TABLE("MEMORY TABLE"),
REMOTE_TABLE("REMOTE TABLE"),
TABLE("TABLE"),
VIEW("VIEW"),
SYSTEM_TABLE("SYSTEM TABLE"),
TEMPORARY_TABLE("TEMPORARY TABLE");
TABLE("TABLE"),
TEMPORARY_TABLE("TEMPORARY TABLE"),
VIEW("VIEW");

private final String typeName;

Expand All @@ -55,8 +61,9 @@ public String getTypeName() {
return typeName;
}
}
public static final String[] TABLE_TYPES = new String[] { "DICTIONARY", "LOG TABLE", "MEMORY TABLE",
"REMOTE TABLE", "TABLE", "VIEW", "SYSTEM TABLE", "TEMPORARY TABLE" };

static final Set<String> TABLE_TYPES = Arrays.stream(TableType.values()).map(TableType::getTypeName).collect(Collectors.toSet());
private static final String TEMPORARY_ENGINE_PREFIX = "Temporary";

private static final String DATABASE_PRODUCT_NAME = "ClickHouse";
private static final String DRIVER_NAME = DATABASE_PRODUCT_NAME + " JDBC Driver";
Expand Down Expand Up @@ -764,6 +771,151 @@ public ResultSet getProcedureColumns(String catalog, String schemaPattern, Strin
}
}

// Map of ClickHouse engine names to JDBC table types
static final Map<String, String> ENGINE_TO_TABLE_TYPE;
static {
Map<String, String> map = new java.util.HashMap<>();

// Log tables
map.put("Log", TableType.LOG_TABLE.getTypeName());
map.put("StripeLog", TableType.LOG_TABLE.getTypeName());
map.put("TinyLog", TableType.LOG_TABLE.getTypeName());

// Memory tables
map.put("Buffer", TableType.MEMORY_TABLE.getTypeName());
map.put("Memory", TableType.MEMORY_TABLE.getTypeName());
map.put("Set", TableType.MEMORY_TABLE.getTypeName());

// Views
map.put("View", TableType.VIEW.getTypeName());
map.put("LiveView", TableType.VIEW.getTypeName());
map.put("MaterializedView", TableType.MATERIALIZED_VIEW.getTypeName());
map.put("WindowView", TableType.VIEW.getTypeName());

// Dictionary
map.put("Dictionary", TableType.DICTIONARY.getTypeName());

// Remote/External tables
map.put("AzureBlobStorage", TableType.REMOTE_TABLE.getTypeName());
map.put("AzureQueue", TableType.REMOTE_TABLE.getTypeName());
map.put("COSN", TableType.REMOTE_TABLE.getTypeName());
map.put("ArrowFlight", TableType.REMOTE_TABLE.getTypeName());
map.put("DeltaLake", TableType.REMOTE_TABLE.getTypeName());
map.put("DeltaLakeAzure", TableType.REMOTE_TABLE.getTypeName());
map.put("DeltaLakeLocal", TableType.REMOTE_TABLE.getTypeName());
map.put("DeltaLakeS3", TableType.REMOTE_TABLE.getTypeName());
map.put("Distributed", TableType.REMOTE_TABLE.getTypeName());
map.put("FuzzJSON", TableType.REMOTE_TABLE.getTypeName());
map.put("FuzzQuery", TableType.REMOTE_TABLE.getTypeName());
map.put("GCS", TableType.REMOTE_TABLE.getTypeName());
map.put("GenerateRandom", TableType.REMOTE_TABLE.getTypeName());
map.put("HDFS", TableType.REMOTE_TABLE.getTypeName());
map.put("Hive", TableType.REMOTE_TABLE.getTypeName());
map.put("Hudi", TableType.REMOTE_TABLE.getTypeName());
map.put("Iceberg", TableType.REMOTE_TABLE.getTypeName());
map.put("IcebergAzure", TableType.REMOTE_TABLE.getTypeName());
map.put("IcebergHDFS", TableType.REMOTE_TABLE.getTypeName());
map.put("IcebergLocal", TableType.REMOTE_TABLE.getTypeName());
map.put("IcebergS3", TableType.REMOTE_TABLE.getTypeName());
map.put("JDBC", TableType.REMOTE_TABLE.getTypeName());
map.put("Kafka", TableType.REMOTE_TABLE.getTypeName());
map.put("MaterializedPostgreSQL", TableType.REMOTE_TABLE.getTypeName());
map.put("MongoDB", TableType.REMOTE_TABLE.getTypeName());
map.put("MySQL", TableType.REMOTE_TABLE.getTypeName());
map.put("NATS", TableType.REMOTE_TABLE.getTypeName());
map.put("ODBC", TableType.REMOTE_TABLE.getTypeName());
map.put("OSS", TableType.REMOTE_TABLE.getTypeName());
map.put("PostgreSQL", TableType.REMOTE_TABLE.getTypeName());
map.put("RabbitMQ", TableType.REMOTE_TABLE.getTypeName());
map.put("Redis", TableType.REMOTE_TABLE.getTypeName());
map.put("S3", TableType.REMOTE_TABLE.getTypeName());
map.put("S3Queue", TableType.REMOTE_TABLE.getTypeName());
map.put("URL", TableType.REMOTE_TABLE.getTypeName());
map.put("YTsaurus", TableType.REMOTE_TABLE.getTypeName());

// Regular tables (MergeTree family and others)
map.put("AggregatingMergeTree", TableType.TABLE.getTypeName());
map.put("Alias", TableType.TABLE.getTypeName());
map.put("CoalescingMergeTree", TableType.TABLE.getTypeName());
map.put("CollapsingMergeTree", TableType.TABLE.getTypeName());
map.put("EmbeddedRocksDB", TableType.TABLE.getTypeName());
map.put("Executable", TableType.TABLE.getTypeName());
map.put("ExecutablePool", TableType.TABLE.getTypeName());
map.put("GraphiteMergeTree", TableType.TABLE.getTypeName());
map.put("Join", TableType.TABLE.getTypeName());
map.put("KeeperMap", TableType.TABLE.getTypeName());
map.put("Merge", TableType.TABLE.getTypeName());
map.put("MergeTree", TableType.TABLE.getTypeName());
map.put("ReplacingMergeTree", TableType.TABLE.getTypeName());
map.put("ReplicatedAggregatingMergeTree", TableType.TABLE.getTypeName());
map.put("ReplicatedCoalescingMergeTree", TableType.TABLE.getTypeName());
map.put("ReplicatedCollapsingMergeTree", TableType.TABLE.getTypeName());
map.put("ReplicatedGraphiteMergeTree", TableType.TABLE.getTypeName());
map.put("ReplicatedMergeTree", TableType.TABLE.getTypeName());
map.put("ReplicatedReplacingMergeTree", TableType.TABLE.getTypeName());
map.put("ReplicatedSummingMergeTree", TableType.TABLE.getTypeName());
map.put("ReplicatedVersionedCollapsingMergeTree", TableType.TABLE.getTypeName());
map.put("SummingMergeTree", TableType.TABLE.getTypeName());
map.put("VersionedCollapsingMergeTree", TableType.TABLE.getTypeName());

// Special
map.put("TimeSeries", TableType.TABLE.getTypeName());
map.put("Null", TableType.TABLE.getTypeName());
map.put("Loop", TableType.TABLE.getTypeName());
map.put("SQLite", TableType.TABLE.getTypeName());
map.put("File", TableType.TABLE.getTypeName());
map.put("FileLog", TableType.TABLE.getTypeName());

ENGINE_TO_TABLE_TYPE = Collections.unmodifiableMap(map);
}

/**
* Converts engine name to table type. Returns TABLE as default for unknown engines.
*/
private static String engineToTableType(String engine) {
if (engine == null) {
return TableType.TABLE.getTypeName();
}
// Check for system tables (engines starting with "System" or "Async")
if (engine.startsWith("System") || engine.startsWith("Async")) {
return TableType.SYSTEM_TABLE.getTypeName();
}
return ENGINE_TO_TABLE_TYPE.getOrDefault(engine, TableType.TABLE.getTypeName());
}

/**
* Returns set of engines that map to any of the given table types.
*/
private static Set<String> getEnginesForTableTypes(Set<String> requestedTypes) {
Set<String> engines = new HashSet<>();

for (Map.Entry<String, String> entry : ENGINE_TO_TABLE_TYPE.entrySet()) {
if (requestedTypes.contains(entry.getValue())) {
engines.add(entry.getKey());
}
}

return engines;
}

private static final Consumer<Map<String, Object>> TABLE_TYPE_MUTATOR = row -> {
String engine = (String) row.get("TABLE_TYPE");

String tableType;
if (engine != null && engine.startsWith(TEMPORARY_ENGINE_PREFIX)) {
tableType = TableType.TEMPORARY_TABLE.getTypeName();
} else if (engine != null && (engine.startsWith("System") || engine.startsWith("Async"))) {
tableType = TableType.SYSTEM_TABLE.getTypeName();
} else {
tableType = engineToTableType(engine);
}

row.put("TABLE_TYPE", tableType);
};

private static final Collection<Consumer<Map<String, Object>>> GET_TABLES_MUTATORS = Collections.singletonList(TABLE_TYPE_MUTATOR);


/**
* Returns tables defined for a schema. Parameter {@code catalog} is ignored
*
Expand All @@ -774,24 +926,47 @@ public ResultSet getProcedureColumns(String catalog, String schemaPattern, Strin
public ResultSet getTables(String catalog, String schemaPattern, String tableNamePattern, String[] types) throws SQLException {
log.debug("getTables: catalog={}, schemaPattern={}, tableNamePattern={}, types={}", catalog, schemaPattern, tableNamePattern, types);
// TODO: when switch between catalog and schema is implemented, then TABLE_SCHEMA and TABLE_CAT should be populated accordingly
// String commentColumn = connection.getServerVersion().check("[21.6,)") ? "t.comment" : "''";
// TODO: handle useCatalogs == true and return schema catalog name
if (types == null || types.length == 0) {
types = TABLE_TYPES;

// Get engines that map to the requested table types
Set<String> requestedTypes = (types == null || types.length == 0) ? TABLE_TYPES : Arrays.stream(types).collect(Collectors.toSet()) ;
Set<String> engines = getEnginesForTableTypes(requestedTypes);

// Build engine filter conditions
List<String> filterConditions = new ArrayList<>();

// Add condition for engines that map to requested types
if (!engines.isEmpty()) {
filterConditions.add("(t.engine IN ('" + String.join("','", engines) + "'))");
}

// If TABLE type is requested, also include engines not in our map (they default to TABLE)
if (requestedTypes.contains(TableType.TABLE.getTypeName())) {
filterConditions.add("(t.engine NOT IN ('" + String.join("','", ENGINE_TO_TABLE_TYPE.keySet()) +
"') AND NOT t.engine LIKE 'System%' AND NOT t.engine LIKE 'Async%' AND t.is_temporary = 0)");
}

// If SYSTEM TABLE is requested, include system engines (System* and Async*)
if (requestedTypes.contains(TableType.SYSTEM_TABLE.getTypeName())) {
filterConditions.add("(t.engine LIKE 'System%' OR t.engine LIKE 'Async%')");
}

// If TEMPORARY TABLE is requested, include temporary tables
if (requestedTypes.contains(TableType.TEMPORARY_TABLE.getTypeName())) {
filterConditions.add("(t.is_temporary = 1)");
}

String engineFilter = filterConditions.isEmpty() ? "" : " AND ( " + String.join(" OR ", filterConditions) + ")";
// Exclude temporary tables when not requested (they would otherwise match engine-based conditions)
if (!requestedTypes.contains(TableType.TEMPORARY_TABLE.getTypeName())) {
engineFilter += " AND (t.is_temporary = 0)";
}

String sql = "SELECT " +
catalogPlaceholder + " AS TABLE_CAT, " +
"t.database AS TABLE_SCHEM, " +
"t.name AS TABLE_NAME, " +
"CASE WHEN t.engine LIKE '%Log' THEN 'LOG TABLE' " +
"WHEN t.engine in ('Buffer', 'Memory', 'Set') THEN 'MEMORY TABLE' " +
"WHEN t.is_temporary != 0 THEN 'TEMPORARY TABLE' " +
"WHEN t.engine like '%View' THEN 'VIEW'" +
"WHEN t.engine = 'Dictionary' THEN 'DICTIONARY' " +
"WHEN t.engine LIKE 'Async%' OR t.engine LIKE 'System%' THEN 'SYSTEM TABLE' " +
"WHEN empty(t.data_paths) THEN 'REMOTE TABLE' " +
"ELSE 'TABLE' END AS TABLE_TYPE, " +
"if(t.is_temporary = 1, concat('Temporary', t.engine), t.engine) AS TABLE_TYPE, " +
"t.comment AS REMARKS, " +
"CAST(null as Nullable(String)) AS TYPE_CAT, " + // no types catalog
"d.engine AS TYPE_SCHEM, " + // no types schema
Expand All @@ -802,10 +977,10 @@ public ResultSet getTables(String catalog, String schemaPattern, String tableNam
" JOIN system.databases d ON system.tables.database = system.databases.name" +
" WHERE t.database LIKE '" + (schemaPattern == null ? "%" : schemaPattern) + "'" +
" AND t.name LIKE '" + (tableNamePattern == null ? "%" : tableNamePattern) + "'" +
" AND TABLE_TYPE IN ('" + String.join("','", types) + "')";
engineFilter;

try {
return connection.createStatement().executeQuery(sql);
try (Statement statement = connection.createStatement(); ResultSet rs = statement.executeQuery(sql)) {
return DetachedResultSet.createFromResultSet(rs, connection.getDefaultCalendar(), GET_TABLES_MUTATORS);
} catch (Exception e) {
throw ExceptionUtils.toSqlState(e);
}
Expand Down Expand Up @@ -844,6 +1019,8 @@ public ResultSet getCatalogs() throws SQLException {
}
}


static final String TABLE_TYPES_SQL_ARRAY = Arrays.stream(TableType.values()).map(TableType::getTypeName).collect(Collectors.joining("','"));
/**
* Returns name of the ClickHouse table types as the broad category (rather than engine name).
* @return - ResultSet with one column TABLE_TYPE
Expand All @@ -852,7 +1029,7 @@ public ResultSet getCatalogs() throws SQLException {
@Override
public ResultSet getTableTypes() throws SQLException {
try {
return connection.createStatement().executeQuery("SELECT arrayJoin(['" + String.join("','", TABLE_TYPES) + "']) AS TABLE_TYPE ORDER BY TABLE_TYPE");
return connection.createStatement().executeQuery("SELECT arrayJoin(['" + TABLE_TYPES_SQL_ARRAY + "']) AS TABLE_TYPE ORDER BY TABLE_TYPE");
} catch (Exception e) {
throw ExceptionUtils.toSqlState(e);
}
Expand Down
Loading
Loading