diff --git a/compute/src/main/java/org/zstack/compute/host/HostGlobalConfig.java b/compute/src/main/java/org/zstack/compute/host/HostGlobalConfig.java
index b13f95673c..22af03fef5 100755
--- a/compute/src/main/java/org/zstack/compute/host/HostGlobalConfig.java
+++ b/compute/src/main/java/org/zstack/compute/host/HostGlobalConfig.java
@@ -56,6 +56,14 @@ public class HostGlobalConfig {
     @GlobalConfigValidation(numberGreaterThan = 1)
     public static GlobalConfig SYNC_HOST_HW_MONITOR_INTERVAL = new GlobalConfig(CATEGORY, "sync.host.hw.monitor.interval");
 
+    @GlobalConfigValidation(numberGreaterThan = -1)
+    @GlobalConfigDef(type = Integer.class, defaultValue = "30", description = "max jitter in seconds added to host reconnect initial delay to prevent thundering herd in large-scale environments")
+    public static GlobalConfig RECONNECT_JITTER_MAX_SECONDS = new GlobalConfig(CATEGORY, "connection.reconnectJitterMaxSeconds");
+
+    @GlobalConfigValidation(numberGreaterThan = 0)
+    @GlobalConfigDef(type = Integer.class, defaultValue = "100", description = "max number of concurrent host reconnect operations to prevent overwhelming the management node")
+    public static GlobalConfig RECONNECT_MAX_CONCURRENCY = new GlobalConfig(CATEGORY, "connection.reconnectMaxConcurrency");
+
     @GlobalConfigValidation
     @GlobalConfigDef(type = String.class, defaultValue = "10501:10999", description = "nbd port range")
     public static GlobalConfig NBD_PORT_RANGE = new GlobalConfig(CATEGORY, "nbd.port.range");
diff --git a/compute/src/main/java/org/zstack/compute/host/HostTrackImpl.java b/compute/src/main/java/org/zstack/compute/host/HostTrackImpl.java
index 48ed6a7dc4..50be711885 100755
--- a/compute/src/main/java/org/zstack/compute/host/HostTrackImpl.java
+++ b/compute/src/main/java/org/zstack/compute/host/HostTrackImpl.java
@@ -28,6 +28,7 @@ import javax.persistence.Tuple;
 
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
@@ -56,6 +57,9 @@ public class HostTrackImpl implements HostTracker, ManagementNodeChangeListener,
 
     private Map hostDisconnectCount = new ConcurrentHashMap<>();
 
+    private static final Random reconnectJitterRandom = new Random();
+    private volatile Semaphore reconnectSemaphore = new Semaphore(HostGlobalConfig.RECONNECT_MAX_CONCURRENCY.defaultValue(Integer.class), true);
+
     @Override
     public void managementNodeReady() {
         reScanHost();
@@ -69,6 +73,15 @@ enum ReconnectDecision {
     }
 
     private void reconnectNow(String uuid, Completion completion) {
+        // Pin the semaphore instance: the config-update listener may swap the field while this
+        // reconnect is in flight, and the permit must be returned to the instance it came from.
+        final Semaphore permit = reconnectSemaphore;
+        if (!permit.tryAcquire()) {
+            logger.debug(String.format("[Host Tracker]: reconnect concurrency limit reached, deferring reconnect for host[uuid:%s]", uuid));
+            completion.success();
+            return;
+        }
+
         ReconnectHostMsg msg = new ReconnectHostMsg();
         msg.setHostUuid(uuid);
         msg.setSkipIfHostConnected(true);
@@ -76,6 +89,7 @@ private void reconnectNow(String uuid, Completion completion) {
         bus.send(msg, new CloudBusCallBack(completion) {
             @Override
             public void run(MessageReply reply) {
+                permit.release();
                 if (reply.isSuccess()) {
                     completion.success();
                 } else {
@@ -267,7 +281,14 @@ public void trackHost(String hostUuid) {
         if (CoreGlobalProperty.UNIT_TEST_ON && !alwaysStartRightNow) {
             t.start();
         } else {
-            t.startRightNow();
+            int jitterMaxSeconds = HostGlobalConfig.RECONNECT_JITTER_MAX_SECONDS.value(Integer.class);
+            if (jitterMaxSeconds > 0) {
+                int jitterSeconds = reconnectJitterRandom.nextInt(jitterMaxSeconds + 1);
+                logger.debug(String.format("starting tracking host[uuid:%s] with jitter delay %d seconds", hostUuid, jitterSeconds));
+                thdf.submitTimeoutTask(t, TimeUnit.SECONDS, jitterSeconds);
+            } else {
+                t.startRightNow();
+            }
         }
 
         logger.debug(String.format("starting tracking hosts[uuid:%s]", hostUuid));
@@ -357,6 +378,15 @@ public boolean start() {
         onHostStatusChange();
         onHostPingSkip();
 
+        // Initialize semaphore from live config (may differ from default at startup)
+        reconnectSemaphore = new Semaphore(HostGlobalConfig.RECONNECT_MAX_CONCURRENCY.value(Integer.class), true);
+
+        HostGlobalConfig.RECONNECT_MAX_CONCURRENCY.installUpdateExtension((oldConfig, newConfig) -> {
+            logger.debug(String.format("%s change from %s to %s, updating reconnect semaphore",
+                    oldConfig.getCanonicalName(), oldConfig.value(), newConfig.value()));
+            reconnectSemaphore = new Semaphore(newConfig.value(Integer.class), true);
+        });
+
         HostGlobalConfig.PING_HOST_INTERVAL.installUpdateExtension((oldConfig, newConfig) -> {
             logger.debug(String.format("%s change from %s to %s, restart host trackers",
                     oldConfig.getCanonicalName(), oldConfig.value(), newConfig.value()));