
Conversation

@harshach
Collaborator

Distributed Search Indexing - Design Document

Overview

This document describes the distributed search indexing capability in OpenMetadata, enabling multiple server instances to collaboratively process large-scale indexing jobs. This is essential for deployments with millions of entities where single-server indexing becomes a bottleneck.

Solution Architecture

High-Level Design

flowchart TB
    subgraph Cluster["OpenMetadata Cluster"]
        S1["Server 1<br/>(Triggering Server)"]
        S2["Server 2<br/>(Participant)"]
        SN["Server N<br/>(Participant)"]
    end

    subgraph Notification["Job Notification Layer"]
        direction LR
        Redis["Redis Pub/Sub<br/>(instant notification)"]
        Polling["Database Polling<br/>(30s fallback)"]
    end

    subgraph Persistence["Persistence Layer"]
        DB[(MySQL/PostgreSQL<br/>Jobs + Partitions)]
        ES[(Elasticsearch/<br/>OpenSearch)]
    end

    S1 -->|"1. Create job"| DB
    S1 -->|"2. Notify start"| Redis
    Redis -->|"3. Broadcast"| S2
    Redis -->|"3. Broadcast"| SN

    S1 & S2 & SN -->|"4. Claim partitions<br/>(atomic)"| DB
    S1 & S2 & SN -->|"5. Index entities"| ES
    S1 & S2 & SN -->|"6. Update stats"| DB

Component Architecture

flowchart TB
    subgraph Executor["DistributedSearchIndexExecutor"]
        E1["Triggered by Quartz<br/>(on ONE server via JDBC clustering)"]
        E2["Creates job and partitions"]
        E3["Notifies other servers"]
        E4["Processes partitions"]
        E5["Aggregates results"]
    end

    subgraph Notifier["DistributedJobNotifier<br/><<interface>>"]
        N1["notifyJobStarted()"]
        N2["notifyJobCompleted()"]
        N3["onJobStarted(callback)"]
    end

    subgraph Implementations["Notifier Implementations"]
        R["RedisJobNotifier<br/>- Zero latency<br/>- Pub/Sub channels"]
        P["PollingJobNotifier<br/>- 30s idle interval<br/>- 1s active interval"]
    end

    subgraph Participant["DistributedJobParticipant"]
        P1["Runs on ALL servers<br/>(Dropwizard Managed)"]
        P2["Listens for notifications"]
        P3["Claims & processes partitions"]
    end

    subgraph Coordinator["DistributedSearchIndexCoordinator"]
        C1["Manages job lifecycle"]
        C2["Atomic partition claiming<br/>(FOR UPDATE SKIP LOCKED)"]
        C3["Stats aggregation"]
    end

    Executor --> Notifier
    Notifier --> R
    Notifier --> P
    Participant --> Notifier
    Participant --> Coordinator
    Executor --> Coordinator

Job Execution Flow

Sequence Diagram

sequenceDiagram
    participant Q as Quartz Scheduler
    participant S1 as Server 1 (Trigger)
    participant N as Notifier
    participant S2 as Server 2
    participant DB as Database
    participant ES as Elasticsearch

    Q->>S1: Fire trigger (JDBC clustering ensures single server)

    rect rgb(240, 248, 255)
        Note over S1,DB: Job Initialization
        S1->>DB: Create job record (status=RUNNING)
        S1->>DB: Create partition records (status=PENDING)
        S1->>N: notifyJobStarted(jobId)
    end

    N->>S2: Broadcast job notification
    S2->>DB: Get job details
    S2->>DB: Check pending partitions

    rect rgb(240, 255, 240)
        Note over S1,ES: Parallel Processing
        par Server 1 Processing
            loop While partitions available
                S1->>DB: Claim partition (FOR UPDATE SKIP LOCKED)
                S1->>DB: Read entities for partition range
                S1->>ES: Bulk index entities
                S1->>DB: Update partition stats
            end
        and Server 2 Processing
            loop While partitions available
                S2->>DB: Claim partition (FOR UPDATE SKIP LOCKED)
                S2->>DB: Read entities for partition range
                S2->>ES: Bulk index entities
                S2->>DB: Update partition stats
            end
        end
    end

    rect rgb(255, 248, 240)
        Note over S1,DB: Job Completion
        S1->>DB: Aggregate stats from all partitions
        S1->>DB: Update job status (COMPLETED)
        S1->>N: notifyJobCompleted(jobId)
    end

Partition-Based Work Distribution

Partition Creation Strategy

flowchart TB
    subgraph Input["Input Parameters"]
        I1["Entity types: [table, dashboard, pipeline]"]
        I2["Batch size: 1000"]
        I3["Total records: 100,000 tables"]
    end

    subgraph Calculation["Partition Calculator"]
        C1["partitions = ceil(records / batchSize)"]
        C2["100 partitions for tables"]
    end

    subgraph Output["Created Partitions"]
        P0["Partition 0<br/>entity: table<br/>range: 0-999"]
        P1["Partition 1<br/>entity: table<br/>range: 1000-1999"]
        P2["Partition 2<br/>entity: table<br/>range: 2000-2999"]
        Pn["...<br/>Partition 99<br/>range: 99000-99999"]
    end

    Input --> Calculation --> Output
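
The partition math above is straightforward; a minimal sketch, assuming a batch size of 1,000 and hypothetical names (this is not the actual PartitionCalculator API):

// Illustrative partition planning: partitions = ceil(records / batchSize),
// each partition covering an inclusive [rangeStart, rangeEnd] slice.
import java.util.ArrayList;
import java.util.List;

record PartitionRange(int index, long rangeStart, long rangeEnd) {}

class PartitionMathSketch {
  static List<PartitionRange> plan(long totalRecords, int batchSize) {
    int partitions = (int) ((totalRecords + batchSize - 1) / batchSize); // ceil
    List<PartitionRange> ranges = new ArrayList<>(partitions);
    for (int i = 0; i < partitions; i++) {
      long start = (long) i * batchSize;                        // 0, 1000, 2000, ...
      long end = Math.min(start + batchSize, totalRecords) - 1; // 999, 1999, ...
      ranges.add(new PartitionRange(i, start, end));
    }
    return ranges; // 100,000 table records with batch size 1,000 -> 100 partitions
  }
}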

Atomic Partition Claiming

flowchart LR
    subgraph Server1["Server 1"]
        S1A["Request partition"]
    end

    subgraph Server2["Server 2"]
        S2A["Request partition"]
    end

    subgraph Database["Database"]
        Q["SELECT ... FOR UPDATE SKIP LOCKED"]
        P1["Partition 1: PENDING"]
        P2["Partition 2: PENDING"]
        P3["Partition 3: PENDING"]
    end

    S1A -->|"Claim"| Q
    S2A -->|"Claim"| Q
    Q -->|"Lock & Return"| P1
    Q -->|"Skip locked, Return"| P2

    style P1 fill:#90EE90
    style P2 fill:#90EE90

SQL Implementation:

-- Claim next available partition (PostgreSQL syntax shown; MySQL 8+ supports
-- SKIP LOCKED but requires a separate SELECT ... FOR UPDATE SKIP LOCKED
-- followed by an UPDATE, since it has no RETURNING clause)
UPDATE search_index_partition
SET status = 'PROCESSING',
    claimed_at = NOW(),
    claimed_by = ?
WHERE id = (
    SELECT id FROM search_index_partition
    WHERE job_id = ? AND status = 'PENDING'
    ORDER BY partition_index
    LIMIT 1
    FOR UPDATE SKIP LOCKED
)
RETURNING *;

Guarantees:

  • No duplicate processing - each partition claimed by exactly one server
  • No blocking - SKIP LOCKED allows concurrent claims without waiting
  • Fair distribution - servers claim work as they become available
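
A participant's drain loop over this claim can be sketched as follows; the coordinator methods mirror the class diagram later in this document, while SearchIndexPartition, PartitionStats, indexRange(), and markPartitionFailed() are illustrative placeholders:

// Hedged sketch: keep claiming until the SKIP LOCKED query returns no row.
void processJobPartitions(UUID jobId) {
  Optional<SearchIndexPartition> claimed;
  while ((claimed = coordinator.claimNextPartition(jobId)).isPresent()) {
    SearchIndexPartition partition = claimed.get();
    try {
      // Read entities in [range_start, range_end] and bulk index them.
      PartitionStats stats = indexRange(partition);
      coordinator.updatePartitionStats(partition, stats);
    } catch (Exception e) {
      coordinator.markPartitionFailed(partition, e); // hypothetical helper
    }
  }
}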

Job Notification Mechanisms

Redis Pub/Sub (When Redis is configured)

flowchart LR
    subgraph Publisher["Server 1 (Publisher)"]
        P1["Job Started"]
    end

    subgraph Redis["Redis"]
        C1["Channel:<br/>om:distributed-jobs:start"]
        C2["Channel:<br/>om:distributed-jobs:complete"]
    end

    subgraph Subscribers["Subscribers"]
        S2["Server 2"]
        S3["Server 3"]
        SN["Server N"]
    end

    P1 -->|"PUBLISH"| C1
    C1 -->|"~0ms"| S2
    C1 -->|"~0ms"| S3
    C1 -->|"~0ms"| SN

Message Format: {jobId}|{jobType}|{sourceServerId}

Characteristics:

  • Latency: ~0ms (instant notification)
  • Overhead: Minimal (no polling)
  • Reliability: At-most-once delivery (acceptable for job notifications)
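
For illustration, the start notification could be published and consumed with Lettuce (the client implied by the class diagram's RedisClient and StatefulRedisPubSubConnection fields); the channel name and message format come from above, everything else is an assumption:

import io.lettuce.core.RedisClient;
import io.lettuce.core.pubsub.RedisPubSubAdapter;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import java.util.UUID;
import java.util.function.Consumer;

// Illustrative only: publishes and consumes {jobId}|{jobType}|{sourceServerId}
// on the om:distributed-jobs:start channel.
class RedisNotifySketch {
  static final String START_CHANNEL = "om:distributed-jobs:start";

  static void publishStart(RedisClient client, UUID jobId, String jobType, String serverId) {
    try (var conn = client.connect()) {
      conn.sync().publish(START_CHANNEL, jobId + "|" + jobType + "|" + serverId);
    }
  }

  static void listenForStart(RedisClient client, Consumer<UUID> onJobStarted) {
    StatefulRedisPubSubConnection<String, String> sub = client.connectPubSub();
    sub.addListener(new RedisPubSubAdapter<String, String>() {
      @Override
      public void message(String channel, String message) {
        String[] parts = message.split("\\|");
        onJobStarted.accept(UUID.fromString(parts[0])); // jobId is the first field
      }
    });
    sub.sync().subscribe(START_CHANNEL);
  }
}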

Database Polling (Fallback)

flowchart TB
    subgraph Polling["PollingJobNotifier"]
        direction TB
        P1["Poll Interval:<br/>IDLE: 30 seconds<br/>ACTIVE: 1 second"]
        P2["Query: SELECT id FROM search_index_job<br/>WHERE status = 'RUNNING'"]
    end

    subgraph Comparison["Polling Overhead Comparison"]
        Old["Old Design:<br/>1s always = 86,400 queries/day"]
        New["New Design:<br/>30s idle = 2,880 queries/day<br/>(96% reduction)"]
    end

    Polling --> Comparison
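
The adaptive interval can be sketched by rescheduling after each poll instead of using a fixed rate; names and structure here are illustrative, not the actual PollingJobNotifier:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative adaptive poller: 30s between polls when idle, 1s while a job is active.
class AdaptivePollerSketch {
  private static final long IDLE_POLL_INTERVAL_MS = 30_000;
  private static final long ACTIVE_POLL_INTERVAL_MS = 1_000;

  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor();
  private final AtomicBoolean participating = new AtomicBoolean(false);

  void start(Runnable checkForRunningJobs) {
    scheduleNext(checkForRunningJobs);
  }

  void setParticipating(boolean active) {
    participating.set(active); // switches the next delay between 1s and 30s
  }

  private void scheduleNext(Runnable checkForRunningJobs) {
    long delay = participating.get() ? ACTIVE_POLL_INTERVAL_MS : IDLE_POLL_INTERVAL_MS;
    scheduler.schedule(
        () -> {
          try {
            // e.g. SELECT id FROM search_index_job WHERE status = 'RUNNING'
            checkForRunningJobs.run();
          } finally {
            scheduleNext(checkForRunningJobs); // re-evaluate the interval after every poll
          }
        },
        delay,
        TimeUnit.MILLISECONDS);
  }
}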

Notifier Selection (Factory Pattern)

flowchart TD
    Start["DistributedJobNotifierFactory.create()"]
    Check{"Redis configured?<br/>(cacheConfig.provider == redis)"}
    Redis["Return RedisJobNotifier"]
    Polling["Return PollingJobNotifier"]
    Log1["Log: Using redis-pubsub notifier"]
    Log2["Log: Using database-polling notifier<br/>(30s discovery delay)"]

    Start --> Check
    Check -->|"Yes"| Redis --> Log1
    Check -->|"No"| Polling --> Log2
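
The selection itself is a small factory; sketched below with assumed parameter and constructor signatures (the real create() takes CacheConfig, CollectionDAO, and a server id per the class diagram):

// Illustrative factory logic mirroring the flowchart above.
public static DistributedJobNotifier create(
    CacheConfig cacheConfig, CollectionDAO collectionDAO, String serverId) {
  boolean redisConfigured =
      cacheConfig != null && "redis".equalsIgnoreCase(String.valueOf(cacheConfig.getProvider()));
  if (redisConfigured) {
    LOG.info("Using redis-pubsub notifier for distributed job notifications");
    return new RedisJobNotifier(cacheConfig, serverId); // assumed constructor
  }
  LOG.info("Using database-polling notifier (up to ~30s job discovery delay)");
  return new PollingJobNotifier(collectionDAO, serverId); // assumed constructor
}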

Database Schema

Entity Relationship Diagram

erDiagram
    SEARCH_INDEX_JOB ||--o{ SEARCH_INDEX_PARTITION : contains

    SEARCH_INDEX_JOB {
        varchar id PK
        varchar status "PENDING|RUNNING|COMPLETED|FAILED|STOPPED"
        json job_configuration
        bigint total_records
        bigint processed_records
        bigint success_records
        bigint failed_records
        timestamp created_at
        timestamp started_at
        timestamp completed_at
        varchar triggered_by
        text failure_details
    }

    SEARCH_INDEX_PARTITION {
        varchar id PK
        varchar job_id FK
        varchar entity_type
        int partition_index
        bigint range_start
        bigint range_end
        varchar status "PENDING|PROCESSING|COMPLETED|FAILED"
        varchar claimed_by
        timestamp claimed_at
        bigint processed_count
        bigint success_count
        bigint failed_count
        timestamp started_at
        timestamp completed_at
        text last_error
        int retry_count
    }

Key Indexes

-- Job table indexes
CREATE INDEX idx_job_status ON search_index_job(status);
CREATE INDEX idx_job_created_at ON search_index_job(created_at);

-- Partition table indexes
CREATE INDEX idx_partition_job_status ON search_index_partition(job_id, status);
CREATE INDEX idx_partition_claiming ON search_index_partition(job_id, status, partition_index);

Server Statistics Tracking

flowchart TB
    subgraph Partitions["Completed Partitions"]
        P1["Partition 1<br/>claimed_by: server-1<br/>processed: 1000"]
        P2["Partition 2<br/>claimed_by: server-2<br/>processed: 1000"]
        P3["Partition 3<br/>claimed_by: server-1<br/>processed: 1000"]
        P4["Partition 4<br/>claimed_by: server-2<br/>processed: 1000"]
    end

    subgraph Aggregation["Stats Aggregation Query"]
        Q["SELECT claimed_by, COUNT(*), SUM(processed_count)...<br/>GROUP BY claimed_by"]
    end

    subgraph Result["Per-Server Stats"]
        R1["server-1: 2 partitions, 2000 records"]
        R2["server-2: 2 partitions, 2000 records"]
    end

    Partitions --> Aggregation --> Result

UI Display:

| Server | Processed Records | Success | Failed | Partitions |
|--------|-------------------|---------|--------|------------|
| server-1-abc123 | 45,000 | 45,000 | 0 | 45/45 |
| server-2-def456 | 55,000 | 54,800 | 200 | 55/55 |
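
The per-server rollup amounts to a GROUP BY over the completed partition rows; an in-memory equivalent, with illustrative types:

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Illustrative in-memory version of the GROUP BY claimed_by aggregation above.
record PartitionRow(String claimedBy, long processedCount, long successCount, long failedCount) {}
record ServerStats(long partitions, long processed, long success, long failed) {}

class StatsAggregationSketch {
  static Map<String, ServerStats> perServer(List<PartitionRow> completedPartitions) {
    return completedPartitions.stream()
        .collect(Collectors.groupingBy(
            PartitionRow::claimedBy,
            Collectors.collectingAndThen(
                Collectors.toList(),
                rows -> new ServerStats(
                    rows.size(),
                    rows.stream().mapToLong(PartitionRow::processedCount).sum(),
                    rows.stream().mapToLong(PartitionRow::successCount).sum(),
                    rows.stream().mapToLong(PartitionRow::failedCount).sum()))));
  }
}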

Configuration

Application Configuration (openmetadata.yaml)

# Cache configuration determines notification mechanism
cache:
  provider: redis               # 'redis' enables Pub/Sub, 'local' uses polling
  redis:
    url: "redis://localhost:6379"
    connectTimeoutMs: 5000
    database: 0
    useSSL: false

Search Indexing App Configuration

# In SearchIndexingApp configuration
batchSize: 1000              # Records per partition
maxConcurrentRequests: 10    # Concurrent ES/OS requests per server
payloadSize: 104857600       # Max payload size (100MB)

Performance Characteristics

Scaling Behavior

xychart-beta
    title "Indexing Time vs Server Count (1M Records)"
    x-axis ["1 Server", "2 Servers", "4 Servers", "8 Servers"]
    y-axis "Time (minutes)" 0 --> 25
    bar [20, 10, 5, 2.5]
| Servers | 100K Records | 1M Records | 10M Records |
|---------|--------------|------------|-------------|
| 1 | ~2 min | ~20 min | ~3.5 hours |
| 2 | ~1 min | ~10 min | ~1.75 hours |
| 4 | ~35 sec | ~5 min | ~52 min |
| 8 | ~20 sec | ~2.5 min | ~26 min |

Note: Actual times depend on entity complexity, ES/OS cluster performance, and network latency.

When to Use Distributed Indexing

| Dataset Size | Recommendation |
|--------------|----------------|
| < 50K | Single server (coordination overhead > benefit) |
| 50K - 500K | 2-3 servers |
| 500K - 2M | 3-5 servers |
| > 2M | 5+ servers (maximum benefit) |

Error Handling & Recovery

Partition Failure Handling

flowchart TD
    Start["Partition Processing"]
    Process["Process entities"]
    Error{"Error occurred?"}
    Success["Mark COMPLETED<br/>Update stats"]
    Fail["Mark FAILED<br/>Record error"]

    Crash["Server Crash"]
    Stale{"Partition stale?<br/>(claimed_at > threshold)"}
    Reset["Reset to PENDING"]
    Reclaim["Another server claims"]

    Start --> Process --> Error
    Error -->|"No"| Success
    Error -->|"Yes"| Fail

    Crash --> Stale
    Stale -->|"Yes"| Reset --> Reclaim
    Stale -->|"No"| Wait["Wait for timeout"]
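
Recovery of partitions orphaned by a crashed server reduces to a conditional reset; a sketch using plain JDBC against the partition table (the timeout value and helper are assumptions):

// Illustrative stale-partition recovery: anything stuck in PROCESSING past the
// claim timeout goes back to PENDING so another server can claim it.
private static final String RESET_STALE_PARTITIONS =
    "UPDATE search_index_partition "
        + "SET status = 'PENDING', claimed_by = NULL, claimed_at = NULL, "
        + "    retry_count = retry_count + 1 "
        + "WHERE job_id = ? AND status = 'PROCESSING' AND claimed_at < ?";

int resetStalePartitions(java.sql.Connection connection, String jobId, long staleThresholdMillis)
    throws java.sql.SQLException {
  try (var stmt = connection.prepareStatement(RESET_STALE_PARTITIONS)) {
    stmt.setString(1, jobId);
    stmt.setTimestamp(
        2, new java.sql.Timestamp(System.currentTimeMillis() - staleThresholdMillis));
    return stmt.executeUpdate(); // number of partitions handed back to the pool
  }
}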

Job-Level Failure Handling

flowchart TD
    Check["Check partition results"]
    Threshold{"Failed > threshold?"}
    Complete["Job: COMPLETED"]
    CompleteErrors["Job: COMPLETED_WITH_ERRORS"]
    Failed["Job: FAILED"]

    Check --> Threshold
    Threshold -->|"0% failed"| Complete
    Threshold -->|"< 10% failed"| CompleteErrors
    Threshold -->|"> 10% failed"| Failed
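
The terminal status decision is a failure-ratio check; a hypothetical sketch using the 10% threshold from the flowchart above:

// Illustrative mapping from aggregated counts to a terminal job status.
enum JobStatus { COMPLETED, COMPLETED_WITH_ERRORS, FAILED }

static JobStatus resolveJobStatus(long totalRecords, long failedRecords) {
  if (failedRecords == 0) {
    return JobStatus.COMPLETED;
  }
  double failureRatio = totalRecords == 0 ? 1.0 : (double) failedRecords / totalRecords;
  return failureRatio < 0.10 ? JobStatus.COMPLETED_WITH_ERRORS : JobStatus.FAILED;
}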

Class Diagram

classDiagram
    class DistributedJobNotifier {
        <<interface>>
        +notifyJobStarted(UUID jobId, String jobType)
        +notifyJobCompleted(UUID jobId)
        +onJobStarted(Consumer~UUID~ callback)
        +start()
        +stop()
        +isRunning() boolean
        +getType() String
    }

    class RedisJobNotifier {
        -CacheConfig.Redis redisConfig
        -RedisClient redisClient
        -StatefulRedisPubSubConnection subConnection
        -StatefulRedisConnection pubConnection
        +start()
        +stop()
        +notifyJobStarted()
        +notifyJobCompleted()
    }

    class PollingJobNotifier {
        -CollectionDAO collectionDAO
        -ScheduledExecutorService scheduler
        -long IDLE_POLL_INTERVAL_MS = 30000
        -long ACTIVE_POLL_INTERVAL_MS = 1000
        +start()
        +stop()
        +setParticipating(boolean)
    }

    class DistributedJobNotifierFactory {
        +create(CacheConfig, CollectionDAO, String)$ DistributedJobNotifier
    }

    class DistributedJobParticipant {
        -CollectionDAO collectionDAO
        -SearchRepository searchRepository
        -DistributedSearchIndexCoordinator coordinator
        -DistributedJobNotifier notifier
        -AtomicBoolean participating
        +start()
        +stop()
        -onJobDiscovered(UUID jobId)
        -joinAndProcessJob(SearchIndexJob job)
        -processJobPartitions(SearchIndexJob job)
    }

    class DistributedSearchIndexCoordinator {
        -CollectionDAO collectionDAO
        +createJob(config) SearchIndexJob
        +createPartitions(job, entities)
        +claimNextPartition(jobId) Optional~Partition~
        +updatePartitionStats(partition, stats)
        +completeJob(jobId)
        +getServerStats(jobId) List~ServerStats~
    }

    DistributedJobNotifier <|.. RedisJobNotifier
    DistributedJobNotifier <|.. PollingJobNotifier
    DistributedJobNotifierFactory ..> DistributedJobNotifier : creates
    DistributedJobParticipant --> DistributedJobNotifier
    DistributedJobParticipant --> DistributedSearchIndexCoordinator

Files Changed

New Files

| File | Description |
|------|-------------|
| DistributedJobNotifier.java | Interface for job notifications |
| RedisJobNotifier.java | Redis Pub/Sub implementation |
| PollingJobNotifier.java | Database polling implementation |
| DistributedJobNotifierFactory.java | Factory for notifier selection |

Modified Files

| File | Changes |
|------|---------|
| DistributedJobParticipant.java | Uses notifier instead of internal polling |
| DistributedSearchIndexExecutor.java | Sends notifications on job start/complete |
| OpenMetadataApplication.java | Passes CacheConfig to participant |
| CollectionDAO.java | Added getRunningJobIds() for lightweight polling |
| SQL migrations | Added job/partition tables |

Testing

  • Unit tests for all new components
  • Integration tests for distributed job coordination
  • End-to-end tests for multi-server scenarios
  • Tested with 25K, 100K, and simulated 1M+ entity datasets

Future Enhancements

  1. Dynamic partition rebalancing - Redistribute work when servers join/leave mid-job
  2. Priority-based scheduling - Process critical entity types first
  3. Incremental indexing - Only index changed entities since last run
  4. Progress streaming - Real-time progress updates via WebSocket
  5. Automatic threshold detection - Skip distributed mode for small datasets

@github-actions
Contributor

TypeScript types have been updated based on the JSON schema changes in the PR

@github-actions
Contributor

github-actions bot commented Dec 20, 2025

🛡️ TRIVY SCAN RESULT 🛡️

Target: openmetadata-ingestion-base-slim:trivy (debian 12.12)

No Vulnerabilities Found

🛡️ TRIVY SCAN RESULT 🛡️

Target: Java

Vulnerabilities (33)

Package Vulnerability ID Severity Installed Version Fixed Version
com.fasterxml.jackson.core:jackson-core CVE-2025-52999 🚨 HIGH 2.12.7 2.15.0
com.fasterxml.jackson.core:jackson-core CVE-2025-52999 🚨 HIGH 2.13.4 2.15.0
com.fasterxml.jackson.core:jackson-databind CVE-2022-42003 🚨 HIGH 2.12.7 2.12.7.1, 2.13.4.2
com.fasterxml.jackson.core:jackson-databind CVE-2022-42004 🚨 HIGH 2.12.7 2.12.7.1, 2.13.4
com.google.code.gson:gson CVE-2022-25647 🚨 HIGH 2.2.4 2.8.9
com.google.protobuf:protobuf-java CVE-2021-22569 🚨 HIGH 3.3.0 3.16.1, 3.18.2, 3.19.2
com.google.protobuf:protobuf-java CVE-2022-3509 🚨 HIGH 3.3.0 3.16.3, 3.19.6, 3.20.3, 3.21.7
com.google.protobuf:protobuf-java CVE-2022-3510 🚨 HIGH 3.3.0 3.16.3, 3.19.6, 3.20.3, 3.21.7
com.google.protobuf:protobuf-java CVE-2024-7254 🚨 HIGH 3.3.0 3.25.5, 4.27.5, 4.28.2
com.google.protobuf:protobuf-java CVE-2021-22569 🚨 HIGH 3.7.1 3.16.1, 3.18.2, 3.19.2
com.google.protobuf:protobuf-java CVE-2022-3509 🚨 HIGH 3.7.1 3.16.3, 3.19.6, 3.20.3, 3.21.7
com.google.protobuf:protobuf-java CVE-2022-3510 🚨 HIGH 3.7.1 3.16.3, 3.19.6, 3.20.3, 3.21.7
com.google.protobuf:protobuf-java CVE-2024-7254 🚨 HIGH 3.7.1 3.25.5, 4.27.5, 4.28.2
com.nimbusds:nimbus-jose-jwt CVE-2023-52428 🚨 HIGH 9.8.1 9.37.2
com.squareup.okhttp3:okhttp CVE-2021-0341 🚨 HIGH 3.12.12 4.9.2
commons-beanutils:commons-beanutils CVE-2025-48734 🚨 HIGH 1.9.4 1.11.0
commons-io:commons-io CVE-2024-47554 🚨 HIGH 2.8.0 2.14.0
dnsjava:dnsjava CVE-2024-25638 🚨 HIGH 2.1.7 3.6.0
io.netty:netty-codec-http2 CVE-2025-55163 🚨 HIGH 4.1.96.Final 4.2.4.Final, 4.1.124.Final
io.netty:netty-codec-http2 GHSA-xpw8-rcwv-8f8p 🚨 HIGH 4.1.96.Final 4.1.100.Final
io.netty:netty-handler CVE-2025-24970 🚨 HIGH 4.1.96.Final 4.1.118.Final
net.minidev:json-smart CVE-2021-31684 🚨 HIGH 1.3.2 1.3.3, 2.4.4
net.minidev:json-smart CVE-2023-1370 🚨 HIGH 1.3.2 2.4.9
org.apache.avro:avro CVE-2024-47561 🔥 CRITICAL 1.7.7 1.11.4
org.apache.avro:avro CVE-2023-39410 🚨 HIGH 1.7.7 1.11.3
org.apache.derby:derby CVE-2022-46337 🔥 CRITICAL 10.14.2.0 10.14.3, 10.15.2.1, 10.16.1.2, 10.17.1.0
org.apache.ivy:ivy CVE-2022-46751 🚨 HIGH 2.5.1 2.5.2
org.apache.mesos:mesos CVE-2018-1330 🚨 HIGH 1.4.3 1.6.0
org.apache.thrift:libthrift CVE-2019-0205 🚨 HIGH 0.12.0 0.13.0
org.apache.thrift:libthrift CVE-2020-13949 🚨 HIGH 0.12.0 0.14.0
org.apache.zookeeper:zookeeper CVE-2023-44981 🔥 CRITICAL 3.6.3 3.7.2, 3.8.3, 3.9.1
org.eclipse.jetty:jetty-server CVE-2024-13009 🚨 HIGH 9.4.56.v20240826 9.4.57.v20241219
org.lz4:lz4-java CVE-2025-12183 🚨 HIGH 1.8.0 1.8.1

🛡️ TRIVY SCAN RESULT 🛡️

Target: Node.js

No Vulnerabilities Found

🛡️ TRIVY SCAN RESULT 🛡️

Target: Python

Vulnerabilities (4)

Package Vulnerability ID Severity Installed Version Fixed Version
starlette CVE-2025-62727 🚨 HIGH 0.48.0 0.49.1
urllib3 CVE-2025-66418 🚨 HIGH 1.26.20 2.6.0
urllib3 CVE-2025-66471 🚨 HIGH 1.26.20 2.6.0
urllib3 CVE-2026-21441 🚨 HIGH 1.26.20 2.6.3

🛡️ TRIVY SCAN RESULT 🛡️

Target: /etc/ssl/private/ssl-cert-snakeoil.key

No Vulnerabilities Found

🛡️ TRIVY SCAN RESULT 🛡️

Target: /ingestion/pipelines/extended_sample_data.yaml

No Vulnerabilities Found

🛡️ TRIVY SCAN RESULT 🛡️

Target: /ingestion/pipelines/lineage.yaml

No Vulnerabilities Found

🛡️ TRIVY SCAN RESULT 🛡️

Target: /ingestion/pipelines/sample_data.json

No Vulnerabilities Found

🛡️ TRIVY SCAN RESULT 🛡️

Target: /ingestion/pipelines/sample_data.yaml

No Vulnerabilities Found

🛡️ TRIVY SCAN RESULT 🛡️

Target: /ingestion/pipelines/sample_data_aut.yaml

No Vulnerabilities Found

🛡️ TRIVY SCAN RESULT 🛡️

Target: /ingestion/pipelines/sample_usage.json

No Vulnerabilities Found

🛡️ TRIVY SCAN RESULT 🛡️

Target: /ingestion/pipelines/sample_usage.yaml

No Vulnerabilities Found

🛡️ TRIVY SCAN RESULT 🛡️

Target: /ingestion/pipelines/sample_usage_aut.yaml

No Vulnerabilities Found

@github-actions
Contributor

github-actions bot commented Dec 20, 2025

🛡️ TRIVY SCAN RESULT 🛡️

Target: openmetadata-ingestion:trivy (debian 12.12)

No Vulnerabilities Found

🛡️ TRIVY SCAN RESULT 🛡️

Target: Java

Vulnerabilities (33)

Package Vulnerability ID Severity Installed Version Fixed Version
com.fasterxml.jackson.core:jackson-core CVE-2025-52999 🚨 HIGH 2.12.7 2.15.0
com.fasterxml.jackson.core:jackson-core CVE-2025-52999 🚨 HIGH 2.13.4 2.15.0
com.fasterxml.jackson.core:jackson-databind CVE-2022-42003 🚨 HIGH 2.12.7 2.12.7.1, 2.13.4.2
com.fasterxml.jackson.core:jackson-databind CVE-2022-42004 🚨 HIGH 2.12.7 2.12.7.1, 2.13.4
com.google.code.gson:gson CVE-2022-25647 🚨 HIGH 2.2.4 2.8.9
com.google.protobuf:protobuf-java CVE-2021-22569 🚨 HIGH 3.3.0 3.16.1, 3.18.2, 3.19.2
com.google.protobuf:protobuf-java CVE-2022-3509 🚨 HIGH 3.3.0 3.16.3, 3.19.6, 3.20.3, 3.21.7
com.google.protobuf:protobuf-java CVE-2022-3510 🚨 HIGH 3.3.0 3.16.3, 3.19.6, 3.20.3, 3.21.7
com.google.protobuf:protobuf-java CVE-2024-7254 🚨 HIGH 3.3.0 3.25.5, 4.27.5, 4.28.2
com.google.protobuf:protobuf-java CVE-2021-22569 🚨 HIGH 3.7.1 3.16.1, 3.18.2, 3.19.2
com.google.protobuf:protobuf-java CVE-2022-3509 🚨 HIGH 3.7.1 3.16.3, 3.19.6, 3.20.3, 3.21.7
com.google.protobuf:protobuf-java CVE-2022-3510 🚨 HIGH 3.7.1 3.16.3, 3.19.6, 3.20.3, 3.21.7
com.google.protobuf:protobuf-java CVE-2024-7254 🚨 HIGH 3.7.1 3.25.5, 4.27.5, 4.28.2
com.nimbusds:nimbus-jose-jwt CVE-2023-52428 🚨 HIGH 9.8.1 9.37.2
com.squareup.okhttp3:okhttp CVE-2021-0341 🚨 HIGH 3.12.12 4.9.2
commons-beanutils:commons-beanutils CVE-2025-48734 🚨 HIGH 1.9.4 1.11.0
commons-io:commons-io CVE-2024-47554 🚨 HIGH 2.8.0 2.14.0
dnsjava:dnsjava CVE-2024-25638 🚨 HIGH 2.1.7 3.6.0
io.netty:netty-codec-http2 CVE-2025-55163 🚨 HIGH 4.1.96.Final 4.2.4.Final, 4.1.124.Final
io.netty:netty-codec-http2 GHSA-xpw8-rcwv-8f8p 🚨 HIGH 4.1.96.Final 4.1.100.Final
io.netty:netty-handler CVE-2025-24970 🚨 HIGH 4.1.96.Final 4.1.118.Final
net.minidev:json-smart CVE-2021-31684 🚨 HIGH 1.3.2 1.3.3, 2.4.4
net.minidev:json-smart CVE-2023-1370 🚨 HIGH 1.3.2 2.4.9
org.apache.avro:avro CVE-2024-47561 🔥 CRITICAL 1.7.7 1.11.4
org.apache.avro:avro CVE-2023-39410 🚨 HIGH 1.7.7 1.11.3
org.apache.derby:derby CVE-2022-46337 🔥 CRITICAL 10.14.2.0 10.14.3, 10.15.2.1, 10.16.1.2, 10.17.1.0
org.apache.ivy:ivy CVE-2022-46751 🚨 HIGH 2.5.1 2.5.2
org.apache.mesos:mesos CVE-2018-1330 🚨 HIGH 1.4.3 1.6.0
org.apache.thrift:libthrift CVE-2019-0205 🚨 HIGH 0.12.0 0.13.0
org.apache.thrift:libthrift CVE-2020-13949 🚨 HIGH 0.12.0 0.14.0
org.apache.zookeeper:zookeeper CVE-2023-44981 🔥 CRITICAL 3.6.3 3.7.2, 3.8.3, 3.9.1
org.eclipse.jetty:jetty-server CVE-2024-13009 🚨 HIGH 9.4.56.v20240826 9.4.57.v20241219
org.lz4:lz4-java CVE-2025-12183 🚨 HIGH 1.8.0 1.8.1

🛡️ TRIVY SCAN RESULT 🛡️

Target: Node.js

No Vulnerabilities Found

🛡️ TRIVY SCAN RESULT 🛡️

Target: Python

Vulnerabilities (9)

Package Vulnerability ID Severity Installed Version Fixed Version
Werkzeug CVE-2024-34069 🚨 HIGH 2.2.3 3.0.3
aiohttp CVE-2025-69223 🚨 HIGH 3.12.12 3.13.3
aiohttp CVE-2025-69223 🚨 HIGH 3.13.2 3.13.3
deepdiff CVE-2025-58367 🔥 CRITICAL 7.0.1 8.6.1
ray CVE-2025-62593 🔥 CRITICAL 2.47.1 2.52.0
starlette CVE-2025-62727 🚨 HIGH 0.48.0 0.49.1
urllib3 CVE-2025-66418 🚨 HIGH 1.26.20 2.6.0
urllib3 CVE-2025-66471 🚨 HIGH 1.26.20 2.6.0
urllib3 CVE-2026-21441 🚨 HIGH 1.26.20 2.6.3

🛡️ TRIVY SCAN RESULT 🛡️

Target: /etc/ssl/private/ssl-cert-snakeoil.key

No Vulnerabilities Found

🛡️ TRIVY SCAN RESULT 🛡️

Target: /home/airflow/openmetadata-airflow-apis/openmetadata_managed_apis.egg-info/PKG-INFO

No Vulnerabilities Found

@github-actions
Contributor

github-actions bot commented Dec 20, 2025

Jest test Coverage

UI tests summary

| Lines | Statements | Branches | Functions |
|-------|------------|----------|-----------|
| 64% | 64.66% (51619/79837) | 42.26% (25259/59765) | 45.83% (7959/17365) |

CountDownLatch completionLatch = new CountDownLatch(1);
ScheduledExecutorService monitor =
    Executors.newSingleThreadScheduledExecutor(
        Thread.ofPlatform().name("distributed-monitor").factory());
Member


We can use a virtual thread here.
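
For reference, the suggestion amounts to swapping the thread factory (Java 21+); a sketch:

// Back the monitor with a virtual-thread factory instead of a platform-thread factory.
ScheduledExecutorService monitor =
    Executors.newSingleThreadScheduledExecutor(
        Thread.ofVirtual().name("distributed-monitor").factory());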

@gitar-bot

gitar-bot bot commented Jan 8, 2026

🔍 CI failure analysis for 95cffb0: Integration tests fail due to distributed indexing job state persistence (RELATED to PR). Playwright tests fail with browser closures and element visibility issues (UNRELATED to PR).

Issue

Multiple CI job failures:

  1. Integration Tests (RELATED TO PR):

    • integration-tests-postgres-opensearch (job 59770462658)
    • integration-tests-mysql-elasticsearch (job 59770462774)
    • Failure: AppsResourceIT.test_triggerApp_200 - "Job is already running"
  2. Playwright E2E Tests (UNRELATED TO PR):

    • playwright-ci-postgresql (6, 6) (job 59770462844) - 1 failed
    • playwright-ci-postgresql (3, 6) (job 59770462842) - 3 failed
    • Failures: Various UI tests with browser closures and element visibility issues

Root Cause Analysis

1. Integration Test Failures (RELATED)

Concurrency Control Issue in Distributed Search Indexing

This failure is directly related to the distributed search indexing changes. The test AppsResourceIT.test_triggerApp_200 receives "Job is already running, please wait for it to complete" error.

Why This Happens:

The refactored code (commit 95cffb0 "Refactor Code for Single Server and Multiple Server") introduced database-backed job state tracking:

  1. Job record created in database with status RUNNING
  2. Job lock acquired to prevent concurrent execution
  3. If test doesn't wait for completion or clean up properly, job remains RUNNING
  4. Next test attempt sees existing RUNNING job and rejects the trigger

Files modified affecting this:

  • SearchIndexApp.java, DistributedSearchIndexExecutor.java, SearchIndexJob.java, DistributedSearchIndexCoordinator.java, JobRecoveryManager.java

Consistency: Both PostgreSQL/OpenSearch and MySQL/Elasticsearch exhibit the same failure, confirming this is deterministic.

2. Playwright Test Failures (UNRELATED)

Failures in job 59770462844 (1 test):

  • AutoPilot Kafka status check
  • Glossary tags/owners/reviewers tests
  • Tag verification UI
  • Users data steward permissions
  • Classification version page

Failures in job 59770462842 (3 tests):

  • Restore entity inheritance tests
  • Settings navigation blocker tests
  • Table tags/glossary consistency for search
  • Explore/Discovery query filter tests
  • Search suggestions tests

Error Patterns:

  • "Target page, context or browser has been closed"
  • "element(s) not found"
  • "toBeVisible failed" / "toBeAttached failed"
  • "Test timeout exceeded"
  • "Cannot read properties of undefined"

Analysis: These Playwright failures are UNRELATED to the distributed search indexing PR:

  1. No UI changes in affected areas: The PR only modifies AppLogsViewer component, but failures are in Tags, Glossary, Users, Settings navigation, and Explore pages
  2. Common Playwright flakiness patterns: Browser context closures, element visibility timeouts, race conditions
  3. Distributed across unrelated features: No common thread connecting to search indexing backend changes
  4. Backend-only changes: The distributed indexing is purely backend Java code with no frontend interaction beyond the existing AppLogsViewer

Details

Integration Test Impact: Test isolation problem - distributed job state persists in database across test runs, causing subsequent triggers to fail.

Playwright Impact: These are likely pre-existing flaky tests or infrastructure issues unrelated to this PR's backend changes.

Additional Context: The Playwright failures show classic symptoms of UI test flakiness (timing issues, browser state management) rather than functional regressions from backend changes.

Code Review 👍 Approved with suggestions

Well-architected distributed search indexing system with comprehensive test coverage. Two previous minor concerns remain valid regarding the check-then-act in-flight counting and inconsistent server ID generation approaches.

⚠️ Edge Case: Lock acquisition uses a check-then-act pattern that is not fully atomic

📄 openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/DistributedSearchIndexCoordinator.java:2478

The tryAcquireLock method in SearchReindexLockDAO uses a check-then-act pattern that is not atomic:

  1. Delete expired locks
  2. Check if lock exists and is not expired
  3. Delete stale lock
  4. Insert new lock

Between checking if a lock exists and inserting a new one, another server could acquire the lock, leading to a race condition. The INSERT could then fail (which is caught), but this relies on database-level constraints.

Impact: In a highly concurrent environment with multiple servers starting simultaneously, there's a small window where two servers might believe they've acquired the lock.

Suggested fix: Use a single atomic SQL operation like INSERT ... ON DUPLICATE KEY UPDATE for MySQL or INSERT ... ON CONFLICT DO NOTHING for PostgreSQL, and check the affected row count to determine if the lock was acquired.
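
A sketch of the suggested fix (PostgreSQL form; the table and column names here are illustrative, not the actual SearchReindexLockDAO schema):

// Illustrative atomic acquisition: one INSERT that either takes the lock or does
// nothing, with the affected row count saying which happened. Assumes a unique
// constraint on lock_name; MySQL would use INSERT ... ON DUPLICATE KEY UPDATE.
private static final String TRY_ACQUIRE_LOCK =
    "INSERT INTO search_reindex_lock (lock_name, locked_by, locked_at, expires_at) "
        + "VALUES (?, ?, NOW(), ?) "
        + "ON CONFLICT (lock_name) DO NOTHING";

boolean tryAcquireLock(
    java.sql.Connection connection, String lockName, String serverId, java.sql.Timestamp expiresAt)
    throws java.sql.SQLException {
  try (var stmt = connection.prepareStatement(TRY_ACQUIRE_LOCK)) {
    stmt.setString(1, lockName);
    stmt.setString(2, serverId);
    stmt.setTimestamp(3, expiresAt);
    return stmt.executeUpdate() == 1; // 1 row inserted == lock acquired
  }
}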

More details 💡 1 suggestion ✅ 7 resolved
💡 Code Quality: Inconsistent server ID generation - uses different approach than ServerIdentityResolver

📄 openmetadata-service/src/main/java/org/openmetadata/service/OpenMetadataApplication.java:913

The registerDistributedJobParticipant method generates the server ID as hostname + System.currentTimeMillis(), while the rest of the distributed indexing code uses ServerIdentityResolver, which has a more robust approach (Quartz scheduler ID, hostname+PID, or UUID fallback).

This inconsistency could cause issues:

  1. If a container restarts quickly (within the same millisecond), IDs might collide
  2. Different parts of the code might use different identifiers for the same server

Suggested fix: Use ServerIdentityResolver.getInstance().getServerId() consistently throughout the codebase, or pass it to the DistributedJobParticipant constructor.

Bug: Potential integer overflow in stats conversion with safeToInt()

📄 openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/DistributedJobStatsAggregator.java:163
In DistributedJobStatsAggregator.java, the safeToInt() method converts long counters to int by capping at Integer.MAX_VALUE rather than using Math.toIntExact(), which would throw ArithmeticException on overflow. Given that the job can handle very large datasets (up to 50,000 partitions × up to 50,000 entities each = billions of records), totalRecords could exceed Integer.MAX_VALUE (about 2.1 billion).

The StepStats class uses Integer for record counts, so at distributed-indexing scale the conversion has to handle values that no longer fit:

private int safeToInt(long value) {
  return (int) Math.min(value, Integer.MAX_VALUE);
}

However, this just silently caps values. Better to either:

  1. Accept that very large values get capped and document this limitation
  2. Use a different approach for large-scale stats

For now, the capping approach is reasonable but the comment should clarify this is intentional for compatibility with the existing StepStats schema.

Edge Case: Redis connection error handling swallows exceptions on stop()

📄 openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/RedisJobNotifier.java:91
In RedisJobNotifier.java, the stop() method catches all exceptions and only logs warnings:

try {
  if (subConnection != null) {
    subConnection.close();
  }
  if (pubConnection != null) {
    pubConnection.close();
  }
  if (redisClient != null) {
    redisClient.shutdown();
  }
} catch (Exception e) {
  LOG.warn("Error closing Redis connections", e);
}

This is generally fine for shutdown, but if cleanup fails, subsequent restarts might have stale connections. Consider ensuring all resources are nulled out even on error to prevent memory leaks, and potentially retry shutdown with a timeout.

Code Quality: Hardcoded test sleep durations may cause flaky tests

📄 openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/DistributedJobParticipantTest.java:128
In DistributedJobParticipantTest.java, there's a hardcoded sleep:

Thread.sleep(6000);

This 6-second sleep makes tests slow and could still be flaky depending on system load. The test already uses Awaitility elsewhere with more robust polling. Consider replacing this with Awaitility-based waiting or ensuring the scheduler runs at least once through a different mechanism.

The test comment mentions "Wait a bit for the scheduler to run at least once" - using Awaitility with a condition would be more reliable:

Awaitility.await()
    .atMost(10, TimeUnit.SECONDS)
    .until(() -> /* condition that scheduler has run */);

Edge Case: Integer overflow possible with very large partition calculations

📄 openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/PartitionCalculator.java:177
In the partition range calculation:

long rangeStart = (long) i * adjustedPartitionSize;

While i is cast to long, adjustedPartitionSize is an int. If adjustedPartitionSize is at MAX_PARTITION_SIZE (50000), and there are many partitions, the calculation is safe. However, the rangeEnd calculation uses:

long rangeEnd = Math.min(rangeStart + adjustedPartitionSize, totalCount);

This should work correctly since rangeStart is already a long. However, for consistency and clarity, consider explicitly casting:

long rangeEnd = Math.min(rangeStart + (long) adjustedPartitionSize, totalCount);

This is a minor improvement for code clarity and future-proofing against refactoring errors.

Edge Case: NullPointerException risk if getBoolean returns null

📄 openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/DistributedSearchIndexExecutor.java:244-253
In the DistributedSearchIndexExecutor around where it reads the useDistributedIndexing configuration, if the job configuration doesn't have the useDistributedIndexing field set, there could be a NullPointerException when trying to auto-unbox the Boolean.

Looking at how jobData.getUseDistributedIndexing() is called, ensure null-safety:

// Instead of:
Boolean.TRUE.equals(jobData.getUseDistributedIndexing())

// Consider defensive null handling:
jobData.getUseDistributedIndexing() != null && jobData.getUseDistributedIndexing()

The code appears to handle this with Boolean.TRUE.equals() pattern in most places, which is safe. But verify all usages are protected.

...and 2 more from earlier reviews

What Works Well

The distributed indexing architecture is solid: atomic partition claiming with FOR UPDATE SKIP LOCKED prevents duplicate processing; staggered partition release avoids thundering herd; comprehensive recovery manager handles orphaned jobs; and extensive integration tests validate concurrent claiming behavior.

Recommendations

Consider unifying server ID generation: OpenMetadataApplication.registerDistributedJobParticipant uses hostname+timestamp while ServerIdentityResolver uses Quartz ID/hostname+PID. Using ServerIdentityResolver consistently would ensure the same server identity across all components.


@sonarqubecloud

sonarqubecloud bot commented Jan 8, 2026