diff --git a/server/src/main/java/com/cloud/alert/AlertManagerImpl.java b/server/src/main/java/com/cloud/alert/AlertManagerImpl.java index 377b2134d780..28c13dd93c64 100644 --- a/server/src/main/java/com/cloud/alert/AlertManagerImpl.java +++ b/server/src/main/java/com/cloud/alert/AlertManagerImpl.java @@ -28,6 +28,7 @@ import java.util.Set; import java.util.Timer; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -37,13 +38,11 @@ import javax.mail.MessagingException; import javax.naming.ConfigurationException; -import com.cloud.dc.DataCenter; -import com.cloud.dc.Pod; -import com.cloud.org.Cluster; import org.apache.cloudstack.framework.config.ConfigDepot; import org.apache.cloudstack.framework.config.ConfigKey; import org.apache.cloudstack.framework.config.Configurable; import org.apache.cloudstack.framework.config.dao.ConfigurationDao; +import org.apache.cloudstack.managed.context.ManagedContextRunnable; import org.apache.cloudstack.managed.context.ManagedContextTimerTask; import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao; import org.apache.cloudstack.storage.datastore.db.StoragePoolVO; @@ -52,8 +51,9 @@ import org.apache.cloudstack.utils.mailing.SMTPMailSender; import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.math.NumberUtils; -import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.jetbrains.annotations.Nullable; import com.cloud.alert.dao.AlertDao; import com.cloud.api.ApiDBUtils; @@ -66,9 +66,11 @@ import com.cloud.configuration.Config; import com.cloud.configuration.ConfigurationManager; import com.cloud.dc.ClusterVO; +import com.cloud.dc.DataCenter; import com.cloud.dc.DataCenter.NetworkType; import com.cloud.dc.DataCenterVO; import com.cloud.dc.HostPodVO; +import com.cloud.dc.Pod; import com.cloud.dc.Vlan.VlanType; import com.cloud.dc.dao.ClusterDao; import com.cloud.dc.dao.DataCenterDao; @@ -82,6 +84,7 @@ import com.cloud.host.dao.HostDao; import com.cloud.network.Ipv6Service; import com.cloud.network.dao.IPAddressDao; +import com.cloud.org.Cluster; import com.cloud.org.Grouping.AllocationState; import com.cloud.resource.ResourceManager; import com.cloud.storage.StorageManager; @@ -93,8 +96,6 @@ import com.cloud.utils.db.TransactionCallbackNoReturn; import com.cloud.utils.db.TransactionStatus; -import org.jetbrains.annotations.Nullable; - public class AlertManagerImpl extends ManagerBase implements AlertManager, Configurable { protected Logger logger = LogManager.getLogger(AlertManagerImpl.class.getName()); @@ -289,30 +290,47 @@ protected void recalculateHostCapacities() { if (hostIds.isEmpty()) { return; } - ConcurrentHashMap> futures = new ConcurrentHashMap<>(); ExecutorService executorService = Executors.newFixedThreadPool(Math.max(1, Math.min(CapacityManager.CapacityCalculateWorkers.value(), hostIds.size()))); - for (Long hostId : hostIds) { - futures.put(hostId, executorService.submit(() -> { - Transaction.execute(new TransactionCallbackNoReturn() { + final CountDownLatch latch = new CountDownLatch(hostIds.size()); + final ConcurrentHashMap failures = new ConcurrentHashMap<>(); + + try { + for (final Long hostId : hostIds) { + executorService.execute(new ManagedContextRunnable() { @Override - public void doInTransactionWithoutResult(TransactionStatus status) { - final HostVO host = hostDao.findById(hostId); - _capacityMgr.updateCapacityForHost(host); + protected void runInContext() { + try { + final HostVO host = hostDao.findById(hostId); + if (host == null) { + logger.error("Host with ID: {} no longer exists, skipping capacity calculation", hostId); + return; + } + _capacityMgr.updateCapacityForHost(host); + } catch (Throwable t) { + failures.put(hostId, t); + logger.error("Error during host capacity calculation for ID: {}", hostId, t); + } finally { + latch.countDown(); + } } }); - return null; - })); - } - for (Map.Entry> entry: futures.entrySet()) { + } + try { - entry.getValue().get(); - } catch (InterruptedException | ExecutionException e) { - logger.error(String.format("Error during capacity calculation for host: %d due to : %s", - entry.getKey(), e.getMessage()), e); + latch.await(); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + logger.warn("Interrupted while waiting for host capacity calculation tasks"); } + + if (!failures.isEmpty()) { + logger.warn("Host capacity calculation finished with {} failures out of {} hosts", + failures.size(), hostIds.size()); + } + } finally { + executorService.shutdown(); } - executorService.shutdown(); } protected void recalculateStorageCapacities() { diff --git a/server/src/main/java/com/cloud/capacity/CapacityManagerImpl.java b/server/src/main/java/com/cloud/capacity/CapacityManagerImpl.java index 8eebc04ee683..f3e3974c90b1 100644 --- a/server/src/main/java/com/cloud/capacity/CapacityManagerImpl.java +++ b/server/src/main/java/com/cloud/capacity/CapacityManagerImpl.java @@ -823,11 +823,17 @@ public void updateCapacityForHost(final Host host) { cpuCoreCap.setUsedCapacity(usedCpuCore); } } - try { - _capacityDao.update(cpuCoreCap.getId(), cpuCoreCap); - } catch (Exception e) { - logger.error("Caught exception while updating cpucore capacity for the host {}", host, e); - } + final CapacityVO cpuCoreCapFinal = cpuCoreCap; + Transaction.execute(new TransactionCallbackNoReturn() { + @Override + public void doInTransactionWithoutResult(TransactionStatus status) { + try { + _capacityDao.update(cpuCoreCapFinal.getId(), cpuCoreCapFinal); + } catch (Exception e) { + logger.error("Caught exception while updating cpucore capacity for the host {}", host, e); + } + } + }); } else { final long usedCpuCoreFinal = usedCpuCore; final long reservedCpuCoreFinal = reservedCpuCore; @@ -903,12 +909,19 @@ public void doInTransactionWithoutResult(TransactionStatus status) { } } - try { - _capacityDao.update(cpuCap.getId(), cpuCap); - _capacityDao.update(memCap.getId(), memCap); - } catch (Exception e) { - logger.error("Caught exception while updating cpu/memory capacity for the host {}", host, e); - } + final CapacityVO cpuCapFinal = cpuCap; + final CapacityVO memCapFinal = memCap; + Transaction.execute(new TransactionCallbackNoReturn() { + @Override + public void doInTransactionWithoutResult(TransactionStatus status) { + try { + _capacityDao.update(cpuCapFinal.getId(), cpuCapFinal); + _capacityDao.update(memCapFinal.getId(), memCapFinal); + } catch (Exception e) { + logger.error("Caught exception while updating cpu/memory capacity for the host {}", host, e); + } + } + }); } else { final long usedMemoryFinal = usedMemory; final long reservedMemoryFinal = reservedMemory;