-
Notifications
You must be signed in to change notification settings - Fork 0
<fix>[host]: adaptive EMA-based ping timeout for large-scale clusters [ZSTAC-67534] #3391
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: 5.5.12
Are you sure you want to change the base?
Changes from all commits
20160c7
c8365ef
e55f1da
c9f2f22
5dd8f9c
b785677
91c1e02
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -39,6 +39,39 @@ public class HostTrackImpl implements HostTracker, ManagementNodeChangeListener, | |
| private static boolean alwaysStartRightNow = false; | ||
| private static final Cache<String, Long> skippedPingHostDeadline = CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.HOURS).build(); | ||
|
|
||
| // EMA adaptive timeout: tracks per-host exponential moving average of ping response times (in seconds) | ||
| private static final double EMA_ALPHA = 0.2; | ||
| private static final double EMA_SAFETY_FACTOR = 3.0; | ||
| private static final ConcurrentHashMap<String, Double> pingResponseEma = new ConcurrentHashMap<>(); | ||
|
|
||
| /** | ||
| * Returns the ping timeout for the given host (in seconds). | ||
| * When adaptive timeout is enabled (host.ping.adaptiveTimeout.enable=true), | ||
| * uses EMA of observed response times * safety factor, floored by the configured value. | ||
| * When disabled, returns the configured ping.timeout directly. | ||
| */ | ||
| public static long getAdaptiveTimeout(String hostUuid) { | ||
| long configured = HostGlobalConfig.PING_HOST_TIMEOUT.value(Long.class); | ||
| if (!HostGlobalConfig.PING_ADAPTIVE_TIMEOUT_ENABLED.value(Boolean.class)) { | ||
| return configured; | ||
| } | ||
| Double ema = pingResponseEma.get(hostUuid); | ||
| if (ema == null) { | ||
| return configured; | ||
| } | ||
| long adaptive = (long) Math.ceil(ema * EMA_SAFETY_FACTOR); | ||
| return Math.max(configured, adaptive); | ||
| } | ||
coderabbitai[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| /** | ||
| * Updates the per-host EMA of ping response times using exponential moving average. | ||
| * New EMA = alpha * sample + (1 - alpha) * oldEMA. | ||
| */ | ||
| private static void updatePingResponseEma(String hostUuid, double responseTimeSec) { | ||
| pingResponseEma.merge(hostUuid, responseTimeSec, | ||
| (oldEma, sample) -> EMA_ALPHA * sample + (1 - EMA_ALPHA) * oldEma); | ||
| } | ||
|
|
||
| @Autowired | ||
| private DatabaseFacade dbf; | ||
| @Autowired | ||
|
|
@@ -136,6 +169,7 @@ private void track() { | |
| PingHostMsg msg = new PingHostMsg(); | ||
| msg.setHostUuid(uuid); | ||
| bus.makeLocalServiceId(msg, HostConstant.SERVICE_ID); | ||
| final long pingStartMs = System.currentTimeMillis(); | ||
| bus.send(msg, new CloudBusCallBack(null) { | ||
|
Comment on lines
+172
to
173
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🏁 Script executed: fd -t f -name "HostTrackImpl.java"Repository: MatheMatrix/zstack Length of output: 292 🏁 Script executed: cat -n compute/src/main/java/org/zstack/compute/host/HostTrackImpl.java | sed -n '160,200p'Repository: MatheMatrix/zstack Length of output: 2717 🌐 Web query:
💡 Result: Best practice (elapsed time)Use long t0 = System.nanoTime();
// work
long elapsedNanos = System.nanoTime() - t0;When to use
|
||
| @Override | ||
| public void run(MessageReply reply) { | ||
|
|
@@ -148,6 +182,12 @@ private ReconnectDecision makeReconnectDecision(MessageReply reply) { | |
| return ReconnectDecision.DoNothing; | ||
| } | ||
|
|
||
| // update EMA with observed response time on successful ping | ||
| double responseTimeSec = (System.currentTimeMillis() - pingStartMs) / 1000.0; | ||
| updatePingResponseEma(uuid, responseTimeSec); | ||
| logger.debug(String.format("[Host Tracker]: host[uuid:%s] ping response time %.2fs, adaptive timeout %ds", | ||
| uuid, responseTimeSec, getAdaptiveTimeout(uuid))); | ||
|
|
||
| PingHostReply r = reply.castReply(); | ||
| if (r.isNoReconnect()) { | ||
| return ReconnectDecision.DoNothing; | ||
|
|
@@ -280,6 +320,7 @@ public void untrackHost(String huuid) { | |
| t.cancel(); | ||
| } | ||
| trackers.remove(huuid); | ||
| pingResponseEma.remove(huuid); | ||
| logger.debug(String.format("stop tracking host[uuid:%s]", huuid)); | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -12,6 +12,7 @@ | |||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor; | ||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.http.impl.nio.reactor.IOReactorConfig; | ||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.http.nio.reactor.IOReactorException; | ||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.http.pool.PoolStats; | ||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.logging.log4j.ThreadContext; | ||||||||||||||||||||||||||||||||||||||||||||||||
| import org.springframework.beans.factory.annotation.Autowired; | ||||||||||||||||||||||||||||||||||||||||||||||||
| import org.springframework.http.*; | ||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -30,6 +31,7 @@ | |||||||||||||||||||||||||||||||||||||||||||||||
| import org.zstack.core.telemetry.TelemetryGlobalProperty; | ||||||||||||||||||||||||||||||||||||||||||||||||
| import org.zstack.core.thread.AsyncThread; | ||||||||||||||||||||||||||||||||||||||||||||||||
| import org.zstack.core.thread.CancelablePeriodicTask; | ||||||||||||||||||||||||||||||||||||||||||||||||
| import org.zstack.core.thread.PeriodicTask; | ||||||||||||||||||||||||||||||||||||||||||||||||
| import org.zstack.core.thread.ThreadFacade; | ||||||||||||||||||||||||||||||||||||||||||||||||
| import org.zstack.core.thread.ThreadFacadeImpl.TimeoutTaskReceipt; | ||||||||||||||||||||||||||||||||||||||||||||||||
| import org.zstack.core.timeout.ApiTimeoutManager; | ||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -79,6 +81,15 @@ public class RESTFacadeImpl implements RESTFacade { | |||||||||||||||||||||||||||||||||||||||||||||||
| private String callbackUrl; | ||||||||||||||||||||||||||||||||||||||||||||||||
| private TimeoutRestTemplate template; | ||||||||||||||||||||||||||||||||||||||||||||||||
| private AsyncRestTemplate asyncRestTemplate; | ||||||||||||||||||||||||||||||||||||||||||||||||
| // P0: dedicated ping pool — isolated from business traffic (R2) | ||||||||||||||||||||||||||||||||||||||||||||||||
| private AsyncRestTemplate pingAsyncRestTemplate; | ||||||||||||||||||||||||||||||||||||||||||||||||
| // R5: store connection managers for DumpConnectionPoolStatus observability | ||||||||||||||||||||||||||||||||||||||||||||||||
| private PoolingNHttpClientConnectionManager asyncConnManager; | ||||||||||||||||||||||||||||||||||||||||||||||||
| private PoolingNHttpClientConnectionManager pingConnManager; | ||||||||||||||||||||||||||||||||||||||||||||||||
| // ThreadLocal allows asyncJsonPostForPing() to inject the ping template without changing asyncJson() signature | ||||||||||||||||||||||||||||||||||||||||||||||||
| private static final ThreadLocal<AsyncRestTemplate> ASYNC_TEMPLATE_OVERRIDE = new ThreadLocal<>(); | ||||||||||||||||||||||||||||||||||||||||||||||||
| // R2: P0 ping pool capacity — covers largest expected cluster (3000 nodes) | ||||||||||||||||||||||||||||||||||||||||||||||||
| private static final int PING_POOL_MAX_TOTAL = 3000; | ||||||||||||||||||||||||||||||||||||||||||||||||
| private String baseUrl; | ||||||||||||||||||||||||||||||||||||||||||||||||
| private String sendCommandUrl; | ||||||||||||||||||||||||||||||||||||||||||||||||
| private String callbackHostName; | ||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -182,6 +193,28 @@ void init() { | |||||||||||||||||||||||||||||||||||||||||||||||
| logger.debug(sb.toString()); | ||||||||||||||||||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||
| // R5: pool observability — trigger via: kill -USR2 <pid> or zstack-ctl dump_connection_pool | ||||||||||||||||||||||||||||||||||||||||||||||||
| DebugManager.registerDebugSignalHandler("DumpConnectionPoolStatus", () -> { | ||||||||||||||||||||||||||||||||||||||||||||||||
| StringBuilder sb = new StringBuilder(); | ||||||||||||||||||||||||||||||||||||||||||||||||
| sb.append("\n============= BEGIN: Connection Pool Status (R5) =================\n"); | ||||||||||||||||||||||||||||||||||||||||||||||||
| if (asyncConnManager != null) { | ||||||||||||||||||||||||||||||||||||||||||||||||
| PoolStats s = asyncConnManager.getTotalStats(); | ||||||||||||||||||||||||||||||||||||||||||||||||
| sb.append(String.format("async-pool : maxTotal=%-5d leased=%-5d available=%-5d pending=%d%n", | ||||||||||||||||||||||||||||||||||||||||||||||||
| CoreGlobalProperty.REST_FACADE_MAX_TOTAL, s.getLeased(), s.getAvailable(), s.getPending())); | ||||||||||||||||||||||||||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||||||||||||||||||||||||||
| sb.append("async-pool : not initialized\n"); | ||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||
| if (pingConnManager != null) { | ||||||||||||||||||||||||||||||||||||||||||||||||
| PoolStats s = pingConnManager.getTotalStats(); | ||||||||||||||||||||||||||||||||||||||||||||||||
| sb.append(String.format("ping-pool(P0): maxTotal=%-5d leased=%-5d available=%-5d pending=%d%n", | ||||||||||||||||||||||||||||||||||||||||||||||||
| pingConnManager.getMaxTotal(), s.getLeased(), s.getAvailable(), s.getPending())); | ||||||||||||||||||||||||||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||||||||||||||||||||||||||
| sb.append("ping-pool(P0): not initialized\n"); | ||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||
| sb.append("============= END: Connection Pool Status ========================\n"); | ||||||||||||||||||||||||||||||||||||||||||||||||
| logger.debug(sb.toString()); | ||||||||||||||||||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||
| port = Platform.getManagementNodeServicePort(); | ||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||
| IptablesUtils.insertRuleToFilterTable(String.format("-A INPUT -p tcp -m state --state NEW -m tcp --dport %s -j ACCEPT", port)); | ||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -211,15 +244,60 @@ void init() { | |||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||
| logger.debug(String.format("RESTFacade built callback url: %s", callbackUrl)); | ||||||||||||||||||||||||||||||||||||||||||||||||
| template = RESTFacade.createRestTemplate(CoreGlobalProperty.REST_FACADE_READ_TIMEOUT, CoreGlobalProperty.REST_FACADE_CONNECT_TIMEOUT); | ||||||||||||||||||||||||||||||||||||||||||||||||
| // R5: capture connection managers for DumpConnectionPoolStatus observability | ||||||||||||||||||||||||||||||||||||||||||||||||
| PoolingNHttpClientConnectionManager[] cmRef = new PoolingNHttpClientConnectionManager[1]; | ||||||||||||||||||||||||||||||||||||||||||||||||
| asyncRestTemplate = createAsyncRestTemplate( | ||||||||||||||||||||||||||||||||||||||||||||||||
| CoreGlobalProperty.REST_FACADE_READ_TIMEOUT, | ||||||||||||||||||||||||||||||||||||||||||||||||
| CoreGlobalProperty.REST_FACADE_CONNECT_TIMEOUT, | ||||||||||||||||||||||||||||||||||||||||||||||||
| CoreGlobalProperty.REST_FACADE_MAX_TOTAL, | ||||||||||||||||||||||||||||||||||||||||||||||||
| CoreGlobalProperty.REST_FACADE_MAX_PER_ROUTE, | ||||||||||||||||||||||||||||||||||||||||||||||||
| CoreGlobalProperty.REST_FACADE_MAX_TOTAL); | ||||||||||||||||||||||||||||||||||||||||||||||||
| cmRef); | ||||||||||||||||||||||||||||||||||||||||||||||||
| asyncConnManager = cmRef[0]; | ||||||||||||||||||||||||||||||||||||||||||||||||
| // P0 ping pool: maxPerRoute=2 (1 in-flight + 1 queued per host), maxTotal covers full cluster | ||||||||||||||||||||||||||||||||||||||||||||||||
| pingAsyncRestTemplate = createAsyncRestTemplate( | ||||||||||||||||||||||||||||||||||||||||||||||||
| 10000, // readTimeout = ping timeout (10s) | ||||||||||||||||||||||||||||||||||||||||||||||||
| 3000, // connectTimeout = fast fail (3s) | ||||||||||||||||||||||||||||||||||||||||||||||||
| PING_POOL_MAX_TOTAL, // maxTotal: full cluster capacity | ||||||||||||||||||||||||||||||||||||||||||||||||
| 2, // maxPerRoute: 2 allows 1 in-flight + 1 queued, avoids head-of-line block | ||||||||||||||||||||||||||||||||||||||||||||||||
| cmRef); | ||||||||||||||||||||||||||||||||||||||||||||||||
| pingConnManager = cmRef[0]; | ||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||
| // L1 built-in immune: proactive pool utilization alarm every 60s (R5) | ||||||||||||||||||||||||||||||||||||||||||||||||
| thdf.submitPeriodicTask(new PeriodicTask() { | ||||||||||||||||||||||||||||||||||||||||||||||||
| @Override public TimeUnit getTimeUnit() { return TimeUnit.SECONDS; } | ||||||||||||||||||||||||||||||||||||||||||||||||
| @Override public long getInterval() { return 60; } | ||||||||||||||||||||||||||||||||||||||||||||||||
| @Override public String getName() { return "http-pool-health-check"; } | ||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||
| @Override | ||||||||||||||||||||||||||||||||||||||||||||||||
| public void run() { | ||||||||||||||||||||||||||||||||||||||||||||||||
| if (asyncConnManager != null) { | ||||||||||||||||||||||||||||||||||||||||||||||||
| PoolStats s = asyncConnManager.getTotalStats(); | ||||||||||||||||||||||||||||||||||||||||||||||||
| long maxTotal = CoreGlobalProperty.REST_FACADE_MAX_TOTAL; | ||||||||||||||||||||||||||||||||||||||||||||||||
| if (s.getPending() > 0 || s.getLeased() * 5 > maxTotal * 4) { | ||||||||||||||||||||||||||||||||||||||||||||||||
| logger.warn(String.format("[POOL-ALARM] async-pool HIGH: leased=%d available=%d pending=%d maxTotal=%d", | ||||||||||||||||||||||||||||||||||||||||||||||||
| s.getLeased(), s.getAvailable(), s.getPending(), maxTotal)); | ||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||
| if (pingConnManager != null) { | ||||||||||||||||||||||||||||||||||||||||||||||||
| PoolStats s = pingConnManager.getTotalStats(); | ||||||||||||||||||||||||||||||||||||||||||||||||
| if (s.getPending() > 0) { | ||||||||||||||||||||||||||||||||||||||||||||||||
| logger.warn(String.format("[POOL-ALARM] ping-pool(P0) CONGESTED: leased=%d available=%d pending=%d — P0 ping may be delayed", | ||||||||||||||||||||||||||||||||||||||||||||||||
| s.getLeased(), s.getAvailable(), s.getPending())); | ||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||
| // timeout are in milliseconds; delegates to 5-param overload (backward compat) | ||||||||||||||||||||||||||||||||||||||||||||||||
| // Parameter order matches createRestTemplate: (readTimeout, connectTimeout, maxTotal, maxPerRoute) | ||||||||||||||||||||||||||||||||||||||||||||||||
| private static AsyncRestTemplate createAsyncRestTemplate(int readTimeout, int connectTimeout, int maxTotal, int maxPerRoute) { | ||||||||||||||||||||||||||||||||||||||||||||||||
| return createAsyncRestTemplate(readTimeout, connectTimeout, maxTotal, maxPerRoute, null); | ||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||
| // timeout are in milliseconds | ||||||||||||||||||||||||||||||||||||||||||||||||
| private static AsyncRestTemplate createAsyncRestTemplate(int readTimeout, int connectTimeout, int maxPerRoute, int maxTotal) { | ||||||||||||||||||||||||||||||||||||||||||||||||
| // R5: outCm captures the connection manager for observability (DumpConnectionPoolStatus) | ||||||||||||||||||||||||||||||||||||||||||||||||
| private static AsyncRestTemplate createAsyncRestTemplate(int readTimeout, int connectTimeout, int maxTotal, int maxPerRoute, | ||||||||||||||||||||||||||||||||||||||||||||||||
| PoolingNHttpClientConnectionManager[] outCm) { | ||||||||||||||||||||||||||||||||||||||||||||||||
| PoolingNHttpClientConnectionManager connectionManager; | ||||||||||||||||||||||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||||||||||||||||||||||
| connectionManager = new PoolingNHttpClientConnectionManager(new DefaultConnectingIOReactor(IOReactorConfig.DEFAULT)); | ||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -229,6 +307,7 @@ private static AsyncRestTemplate createAsyncRestTemplate(int readTimeout, int co | |||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||
| connectionManager.setDefaultMaxPerRoute(maxPerRoute); | ||||||||||||||||||||||||||||||||||||||||||||||||
| connectionManager.setMaxTotal(maxTotal); | ||||||||||||||||||||||||||||||||||||||||||||||||
| if (outCm != null) outCm[0] = connectionManager; | ||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||
| CloseableHttpAsyncClient httpAsyncClient = HttpAsyncClients.custom() | ||||||||||||||||||||||||||||||||||||||||||||||||
| .setConnectionManager(connectionManager) | ||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -237,7 +316,8 @@ private static AsyncRestTemplate createAsyncRestTemplate(int readTimeout, int co | |||||||||||||||||||||||||||||||||||||||||||||||
| HttpComponentsAsyncClientHttpRequestFactory cf = new HttpComponentsAsyncClientHttpRequestFactory(httpAsyncClient); | ||||||||||||||||||||||||||||||||||||||||||||||||
| cf.setConnectTimeout(connectTimeout); | ||||||||||||||||||||||||||||||||||||||||||||||||
| cf.setReadTimeout(readTimeout); | ||||||||||||||||||||||||||||||||||||||||||||||||
| cf.setConnectionRequestTimeout(connectTimeout * 2); | ||||||||||||||||||||||||||||||||||||||||||||||||
| // R4: connectionRequestTimeout must be < operation timeout (e.g. ping timeout=10s) | ||||||||||||||||||||||||||||||||||||||||||||||||
| cf.setConnectionRequestTimeout(Math.min(connectTimeout * 2, CoreGlobalProperty.REST_FACADE_CONNECTION_REQUEST_TIMEOUT)); | ||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||
| AsyncRestTemplate asyncRestTemplate = new AsyncRestTemplate(cf); | ||||||||||||||||||||||||||||||||||||||||||||||||
| RESTFacade.setMessageConverter(asyncRestTemplate.getMessageConverters()); | ||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -574,7 +654,9 @@ public void success(HttpEntity<String> responseEntity) { | |||||||||||||||||||||||||||||||||||||||||||||||
| logger.trace(String.format("json %s [%s], %s", method.toString(), url, req)); | ||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||
| ListenableFuture<ResponseEntity<String>> f = asyncRestTemplate.exchange(url, method, req, String.class); | ||||||||||||||||||||||||||||||||||||||||||||||||
| AsyncRestTemplate override = ASYNC_TEMPLATE_OVERRIDE.get(); | ||||||||||||||||||||||||||||||||||||||||||||||||
| AsyncRestTemplate tmpl = override != null ? override : asyncRestTemplate; | ||||||||||||||||||||||||||||||||||||||||||||||||
| ListenableFuture<ResponseEntity<String>> f = tmpl.exchange(url, method, req, String.class); | ||||||||||||||||||||||||||||||||||||||||||||||||
| f.addCallback(rsp -> {}, e -> wrapper.fail(err(ORG_ZSTACK_CORE_REST_10003, SysErrors.HTTP_ERROR, e.getLocalizedMessage()))); | ||||||||||||||||||||||||||||||||||||||||||||||||
| } catch (RestClientException e) { | ||||||||||||||||||||||||||||||||||||||||||||||||
| logger.warn(String.format("Unable to %s to %s: %s", method.toString(), url, e.getMessage())); | ||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -600,6 +682,28 @@ public void asyncJsonPost(String url, String body, AsyncRESTCallback callback) { | |||||||||||||||||||||||||||||||||||||||||||||||
| asyncJsonPost(url, body, callback, TimeUnit.MILLISECONDS, timeout); | ||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||
| @Override | ||||||||||||||||||||||||||||||||||||||||||||||||
| public void asyncJsonPostForPing(String url, Object body, AsyncRESTCallback callback) { | ||||||||||||||||||||||||||||||||||||||||||||||||
| // R2: inject dedicated ping pool via ThreadLocal; cleared in finally to prevent leaks | ||||||||||||||||||||||||||||||||||||||||||||||||
| ASYNC_TEMPLATE_OVERRIDE.set(pingAsyncRestTemplate); | ||||||||||||||||||||||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||||||||||||||||||||||
| asyncJsonPost(url, body, callback); | ||||||||||||||||||||||||||||||||||||||||||||||||
| } finally { | ||||||||||||||||||||||||||||||||||||||||||||||||
| ASYNC_TEMPLATE_OVERRIDE.remove(); | ||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+685
to
+693
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ThreadLocal 覆盖值在嵌套调用下会被误清理。 Line 622 直接 建议修复 `@Override`
public void asyncJsonPostForPing(String url, Object body, AsyncRESTCallback callback) {
- ASYNC_TEMPLATE_OVERRIDE.set(pingAsyncRestTemplate);
+ AsyncRestTemplate previous = ASYNC_TEMPLATE_OVERRIDE.get();
+ ASYNC_TEMPLATE_OVERRIDE.set(pingAsyncRestTemplate);
try {
asyncJsonPost(url, body, callback);
} finally {
- ASYNC_TEMPLATE_OVERRIDE.remove();
+ if (previous == null) {
+ ASYNC_TEMPLATE_OVERRIDE.remove();
+ } else {
+ ASYNC_TEMPLATE_OVERRIDE.set(previous);
+ }
}
}📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||
| @Override | ||||||||||||||||||||||||||||||||||||||||||||||||
| public void asyncJsonPostForPing(String url, Object body, AsyncRESTCallback callback, TimeUnit unit, long timeout) { | ||||||||||||||||||||||||||||||||||||||||||||||||
| // R2: inject dedicated ping pool via ThreadLocal; cleared in finally to prevent leaks | ||||||||||||||||||||||||||||||||||||||||||||||||
| ASYNC_TEMPLATE_OVERRIDE.set(pingAsyncRestTemplate); | ||||||||||||||||||||||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||||||||||||||||||||||
| asyncJsonPost(url, body, callback, unit, timeout); | ||||||||||||||||||||||||||||||||||||||||||||||||
| } finally { | ||||||||||||||||||||||||||||||||||||||||||||||||
| ASYNC_TEMPLATE_OVERRIDE.remove(); | ||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||
| @Override | ||||||||||||||||||||||||||||||||||||||||||||||||
| public HttpEntity<String> httpServletRequestToHttpEntity(HttpServletRequest req) { | ||||||||||||||||||||||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||
Uh oh!
There was an error while loading. Please reload this page.