Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,14 @@ public class HostGlobalConfig {
@GlobalConfigValidation(numberGreaterThan = 1)
public static GlobalConfig SYNC_HOST_HW_MONITOR_INTERVAL = new GlobalConfig(CATEGORY, "sync.host.hw.monitor.interval");

@GlobalConfigValidation(numberGreaterThan = -1)
@GlobalConfigDef(type = Integer.class, defaultValue = "30", description = "max jitter in seconds added to host reconnect initial delay to prevent thundering herd in large-scale environments")
public static GlobalConfig RECONNECT_JITTER_MAX_SECONDS = new GlobalConfig(CATEGORY, "connection.reconnectJitterMaxSeconds");

@GlobalConfigValidation(numberGreaterThan = 0)
@GlobalConfigDef(type = Integer.class, defaultValue = "100", description = "max number of concurrent host reconnect operations to prevent overwhelming the management node")
public static GlobalConfig RECONNECT_MAX_CONCURRENCY = new GlobalConfig(CATEGORY, "connection.reconnectMaxConcurrency");

@GlobalConfigValidation
@GlobalConfigDef(type = String.class, defaultValue = "10501:10999", description = "nbd port range")
public static GlobalConfig NBD_PORT_RANGE = new GlobalConfig(CATEGORY, "nbd.port.range");
Expand Down
29 changes: 28 additions & 1 deletion compute/src/main/java/org/zstack/compute/host/HostTrackImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import javax.persistence.Tuple;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -56,6 +57,9 @@ public class HostTrackImpl implements HostTracker, ManagementNodeChangeListener,

private Map<String, AtomicInteger> hostDisconnectCount = new ConcurrentHashMap<>();

private static final Random reconnectJitterRandom = new Random();
private volatile Semaphore reconnectSemaphore = new Semaphore(HostGlobalConfig.RECONNECT_MAX_CONCURRENCY.defaultValue(Integer.class), true);

@Override
public void managementNodeReady() {
reScanHost();
Expand All @@ -69,13 +73,20 @@ enum ReconnectDecision {
}

private void reconnectNow(String uuid, Completion completion) {
if (!reconnectSemaphore.tryAcquire()) {
logger.debug(String.format("[Host Tracker]: reconnect concurrency limit reached, deferring reconnect for host[uuid:%s]", uuid));
completion.success();
return;
}
Comment on lines +76 to +80
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

动态配置更新后可能释放到错误的 Semaphore,导致并发上限失真。

Line 76 获取许可时使用的是“当前实例”,但 Line 89 释放时再次读取 reconnectSemaphore 字段。若期间触发 Line 381-385 的热更新替换实例,释放会落到新实例,造成旧实例许可泄漏、新实例许可超发。

🔧 建议修复(确保 acquire/release 使用同一实例)
 private void reconnectNow(String uuid, Completion completion) {
-    if (!reconnectSemaphore.tryAcquire()) {
+    final Semaphore acquiredSemaphore = reconnectSemaphore;
+    if (!acquiredSemaphore.tryAcquire()) {
         logger.debug(String.format("[Host Tracker]: reconnect concurrency limit reached, deferring reconnect for host[uuid:%s]", uuid));
         completion.success();
         return;
     }

     ReconnectHostMsg msg = new ReconnectHostMsg();
     msg.setHostUuid(uuid);
     msg.setSkipIfHostConnected(true);
     bus.makeTargetServiceIdByResourceUuid(msg, HostConstant.SERVICE_ID, uuid);
-    bus.send(msg, new CloudBusCallBack(completion) {
-        `@Override`
-        public void run(MessageReply reply) {
-            reconnectSemaphore.release();
-            if (reply.isSuccess()) {
-                completion.success();
-            } else {
-                completion.fail(reply.getError());
-            }
-        }
-    });
+    try {
+        bus.send(msg, new CloudBusCallBack(completion) {
+            `@Override`
+            public void run(MessageReply reply) {
+                acquiredSemaphore.release();
+                if (reply.isSuccess()) {
+                    completion.success();
+                } else {
+                    completion.fail(reply.getError());
+                }
+            }
+        });
+    } catch (Throwable t) {
+        acquiredSemaphore.release();
+        throw t;
+    }
 }

Also applies to: 89-89, 381-385

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@compute/src/main/java/org/zstack/compute/host/HostTrackImpl.java` around
lines 76 - 80, The reconnectSemaphore is read twice from the field (once for
tryAcquire and again for release), which can cause acquire/release to happen on
different Semaphore instances after a hot-reload; fix by capturing the field
into a local final variable (e.g., final Semaphore localReconnectSemaphore =
this.reconnectSemaphore) immediately before tryAcquire in the HostTrackImpl
method that does reconnect handling, use that local variable for both tryAcquire
and release (and any early returns) so the same Semaphore instance is used
regardless of concurrent updates.


ReconnectHostMsg msg = new ReconnectHostMsg();
msg.setHostUuid(uuid);
msg.setSkipIfHostConnected(true);
bus.makeTargetServiceIdByResourceUuid(msg, HostConstant.SERVICE_ID, uuid);
bus.send(msg, new CloudBusCallBack(completion) {
@Override
public void run(MessageReply reply) {
reconnectSemaphore.release();
if (reply.isSuccess()) {
completion.success();
} else {
Expand Down Expand Up @@ -267,7 +278,14 @@ public void trackHost(String hostUuid) {
if (CoreGlobalProperty.UNIT_TEST_ON && !alwaysStartRightNow) {
t.start();
} else {
t.startRightNow();
int jitterMaxSeconds = HostGlobalConfig.RECONNECT_JITTER_MAX_SECONDS.value(Integer.class);
if (jitterMaxSeconds > 0) {
int jitterSeconds = reconnectJitterRandom.nextInt(jitterMaxSeconds + 1);
logger.debug(String.format("starting tracking host[uuid:%s] with jitter delay %d seconds", hostUuid, jitterSeconds));
thdf.submitTimeoutTask(t, TimeUnit.SECONDS, jitterSeconds);
} else {
t.startRightNow();
}
}

logger.debug(String.format("starting tracking hosts[uuid:%s]", hostUuid));
Expand Down Expand Up @@ -357,6 +375,15 @@ public boolean start() {
onHostStatusChange();
onHostPingSkip();

// Initialize semaphore from live config (may differ from default at startup)
reconnectSemaphore = new Semaphore(HostGlobalConfig.RECONNECT_MAX_CONCURRENCY.value(Integer.class), true);

HostGlobalConfig.RECONNECT_MAX_CONCURRENCY.installUpdateExtension((oldConfig, newConfig) -> {
logger.debug(String.format("%s change from %s to %s, updating reconnect semaphore",
oldConfig.getCanonicalName(), oldConfig.value(), newConfig.value()));
reconnectSemaphore = new Semaphore(newConfig.value(Integer.class), true);
});

HostGlobalConfig.PING_HOST_INTERVAL.installUpdateExtension((oldConfig, newConfig) -> {
logger.debug(String.format("%s change from %s to %s, restart host trackers",
oldConfig.getCanonicalName(), oldConfig.value(), newConfig.value()));
Expand Down