From c200bae90ed79e49cf6cd303ab75350775398811 Mon Sep 17 00:00:00 2001 From: CritasWang Date: Fri, 6 Feb 2026 13:29:15 +0800 Subject: [PATCH 1/4] Fix SessionPool client leak on reconnection and query failures, and preserve server error messages - Add SessionPoolDepletedException with diagnostic properties for pool depletion scenarios - Add ReconnectionFailedException for type-safe reconnection failure detection - Fix client leak when reconnection succeeds but retry operation fails - Add PoolHealthMetrics for thread-safe health monitoring - Add try-finally protection for Monitor locks in ConcurrentClientQueue - Remove silent failure pattern - always log connection failures - Add CurrentBatchRowCount() method with Obsolete attribute on RowCount() - Improve database switch error handling with partial failure detection --- docs/SessionPool_Exception_Handling.md | 487 ++++++++++++++++++ src/Apache.IoTDB.Data/IoTDBDataReader.cs | 4 +- src/Apache.IoTDB/ConcurrentClientQueue.cs | 62 +-- .../DataStructure/SessionDataSet.cs | 13 + src/Apache.IoTDB/PoolHealthMetrics.cs | 52 ++ .../ReconnectionFailedException.cs | 41 ++ src/Apache.IoTDB/SessionPool.cs | 110 ++-- .../SessionPoolDepletedException.cs | 74 +++ 8 files changed, 771 insertions(+), 72 deletions(-) create mode 100644 docs/SessionPool_Exception_Handling.md create mode 100644 src/Apache.IoTDB/PoolHealthMetrics.cs create mode 100644 src/Apache.IoTDB/ReconnectionFailedException.cs create mode 100644 src/Apache.IoTDB/SessionPoolDepletedException.cs diff --git a/docs/SessionPool_Exception_Handling.md b/docs/SessionPool_Exception_Handling.md new file mode 100644 index 0000000..75fd019 --- /dev/null +++ b/docs/SessionPool_Exception_Handling.md @@ -0,0 +1,487 @@ +# SessionPool Exception Handling and Health Monitoring + +## Overview + +The Apache IoTDB C# client library provides comprehensive exception handling and health monitoring capabilities for SessionPool operations. 
This document explains how to handle pool depletion scenarios, monitor pool health, and implement recovery strategies. + +## SessionPoolDepletedException + +### Description + +`SessionPoolDepletedException` is a specialized exception thrown when the SessionPool cannot provide a client connection. This indicates that: + +- All clients in the pool are currently in use, OR +- Client connections have failed and reconnection attempts were unsuccessful, OR +- The pool wait timeout has been exceeded + +### Exception Properties + +The exception provides detailed diagnostic information through the following properties: + +| Property | Type | Description | +| --------------------- | ------ | -------------------------------------------------------------------------- | +| `DepletionReason` | string | A human-readable description of why the pool was depleted | +| `AvailableClients` | int | Number of currently available clients in the pool at the time of exception | +| `TotalPoolSize` | int | The total configured size of the session pool | +| `FailedReconnections` | int | Number of failed reconnection attempts since the pool was opened | + +### Example Usage + +```csharp +using Apache.IoTDB; +using System; + +try +{ + var sessionPool = new SessionPool.Builder() + .Host("127.0.0.1") + .Port(6667) + .PoolSize(4) + .Build(); + + await sessionPool.Open(); + + // Perform operations... 
+ await sessionPool.InsertRecordAsync("root.sg.d1", record); +} +catch (SessionPoolDepletedException ex) +{ + Console.WriteLine($"Pool depleted: {ex.DepletionReason}"); + Console.WriteLine($"Available clients: {ex.AvailableClients}/{ex.TotalPoolSize}"); + Console.WriteLine($"Failed reconnections: {ex.FailedReconnections}"); + + // Implement recovery strategy (see below) +} +``` + +## Pool Health Metrics + +### Monitoring Pool Status + +The `SessionPool` class exposes real-time health metrics that can be used for monitoring and alerting: + +```csharp +var sessionPool = new SessionPool.Builder() + .Host("127.0.0.1") + .Port(6667) + .PoolSize(8) + .Build(); + +await sessionPool.Open(); + +// Check pool health +Console.WriteLine($"Available Clients: {sessionPool.AvailableClients}"); +Console.WriteLine($"Total Pool Size: {sessionPool.TotalPoolSize}"); +Console.WriteLine($"Failed Reconnections: {sessionPool.FailedReconnections}"); +``` + +### Health Metrics + +| Metric | Property | Description | Recommended Threshold | +| -------------------- | --------------------- | ------------------------------------------------ | --------------------------- | +| Available Clients | `AvailableClients` | Number of idle clients ready for use | Alert if < 25% of pool size | +| Total Pool Size | `TotalPoolSize` | Configured maximum pool size | N/A (constant) | +| Failed Reconnections | `FailedReconnections` | Cumulative count of failed reconnection attempts | Alert if > 0 and increasing | + +## Failure Scenarios and Recovery Strategies + +### Scenario 1: Pool Exhaustion (High Load) + +**Symptoms:** + +- `SessionPoolDepletedException` with reason "Connection pool is empty and wait time out" +- `AvailableClients` = 0 +- `FailedReconnections` = 0 or low + +**Root Cause:** Application workload exceeds pool capacity + +**Recovery Strategies:** + +1. 
**Increase Pool Size:** + +```csharp +var sessionPool = new SessionPool.Builder() + .Host("127.0.0.1") + .Port(6667) + .PoolSize(16) // Increased from 8 + .Build(); +``` + +2. **Implement Connection Retry with Backoff:** + +```csharp +int maxRetries = 3; +int retryDelayMs = 1000; + +for (int i = 0; i < maxRetries; i++) +{ + try + { + await sessionPool.InsertRecordAsync(deviceId, record); + break; // Success + } + catch (SessionPoolDepletedException ex) when (i < maxRetries - 1) + { + await Task.Delay(retryDelayMs * (i + 1)); // Linear backoff: delay grows by retryDelayMs per attempt + } +} +``` + +3. **Optimize Operation Duration:** + - Reduce the time each client is held + - Batch multiple operations together + - Use async operations efficiently + +### Scenario 2: Network Connectivity Issues + +**Symptoms:** + +- `SessionPoolDepletedException` with reason "Reconnection failed" +- `AvailableClients` decreases over time +- `FailedReconnections` > 0 and increasing + +**Root Cause:** IoTDB server unreachable or network issues + +**Recovery Strategies:** + +1. **Reinitialize SessionPool:** + +```csharp +catch (SessionPoolDepletedException ex) when (ex.FailedReconnections > 5) +{ + Console.WriteLine($"Critical: {ex.FailedReconnections} failed reconnections"); + + // Close existing pool + await sessionPool.Close(); + + // Wait for network recovery + await Task.Delay(5000); + + // Create new pool + sessionPool = new SessionPool.Builder() + .Host("127.0.0.1") + .Port(6667) + .PoolSize(8) + .Build(); + + await sessionPool.Open(); +} +``` + +2. 
**Implement Circuit Breaker Pattern:** + +```csharp +public class SessionPoolCircuitBreaker +{ + private SessionPool _pool; + private int _failureCount = 0; + private const int FailureThreshold = 5; + private bool _circuitOpen = false; + private DateTime _lastFailureTime; + + public async Task<T> ExecuteAsync<T>(Func<SessionPool, Task<T>> operation) + { + if (_circuitOpen && DateTime.Now - _lastFailureTime < TimeSpan.FromMinutes(1)) + { + throw new Exception("Circuit breaker is open"); + } + + try + { + var result = await operation(_pool); + _failureCount = 0; // Reset on success + _circuitOpen = false; + return result; + } + catch (SessionPoolDepletedException ex) + { + _failureCount++; + _lastFailureTime = DateTime.Now; + + if (_failureCount >= FailureThreshold) + { + _circuitOpen = true; + Console.WriteLine("Circuit breaker opened - too many failures"); + } + throw; + } + } +} +``` + +### Scenario 3: Server Overload + +**Symptoms:** + +- Intermittent `SessionPoolDepletedException` +- Both connection timeouts and reconnection failures + +**Root Cause:** IoTDB server is overloaded + +**Recovery Strategies:** + +1. **Implement Rate Limiting:** + +```csharp +using System.Threading; + +private SemaphoreSlim _rateLimiter = new SemaphoreSlim(10, 10); // Max 10 concurrent operations + +public async Task RateLimitedInsert(string deviceId, RowRecord record) +{ + await _rateLimiter.WaitAsync(); + try + { + await sessionPool.InsertRecordAsync(deviceId, record); + } + finally + { + _rateLimiter.Release(); + } +} +``` + +2. 
**Add Timeout Configuration:** + +```csharp +var sessionPool = new SessionPool.Builder() + .Host("127.0.0.1") + .Port(6667) + .Timeout(120) // Increased timeout for slow server + .Build(); +``` + +## Monitoring and Alerting Recommendations + +### Health Check Implementation + +```csharp +public class SessionPoolHealthCheck +{ + private readonly SessionPool _pool; + + public SessionPoolHealthCheck(SessionPool pool) + { + _pool = pool; + } + + public HealthStatus CheckHealth() + { + var availableRatio = (double)_pool.AvailableClients / _pool.TotalPoolSize; + + if (_pool.FailedReconnections > 10) + { + return new HealthStatus + { + Status = "Critical", + Message = $"High reconnection failures: {_pool.FailedReconnections}", + Recommendation = "Check IoTDB server availability" + }; + } + + if (availableRatio < 0.25) + { + return new HealthStatus + { + Status = "Warning", + Message = $"Low available clients: {_pool.AvailableClients}/{_pool.TotalPoolSize}", + Recommendation = "Consider increasing pool size" + }; + } + + return new HealthStatus + { + Status = "Healthy", + Message = $"Pool healthy: {_pool.AvailableClients}/{_pool.TotalPoolSize} available" + }; + } +} + +public class HealthStatus +{ + public string Status { get; set; } + public string Message { get; set; } + public string Recommendation { get; set; } +} +``` + +### Metrics Collection for Monitoring Systems + +```csharp +// Example: Export metrics to Prometheus, StatsD, or similar +public class SessionPoolMetricsCollector +{ + private readonly SessionPool _pool; + + public void CollectMetrics() + { + // Gauge: Current available clients + MetricsCollector.Set("iotdb_pool_available_clients", _pool.AvailableClients); + + // Gauge: Total pool size + MetricsCollector.Set("iotdb_pool_total_size", _pool.TotalPoolSize); + + // Counter: Failed reconnections + MetricsCollector.Set("iotdb_pool_failed_reconnections", _pool.FailedReconnections); + + // Calculated: Pool utilization percentage + var utilization = (1.0 - 
(double)_pool.AvailableClients / _pool.TotalPoolSize) * 100; + MetricsCollector.Set("iotdb_pool_utilization_percent", utilization); + } +} +``` + +### Recommended Alert Rules + +1. **Critical Alerts:** + - `FailedReconnections > 10`: Server connectivity issues + - `AvailableClients == 0` for > 30 seconds: Complete pool exhaustion + +2. **Warning Alerts:** + - `AvailableClients < TotalPoolSize * 0.25`: Pool under pressure + - `FailedReconnections > 0` and increasing: Network instability + +3. **Info Alerts:** + - Pool utilization > 75% for extended periods: Consider scaling + +## Best Practices + +1. **Pool Sizing:** + - Start with poolSize = 2 × expected concurrent operations + - Monitor and adjust based on actual usage patterns + - Larger pools use more server resources but provide better throughput + +2. **Error Handling:** + - Always catch `SessionPoolDepletedException` specifically + - Log exception properties for debugging + - Implement appropriate retry logic based on depletion reason + +3. **Monitoring:** + - Continuously monitor `AvailableClients` metric + - Track `FailedReconnections` as a leading indicator of problems + - Set up alerts before pool is completely depleted + +4. 
**Resource Management:** + - Always call `sessionPool.Close()` when done + - Use `using` statements or try-finally blocks for proper cleanup + - Don't create multiple SessionPool instances unnecessarily + +## Example: Complete Production-Ready Implementation + +```csharp +using Apache.IoTDB; +using System; +using System.Threading.Tasks; + +public class ProductionSessionPoolManager +{ + private SessionPool _pool; + private readonly object _lock = new object(); + + public async Task Initialize() + { + _pool = new SessionPool.Builder() + .Host("127.0.0.1") + .Port(6667) + .PoolSize(8) + .Timeout(60) + .Build(); + + await _pool.Open(); + + // Start health monitoring + _ = Task.Run(MonitorHealth); + } + + public async Task<T> ExecuteWithRetry<T>(Func<SessionPool, Task<T>> operation) + { + const int maxRetries = 3; + const int baseDelayMs = 1000; + + for (int attempt = 0; attempt < maxRetries; attempt++) + { + try + { + return await operation(_pool); + } + catch (SessionPoolDepletedException ex) + { + Console.WriteLine($"Attempt {attempt + 1} failed: {ex.Message}"); + Console.WriteLine($"Pool state - Available: {ex.AvailableClients}/{ex.TotalPoolSize}, Failed reconnections: {ex.FailedReconnections}"); + + if (attempt == maxRetries - 1) + { + // Last attempt failed + if (ex.FailedReconnections > 5) + { + // Reinitialize pool + await ReinitializePool(); + } + throw; + } + + // Exponential backoff + await Task.Delay(baseDelayMs * (int)Math.Pow(2, attempt)); + } + } + + throw new InvalidOperationException("Should not reach here"); + } + + private async Task ReinitializePool() + { + lock (_lock) + { + try + { + _pool?.Close().Wait(); + } + catch { } + } + + await Task.Delay(5000); // Wait for server recovery + await Initialize(); + } + + private async Task MonitorHealth() + { + while (true) + { + await Task.Delay(10000); // Check every 10 seconds + + try + { + var availableRatio = (double)_pool.AvailableClients / _pool.TotalPoolSize; + + if (_pool.FailedReconnections > 10) + { + 
Console.WriteLine($"CRITICAL: {_pool.FailedReconnections} failed reconnections"); + } + else if (availableRatio < 0.25) + { + Console.WriteLine($"WARNING: Low available clients - {_pool.AvailableClients}/{_pool.TotalPoolSize}"); + } + } + catch (Exception ex) + { + Console.WriteLine($"Health check failed: {ex.Message}"); + } + } + } + + public async Task Cleanup() + { + await _pool?.Close(); + } +} +``` + +## Summary + +The SessionPool exception handling and health monitoring features provide comprehensive tools for building robust IoTDB applications: + +- Use `SessionPoolDepletedException` to understand and react to pool issues +- Monitor `AvailableClients`, `TotalPoolSize`, and `FailedReconnections` metrics +- Implement appropriate recovery strategies based on failure scenarios +- Set up proactive monitoring and alerting to prevent issues +- Follow best practices for pool sizing and resource management diff --git a/src/Apache.IoTDB.Data/IoTDBDataReader.cs b/src/Apache.IoTDB.Data/IoTDBDataReader.cs index 8f96841..fe946b9 100644 --- a/src/Apache.IoTDB.Data/IoTDBDataReader.cs +++ b/src/Apache.IoTDB.Data/IoTDBDataReader.cs @@ -56,8 +56,8 @@ internal IoTDBDataReader(IoTDBCommand IoTDBCommand, SessionDataSet dataSet, bool _command = IoTDBCommand; _closeConnection = closeConnection; _fieldCount = dataSet.GetColumnNames().Count; - _hasRows = dataSet.RowCount() > 0; - _recordsAffected = dataSet.RowCount(); + _hasRows = dataSet.CurrentBatchRowCount() > 0; + _recordsAffected = -1; // Total row count is unknown; use -1 per ADO.NET convention _closed = _closeConnection; _metas = dataSet.GetColumnNames(); diff --git a/src/Apache.IoTDB/ConcurrentClientQueue.cs b/src/Apache.IoTDB/ConcurrentClientQueue.cs index e53d3f3..17ee61e 100644 --- a/src/Apache.IoTDB/ConcurrentClientQueue.cs +++ b/src/Apache.IoTDB/ConcurrentClientQueue.cs @@ -28,6 +28,7 @@ namespace Apache.IoTDB public class ConcurrentClientQueue { public ConcurrentQueue ClientQueue { get; } + internal 
IPoolDiagnosticReporter DiagnosticReporter { get; set; } public ConcurrentClientQueue(List clients) { @@ -47,50 +48,51 @@ public void Return(Client client) Monitor.Exit(ClientQueue); Thread.Sleep(0); } - int _ref = 0; - public void AddRef() - { - lock (this) - { - _ref++; - } - } - public int GetRef() - { - return _ref; - } - public void RemoveRef() - { - lock (this) - { - _ref--; - } - } + private int _ref = 0; + public void AddRef() => Interlocked.Increment(ref _ref); + public int GetRef() => Volatile.Read(ref _ref); + public void RemoveRef() => Interlocked.Decrement(ref _ref); public int Timeout { get; set; } = 10; public Client Take() { Client client = null; Monitor.Enter(ClientQueue); - while (true) + try { - bool timeout = false; - if (ClientQueue.IsEmpty) + while (true) { - timeout = !Monitor.Wait(ClientQueue, TimeSpan.FromSeconds(Timeout)); - } - ClientQueue.TryDequeue(out client); + bool timeout = false; + if (ClientQueue.IsEmpty) + { + timeout = !Monitor.Wait(ClientQueue, TimeSpan.FromSeconds(Timeout)); + } + ClientQueue.TryDequeue(out client); - if (client != null || timeout) - { - break; + if (client != null || timeout) + { + break; + } } } - Monitor.Exit(ClientQueue); + finally + { + Monitor.Exit(ClientQueue); + } if (client == null) { - throw new TimeoutException($"Connection pool is empty and wait time out({Timeout}s)!"); + var reasonPhrase = $"Connection pool is empty and wait time out({Timeout}s)"; + if (DiagnosticReporter != null) + { + throw DiagnosticReporter.BuildDepletionException(reasonPhrase); + } + throw new TimeoutException(reasonPhrase); } return client; } } + + internal interface IPoolDiagnosticReporter + { + SessionPoolDepletedException BuildDepletionException(string reasonPhrase); + } } diff --git a/src/Apache.IoTDB/DataStructure/SessionDataSet.cs b/src/Apache.IoTDB/DataStructure/SessionDataSet.cs index 3d08843..bc7cde4 100644 --- a/src/Apache.IoTDB/DataStructure/SessionDataSet.cs +++ b/src/Apache.IoTDB/DataStructure/SessionDataSet.cs 
@@ -108,7 +108,20 @@ public SessionDataSet( public IReadOnlyList GetColumnNames() => _rpcDataSet._columnNameList; public IReadOnlyList GetColumnTypes() => _rpcDataSet._columnTypeList; + /// + /// Gets the number of rows in the current fetched batch (tsBlock). + /// Note: This is NOT the total row count of the query result. Use HasNext() to check for more data. + /// + /// The number of rows in the current batch. + public int CurrentBatchRowCount() => _rpcDataSet._tsBlockSize; + + /// + /// Gets the number of rows in the current fetched batch. + /// + /// The number of rows in the current batch. + [Obsolete("Use CurrentBatchRowCount() instead. This method returns batch size, not total row count.")] public int RowCount() => _rpcDataSet._tsBlockSize; + public void ShowTableNames() { IReadOnlyList columns = GetColumnNames(); diff --git a/src/Apache.IoTDB/PoolHealthMetrics.cs b/src/Apache.IoTDB/PoolHealthMetrics.cs new file mode 100644 index 0000000..938766c --- /dev/null +++ b/src/Apache.IoTDB/PoolHealthMetrics.cs @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +using System.Threading; + +namespace Apache.IoTDB +{ + /// + /// Encapsulates real-time health statistics for connection pool monitoring. + /// Thread-safe implementation for concurrent access patterns. + /// + internal class PoolHealthMetrics + { + private int _reconnectionFailureTally; + private readonly int _configuredMaxSize; + + public PoolHealthMetrics(int configuredMaxSize) + { + _configuredMaxSize = configuredMaxSize; + } + + public void IncrementReconnectionFailures() + { + Interlocked.Increment(ref _reconnectionFailureTally); + } + + public void ResetAllCounters() + { + Interlocked.Exchange(ref _reconnectionFailureTally, 0); + } + + public int GetReconnectionFailureTally() => Volatile.Read(ref _reconnectionFailureTally); + + public int GetConfiguredMaxSize() => _configuredMaxSize; + } +} diff --git a/src/Apache.IoTDB/ReconnectionFailedException.cs b/src/Apache.IoTDB/ReconnectionFailedException.cs new file mode 100644 index 0000000..db80d6b --- /dev/null +++ b/src/Apache.IoTDB/ReconnectionFailedException.cs @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +using System; +using Thrift; + +namespace Apache.IoTDB +{ + /// + /// Exception thrown when all reconnection attempts to the server have failed. + /// This exception is used internally to distinguish reconnection failures from other errors. + /// + internal class ReconnectionFailedException : TException + { + internal ReconnectionFailedException(string message) + : base(message, null) + { + } + + internal ReconnectionFailedException(string message, Exception innerException) + : base(message, innerException) + { + } + } +} diff --git a/src/Apache.IoTDB/SessionPool.cs b/src/Apache.IoTDB/SessionPool.cs index fc2eca0..5c241e4 100644 --- a/src/Apache.IoTDB/SessionPool.cs +++ b/src/Apache.IoTDB/SessionPool.cs @@ -34,9 +34,10 @@ namespace Apache.IoTDB { - public partial class SessionPool : IDisposable + public partial class SessionPool : IDisposable, IPoolDiagnosticReporter { private static readonly TSProtocolVersion ProtocolVersion = TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3; + private const string DepletionReasonReconnectFailed = "Reconnection failed"; private readonly string _username; private readonly string _password; @@ -62,7 +63,24 @@ public partial class SessionPool : IDisposable private bool _isClose = true; private ConcurrentClientQueue _clients; private ILogger _logger; + private PoolHealthMetrics _healthMetrics; + public delegate Task AsyncOperation(Client client); + + /// + /// Retrieves current count of idle clients ready for operations. + /// + public int AvailableClients => _clients?.ClientQueue.Count ?? 0; + + /// + /// Retrieves the configured maximum capacity of the session pool. + /// + public int TotalPoolSize => _healthMetrics?.GetConfiguredMaxSize() ?? _poolSize; + + /// + /// Retrieves cumulative tally of reconnection failures since pool was opened. + /// + public int FailedReconnections => _healthMetrics?.GetReconnectionFailureTally() ?? 
0; [Obsolete("This method is deprecated, please use new SessionPool.Builder().")] @@ -157,52 +175,62 @@ protected internal SessionPool(List nodeUrls, string username, string pa public async Task ExecuteClientOperationAsync(AsyncOperation operation, string errMsg, bool retryOnFailure = true, bool putClientBack = true) { Client client = _clients.Take(); + bool shouldReturnClient = true; + bool operationSucceeded = false; try { var resp = await operation(client); + operationSucceeded = true; return resp; } - catch (TException ex) + catch (Exception ex) { if (retryOnFailure) { + // Try to reconnect try { client = await Reconnect(client); - return await operation(client); + // Reconnect succeeded, client is now a new healthy connection } - catch (TException retryEx) + catch (ReconnectionFailedException reconnectEx) { - throw new TException(errMsg, retryEx); + // Reconnection failed - original client was closed by Reconnect + shouldReturnClient = false; + throw new SessionPoolDepletedException(DepletionReasonReconnectFailed, AvailableClients, TotalPoolSize, FailedReconnections, reconnectEx); } - } - else - { - throw new TException(errMsg, ex); - } - } - catch (Exception ex) - { - if (retryOnFailure) - { + + // Reconnect succeeded, try the operation again try { - client = await Reconnect(client); - return await operation(client); + var resp = await operation(client); + operationSucceeded = true; + return resp; } - catch (TException retryEx) + catch (Exception retryEx) { - throw new TException(errMsg, retryEx); + // Retry operation failed, but client is healthy and should be returned to pool + // shouldReturnClient remains true + string detailedMsg = $"{errMsg}. {retryEx.Message}"; + throw new TException(detailedMsg, retryEx); } } else { - throw new TException(errMsg, ex); + // Preserve original error message from server + string detailedMsg = $"{errMsg}. 
{ex.Message}"; + throw new TException(detailedMsg, ex); } } finally { - if (putClientBack) + // Return client to pool if: + // 1. putClientBack is true (normal operations - client should always be returned), OR + // 2. putClientBack is false (query operations) BUT operation failed, meaning SessionDataSet + // wasn't created and won't manage the client + // Do NOT return if reconnection failed (shouldReturnClient is false) because client was closed by Reconnect + bool shouldReturnForQueryFailure = !putClientBack && !operationSucceeded; + if (shouldReturnClient && (putClientBack || shouldReturnForQueryFailure)) { _clients.Add(client); } @@ -237,8 +265,10 @@ public async Task Open(bool enableRpcCompression, CancellationToken cancellation public async Task Open(CancellationToken cancellationToken = default) { + _healthMetrics = new PoolHealthMetrics(_poolSize); _clients = new ConcurrentClientQueue(); _clients.Timeout = _timeout * 5; + _clients.DiagnosticReporter = this; if (_nodeUrls.Count == 0) { @@ -250,10 +280,7 @@ public async Task Open(CancellationToken cancellationToken = default) } catch (Exception e) { - if (_debugMode) - { - _logger.LogWarning(e, "Currently connecting to {0}:{1} failed", _host, _port); - } + _logger?.LogWarning(e, "Failed to create connection {0}/{1} to {2}:{3}", index + 1, _poolSize, _host, _port); } } } @@ -277,10 +304,7 @@ public async Task Open(CancellationToken cancellationToken = default) } catch (Exception e) { - if (_debugMode) - { - _logger.LogWarning(e, "Currently connecting to {0}:{1} failed", endPoint.Ip, endPoint.Port); - } + _logger?.LogWarning(e, "Failed to create connection to {0}:{1}", endPoint.Ip, endPoint.Port); } } if (!isConnected) // current client could not connect to any endpoint @@ -313,10 +337,7 @@ public async Task Reconnect(Client originalClient = null, CancellationTo } catch (Exception e) { - if (_debugMode) - { - _logger.LogWarning(e, "Attempt reconnecting to {0}:{1} failed", _host, _port); - } + 
_logger?.LogWarning(e, "Reconnection attempt {0}/{1} to {2}:{3} failed", attempt, RetryNum, _host, _port); } } } @@ -340,16 +361,14 @@ public async Task Reconnect(Client originalClient = null, CancellationTo } catch (Exception e) { - if (_debugMode) - { - _logger.LogWarning(e, "Attempt connecting to {0}:{1} failed", _endPoints[j].Ip, _endPoints[j].Port); - } + _logger?.LogWarning(e, "Reconnection attempt {0}/{1} to {2}:{3} failed", attempt, RetryNum, _endPoints[j].Ip, _endPoints[j].Port); } } } } - throw new TException("Error occurs when reconnecting session pool. Could not connect to any server", null); + _healthMetrics?.IncrementReconnectionFailures(); + throw new ReconnectionFailedException("Error occurs when reconnecting session pool. Could not connect to any server"); } public bool IsOpen() => !_isClose; @@ -1408,6 +1427,7 @@ public async Task ExecuteNonQueryStatementAsync(string sql) if (_database != previousDB) { // all client should switch to the same database + var failedClients = new List<(long SessionId, Exception Error)>(); foreach (var c in _clients.ClientQueue) { try @@ -1420,10 +1440,17 @@ public async Task ExecuteNonQueryStatementAsync(string sql) } catch (Exception e) { - _logger.LogError("switch database from {0} to {1} failed for {2}, error: {3}", previousDB, _database, c.SessionId, e.Message); + failedClients.Add((c.SessionId, e)); + _logger?.LogError("switch database from {0} to {1} failed for {2}, error: {3}", previousDB, _database, c.SessionId, e.Message); } } - _logger.LogInformation("switch database from {0} to {1}", previousDB, _database); + + if (failedClients.Count > 0) + { + throw new TException($"Database switch partially failed: {failedClients.Count} client(s) could not switch from {previousDB} to {_database}", failedClients[0].Error); + } + + _logger?.LogInformation("switch database from {0} to {1}", previousDB, _database); } if (_debugMode) @@ -1802,5 +1829,8 @@ public void Dispose() Dispose(disposing: true); 
GC.SuppressFinalize(this); } + + SessionPoolDepletedException IPoolDiagnosticReporter.BuildDepletionException(string reasonPhrase) + => new SessionPoolDepletedException(reasonPhrase, AvailableClients, TotalPoolSize, FailedReconnections); } } diff --git a/src/Apache.IoTDB/SessionPoolDepletedException.cs b/src/Apache.IoTDB/SessionPoolDepletedException.cs new file mode 100644 index 0000000..e489ba0 --- /dev/null +++ b/src/Apache.IoTDB/SessionPoolDepletedException.cs @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using Thrift; + +namespace Apache.IoTDB +{ + /// + /// Specialized exception raised when SessionPool cannot allocate a client connection. + /// Includes diagnostic data for troubleshooting capacity and connectivity problems. + /// + public class SessionPoolDepletedException : TException + { + /// + /// Descriptive explanation of what caused the pool depletion event. + /// + public string DepletionReason { get; } + + /// + /// Number of clients available for use at the moment of exception. + /// + public int AvailableClients { get; } + + /// + /// Maximum configured pool size limit. 
+ /// + public int TotalPoolSize { get; } + + /// + /// Accumulated number of unsuccessful reconnection attempts. + /// + public int FailedReconnections { get; } + + internal SessionPoolDepletedException( + string depletionReason, + int availableClients, + int totalPoolSize, + int failedReconnections) + : this(depletionReason, availableClients, totalPoolSize, failedReconnections, null) + { + } + + internal SessionPoolDepletedException( + string depletionReason, + int availableClients, + int totalPoolSize, + int failedReconnections, + Exception causedBy) + : base($"SessionPool depletion detected: {depletionReason}", causedBy) + { + DepletionReason = depletionReason; + AvailableClients = availableClients; + TotalPoolSize = totalPoolSize; + FailedReconnections = failedReconnections; + } + } +} From 3db3d3888ce7e4f45307db500875277ece3c7234 Mon Sep 17 00:00:00 2001 From: CritasWang Date: Fri, 6 Feb 2026 14:06:19 +0800 Subject: [PATCH 2/4] Update src/Apache.IoTDB/SessionPool.cs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/Apache.IoTDB/SessionPool.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Apache.IoTDB/SessionPool.cs b/src/Apache.IoTDB/SessionPool.cs index 5c241e4..60c8645 100644 --- a/src/Apache.IoTDB/SessionPool.cs +++ b/src/Apache.IoTDB/SessionPool.cs @@ -1428,7 +1428,7 @@ public async Task ExecuteNonQueryStatementAsync(string sql) { // all client should switch to the same database var failedClients = new List<(long SessionId, Exception Error)>(); - foreach (var c in _clients.ClientQueue) + foreach (var c in _clients.ClientQueue.AsEnumerable()) { try { From 7b86fc7241dcbf06a11c2e9b93960496084ac035 Mon Sep 17 00:00:00 2001 From: CritasWang Date: Fri, 6 Feb 2026 14:16:33 +0800 Subject: [PATCH 3/4] use try-finally for Return Client --- src/Apache.IoTDB/ConcurrentClientQueue.cs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/Apache.IoTDB/ConcurrentClientQueue.cs 
b/src/Apache.IoTDB/ConcurrentClientQueue.cs index 17ee61e..6090c19 100644 --- a/src/Apache.IoTDB/ConcurrentClientQueue.cs +++ b/src/Apache.IoTDB/ConcurrentClientQueue.cs @@ -43,9 +43,15 @@ public ConcurrentClientQueue() public void Return(Client client) { Monitor.Enter(ClientQueue); - ClientQueue.Enqueue(client); - Monitor.PulseAll(ClientQueue); // wake up all threads waiting on the queue, refresh the waiting time - Monitor.Exit(ClientQueue); + try + { + ClientQueue.Enqueue(client); + Monitor.PulseAll(ClientQueue); // wake up all threads waiting on the queue, refresh the waiting time + } + finally + { + Monitor.Exit(ClientQueue); + } Thread.Sleep(0); } private int _ref = 0; From 27441b113518740af62d463e18804ce8b7485134 Mon Sep 17 00:00:00 2001 From: CritasWang Date: Fri, 6 Feb 2026 15:20:26 +0800 Subject: [PATCH 4/4] Feature/dotnet format ci (#46) * switch to dotnet format * format --- .editorconfig | 4 ++++ .github/workflows/pre-commit-format.yml | 30 +++++-------------------- README.md | 18 +++++++++++++++ README_ZH.md | 18 +++++++++++++++ src/Apache.IoTDB/PoolHealthMetrics.cs | 2 +- src/Apache.IoTDB/Rpc/TSStatusCode.cs | 2 +- src/Apache.IoTDB/SessionPool.cs | 10 ++++----- 7 files changed, 52 insertions(+), 32 deletions(-) diff --git a/.editorconfig b/.editorconfig index 7f6de4d..3c8687f 100644 --- a/.editorconfig +++ b/.editorconfig @@ -37,6 +37,10 @@ indent_size = 2 [*{_AssemblyInfo.cs,.notsupported.cs,*/obj/*/External/**/*,*/obj/dotnet-new.IntegrationTests/*/TemplatePackagesPaths.cs}] generated_code = true +# Thrift generated code +[src/Apache.IoTDB/Rpc/Generated/**] +generated_code = true + # C# files [*.cs] # New line preferences diff --git a/.github/workflows/pre-commit-format.yml b/.github/workflows/pre-commit-format.yml index 3b7589c..7ef53be 100644 --- a/.github/workflows/pre-commit-format.yml +++ b/.github/workflows/pre-commit-format.yml @@ -7,15 +7,13 @@ on: - "**" merge_group: branches: [main] - # schedule: - # - cron: "0 0 * * *" concurrency: group: 
${{ github.workflow }}-${{ github.ref }} cancel-in-progress: true jobs: - formatting-checks: + dotnet-format: runs-on: ubuntu-22.04 env: DOTNET_SKIP_FIRST_TIME_EXPERIENCE: 1 @@ -29,27 +27,9 @@ jobs: with: dotnet-version: "9.0.x" - - name: Setup Python environment (for pre-commit) - uses: actions/setup-python@v5 - with: - python-version: "3.10" - - - name: Clean dotnet temporary folder - run: | - sudo rm -rf /tmp/.dotnet - mkdir -p ${{ runner.temp }}/dotnet-home - mkdir -p ${{ runner.temp }}/xdg-runtime - - - name: Install pre-commit and dependencies - run: | - pip install pre-commit - pre-commit install-hooks + - name: Restore dependencies + run: dotnet restore - - name: Run pre-commit checks - env: - TMPDIR: ${{ runner.temp }} - DOTNET_CLI_HOME: ${{ runner.temp }}/dotnet-home - XDG_RUNTIME_DIR: ${{ runner.temp }}/xdg-runtime - NUGET_PACKAGES: ${{ runner. temp }}/nuget-packages + - name: Check formatting run: | - pre-commit run --all-files + dotnet format --verify-no-changes --verbosity diagnostic diff --git a/README.md b/README.md index 7f46166..f395763 100644 --- a/README.md +++ b/README.md @@ -82,5 +82,23 @@ NLog >= 4.7.9 * dotnet CLI * Thrift +## Code Formatting + +This project uses `dotnet format` to enforce consistent code style based on the [.editorconfig](.editorconfig) rules. + +### Check formatting locally + +```bash +dotnet format --verify-no-changes +``` + +### Auto-fix formatting issues + +```bash +dotnet format +``` + +The CI pipeline will automatically check code formatting on all pull requests. Please ensure your code is properly formatted before submitting a PR. + ## Publish your own client on nuget.org You can find out how to publish from this [doc](./PUBLISH.md). 
\ No newline at end of file diff --git a/README_ZH.md b/README_ZH.md index 440477c..df2f500 100644 --- a/README_ZH.md +++ b/README_ZH.md @@ -81,5 +81,23 @@ NLog >= 4.7.9 * dotnet CLI * Thrift +## 代码格式化 + +本项目使用 `dotnet format` 基于 [.editorconfig](.editorconfig) 规则来强制执行一致的代码风格。 + +### 本地检查格式 + +```bash +dotnet format --verify-no-changes +``` + +### 自动修复格式问题 + +```bash +dotnet format +``` + +CI 流水线会在所有 Pull Request 上自动检查代码格式。请确保在提交 PR 之前代码格式正确。 + ## 在 nuget.org 上发布你自己的客户端 你可以在这个[文档](./PUBLISH.md)中找到如何发布 \ No newline at end of file diff --git a/src/Apache.IoTDB/PoolHealthMetrics.cs b/src/Apache.IoTDB/PoolHealthMetrics.cs index 938766c..52bcc5f 100644 --- a/src/Apache.IoTDB/PoolHealthMetrics.cs +++ b/src/Apache.IoTDB/PoolHealthMetrics.cs @@ -46,7 +46,7 @@ public void ResetAllCounters() } public int GetReconnectionFailureTally() => Volatile.Read(ref _reconnectionFailureTally); - + public int GetConfiguredMaxSize() => _configuredMaxSize; } } diff --git a/src/Apache.IoTDB/Rpc/TSStatusCode.cs b/src/Apache.IoTDB/Rpc/TSStatusCode.cs index e2eaca8..3f6cee9 100644 --- a/src/Apache.IoTDB/Rpc/TSStatusCode.cs +++ b/src/Apache.IoTDB/Rpc/TSStatusCode.cs @@ -285,4 +285,4 @@ public static string ToString(TSStatusCode statusCode) } } -} \ No newline at end of file +} diff --git a/src/Apache.IoTDB/SessionPool.cs b/src/Apache.IoTDB/SessionPool.cs index 60c8645..1d2aecf 100644 --- a/src/Apache.IoTDB/SessionPool.cs +++ b/src/Apache.IoTDB/SessionPool.cs @@ -21,9 +21,9 @@ using System.Collections.Generic; using System.IO; using System.Linq; +using System.Security.Cryptography.X509Certificates; using System.Threading; using System.Threading.Tasks; -using System.Security.Cryptography.X509Certificates; using Apache.IoTDB.DataStructure; using Microsoft.Extensions.Logging; using Thrift; @@ -64,19 +64,19 @@ public partial class SessionPool : IDisposable, IPoolDiagnosticReporter private ConcurrentClientQueue _clients; private ILogger _logger; private PoolHealthMetrics _healthMetrics; - + public 
delegate Task AsyncOperation(Client client); - + /// <summary> /// Retrieves current count of idle clients ready for operations. /// </summary> public int AvailableClients => _clients?.ClientQueue.Count ?? 0; - + /// <summary> /// Retrieves the configured maximum capacity of the session pool. /// </summary> public int TotalPoolSize => _healthMetrics?.GetConfiguredMaxSize() ?? _poolSize; - + /// <summary> /// Retrieves cumulative tally of reconnection failures since pool was opened. /// </summary>