From 905bf37d38f5ac8e1e4c675895da06ec43cedc7b Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 2 Jan 2025 17:12:29 +0800 Subject: [PATCH 1/2] Log --- ...PipeRealtimeDataRegionHybridExtractor.java | 123 +++++++++++++++--- .../db/pipe/metric/PipeResourceMetrics.java | 2 +- .../tsfile/PipeTsFileResourceManager.java | 2 +- .../iotdb/commons/conf/CommonConfig.java | 9 ++ .../iotdb/commons/conf/CommonDescriptor.java | 5 + .../iotdb/commons/pipe/config/PipeConfig.java | 5 + 6 files changed, 127 insertions(+), 19 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java index 0c79b740f3e1..aeff2a23db5d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java @@ -22,7 +22,6 @@ import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.event.ProgressReportEvent; -import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent; import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; @@ -41,6 +40,7 @@ import org.slf4j.LoggerFactory; import java.util.Objects; +import java.util.function.Consumer; public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegionExtractor { @@ -222,42 +222,131 @@ private boolean canNotUseTabletAnyMore() { } private boolean isPipeTaskCurrentlyRestarted() { - return PipeDataNodeAgent.task().isPipeTaskCurrentlyRestarted(pipeName); + if (PipeDataNodeAgent.task().isPipeTaskCurrentlyRestarted(pipeName)) { + logByLogManager( + l -> l.info("{} can not use tablet anymore because it's currently restarted.", pipeName)); + return true; + } + return false; } private boolean mayWalSizeReachThrottleThreshold() { - return 3 * WALManager.getInstance().getTotalDiskUsage() - > IoTDBDescriptor.getInstance().getConfig().getThrottleThreshold(); + final long walDiskUsage = WALManager.getInstance().getTotalDiskUsage(); + final long walThrottleThreshold = WALManager.getInstance().getThrottleThreshold(); + if (3 * walDiskUsage > walThrottleThreshold) { + logByLogManager( + l -> + l.info( + "{} can not use tablet anymore because the total WAL disk usage {} is larger than the 1/3 of the WAL throttle threshold {}.", + pipeName, + walDiskUsage, + walThrottleThreshold)); + return true; + } + return false; } private boolean mayMemTablePinnedCountReachDangerousThreshold() { - return PipeDataNodeResourceManager.wal().getPinnedWalCount() - >= PipeConfig.getInstance().getPipeMaxAllowedPinnedMemTableCount(); + final int pinnedWalCount = PipeDataNodeResourceManager.wal().getPinnedWalCount(); + final int pipeMaxAllowedPinnedMemTableCount = + PipeConfig.getInstance().getPipeMaxAllowedPinnedMemTableCount(); + if (pinnedWalCount >= pipeMaxAllowedPinnedMemTableCount) { + logByLogManager( + l -> + l.info( + "{} can not use tablet anymore because the pinned wal count {} is larger than the pipe max allowed pinned wal count {}.", + pipeName, + pinnedWalCount, + pipeMaxAllowedPinnedMemTableCount)); + return true; + } + return false; } private boolean isHistoricalTsFileEventCountExceededLimit() { final IoTDBDataRegionExtractor extractor = PipeDataRegionExtractorMetrics.getInstance().getExtractorMap().get(getTaskID()); - return Objects.nonNull(extractor) - && extractor.getHistoricalTsFileInsertionEventCount() - >= PipeConfig.getInstance().getPipeMaxAllowedHistoricalTsFilePerDataRegion(); + if (Objects.isNull(extractor)) { + return false; + } + final int historicalTsFileInsertionEventCount = + extractor.getHistoricalTsFileInsertionEventCount(); + final int pipeMaxAllowedHistoricalTsFilePerDataRegion = + PipeConfig.getInstance().getPipeMaxAllowedHistoricalTsFilePerDataRegion(); + if (historicalTsFileInsertionEventCount >= pipeMaxAllowedHistoricalTsFilePerDataRegion) { + logByLogManager( + l -> + l.info( + "{} can not use tablet anymore because the historical tsFile event count {} is larger than the pipe max allowed historical tsFile count per data region {}.", + pipeName, + historicalTsFileInsertionEventCount, + pipeMaxAllowedHistoricalTsFilePerDataRegion)); + return true; + } + return false; } private boolean isRealtimeTsFileEventCountExceededLimit() { - return pendingQueue.getTsFileInsertionEventCount() - >= PipeConfig.getInstance().getPipeMaxAllowedPendingTsFileEpochPerDataRegion(); + final int tsFileInsertionEventCount = pendingQueue.getTsFileInsertionEventCount(); + final int pipeMaxAllowedPendingTsFileEpochPerDataRegion = + PipeConfig.getInstance().getPipeMaxAllowedPendingTsFileEpochPerDataRegion(); + if (tsFileInsertionEventCount >= pipeMaxAllowedPendingTsFileEpochPerDataRegion) { + logByLogManager( + l -> + l.info( + "{} can not use tablet anymore because the realtime tsFile event count {} is larger than the pipe max allowed pending tsfile epoch count {}.", + pipeName, + tsFileInsertionEventCount, + pipeMaxAllowedPendingTsFileEpochPerDataRegion)); + return true; + } + return false; } private boolean mayTsFileLinkedCountReachDangerousThreshold() { - return PipeDataNodeResourceManager.tsfile().getLinkedTsfileCount() - >= PipeConfig.getInstance().getPipeMaxAllowedLinkedTsFileCount(); + final int linkedTsFileCount = PipeDataNodeResourceManager.tsfile().getLinkedTsFileCount(); + final long pipeMaxAllowedLinkedTsFileCount = + PipeConfig.getInstance().getPipeMaxAllowedLinkedTsFileCount(); + if (linkedTsFileCount >= pipeMaxAllowedLinkedTsFileCount) { + logByLogManager( + l -> + l.info( + "{} can not use tablet anymore because the linked tsFile count {} is larger than the pipe max allowed linked tsfile count {}.", + pipeName, + linkedTsFileCount, + pipeMaxAllowedLinkedTsFileCount)); + return true; + } + return false; } private boolean mayInsertNodeMemoryReachDangerousThreshold() { - return 3 - * PipeDataNodeAgent.task().getFloatingMemoryUsageInByte(pipeName) - * PipeDataNodeAgent.task().getPipeCount() - >= 2 * PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes(); + final long floatingMemoryUsage = + PipeDataNodeAgent.task().getFloatingMemoryUsageInByte(pipeName); + final int pipeCount = PipeDataNodeAgent.task().getPipeCount(); + final long freeMemoryInBytes = PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes(); + if (3 * floatingMemoryUsage * pipeCount >= 2 * freeMemoryInBytes) { + logByLogManager( + l -> + l.info( + "{} can not use tablet anymore because pipeCount({}) * floatingMemoryUsage of this pipe ({}) is larger than 2/3 if the pipe max allowed free memory in bytes {}.", + pipeName, + pipeCount, + floatingMemoryUsage, + freeMemoryInBytes)); + return true; + } + return false; + } + + private void logByLogManager(final Consumer infoFunction) { + PipeDataNodeResourceManager.log() + .schedule( + PipeRealtimeDataRegionHybridExtractor.class, + Integer.MAX_VALUE, + PipeConfig.getInstance().getPipeDegradeMaxLogIntervalRounds(), + 1) + .ifPresent(infoFunction); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeResourceMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeResourceMetrics.java index 854ff9ffb106..370630834428 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeResourceMetrics.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeResourceMetrics.java @@ -66,7 +66,7 @@ public void bindTo(final AbstractMetricService metricService) { Metric.PIPE_LINKED_TSFILE_COUNT.toString(), MetricLevel.IMPORTANT, PipeDataNodeResourceManager.tsfile(), - PipeTsFileResourceManager::getLinkedTsfileCount); + PipeTsFileResourceManager::getLinkedTsFileCount); // phantom reference count metricService.createAutoGauge( Metric.PIPE_PHANTOM_REFERENCE_COUNT.toString(), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java index 9607c41a94b7..9c11ea769406 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java @@ -328,7 +328,7 @@ public void unpinTsFileResource(final TsFileResource resource) throws IOExceptio } } - public int getLinkedTsfileCount() { + public int getLinkedTsFileCount() { return hardlinkOrCopiedFileToPipeTsFileResourceMap.size(); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index 2ce7124917e2..37f8678a4118 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -262,6 +262,7 @@ public class CommonConfig { private int pipeTsFilePinMaxLogIntervalRounds = 90; private int pipeWalPinMaxLogNumPerRound = 10; private int pipeWalPinMaxLogIntervalRounds = 90; + private int pipeDegradeMaxLogIntervalRounds = 100; private boolean pipeMemoryManagementEnabled = true; private long pipeMemoryAllocateRetryIntervalMs = 1000; @@ -1090,6 +1091,14 @@ public void setPipeWalPinMaxLogIntervalRounds(int pipeWalPinMaxLogIntervalRounds this.pipeWalPinMaxLogIntervalRounds = pipeWalPinMaxLogIntervalRounds; } + public int getPipeDegradeMaxLogIntervalRounds() { + return pipeDegradeMaxLogIntervalRounds; + } + + public void setPipeDegradeMaxLogIntervalRounds(int pipeDegradeMaxLogIntervalRounds) { + this.pipeDegradeMaxLogIntervalRounds = pipeDegradeMaxLogIntervalRounds; + } + public boolean getPipeMemoryManagementEnabled() { return pipeMemoryManagementEnabled; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java index 1026ed6aa0ac..c3683b277987 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java @@ -540,6 +540,11 @@ private void loadPipeProps(TrimProperties properties) { properties.getProperty( "pipe_wal_pin_max_log_interval_rounds", String.valueOf(config.getPipeWalPinMaxLogIntervalRounds())))); + config.setPipeDegradeMaxLogIntervalRounds( + Integer.parseInt( + properties.getProperty( + "pipe_degrade_max_log_interval_rounds", + String.valueOf(config.getPipeDegradeMaxLogIntervalRounds())))); config.setPipeMemoryManagementEnabled( Boolean.parseBoolean( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java index 4032bcc0af28..f3d519fc4dcd 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java @@ -280,6 +280,10 @@ public int getPipeWalPinMaxLogIntervalRounds() { return COMMON_CONFIG.getPipeWalPinMaxLogIntervalRounds(); } + public int getPipeDegradeMaxLogIntervalRounds() { + return COMMON_CONFIG.getPipeDegradeMaxLogIntervalRounds(); + } + /////////////////////////////// Memory /////////////////////////////// public boolean getPipeMemoryManagementEnabled() { @@ -454,6 +458,7 @@ public void printAllConfigs() { LOGGER.info("PipeTsFilePinMaxLogIntervalRounds: {}", getPipeTsFilePinMaxLogIntervalRounds()); LOGGER.info("PipeWalPinMaxLogNumPerRound: {}", getPipeWalPinMaxLogNumPerRound()); LOGGER.info("PipeWalPinMaxLogIntervalRounds: {}", getPipeWalPinMaxLogIntervalRounds()); + LOGGER.info("PipeDegradeMaxLogIntervalRounds: {}", getPipeDegradeMaxLogIntervalRounds()); LOGGER.info("PipeMemoryManagementEnabled: {}", getPipeMemoryManagementEnabled()); LOGGER.info("PipeMemoryAllocateMaxRetries: {}", getPipeMemoryAllocateMaxRetries()); From 9741fd91d0ff6426d82deac9fec4bfa34f600fb6 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 2 Jan 2025 17:15:26 +0800 Subject: [PATCH 2/2] optimization --- .../PipeRealtimeDataRegionHybridExtractor.java | 2 +- .../org/apache/iotdb/commons/conf/CommonConfig.java | 10 +++++----- .../apache/iotdb/commons/conf/CommonDescriptor.java | 6 +++--- .../apache/iotdb/commons/pipe/config/PipeConfig.java | 6 +++--- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java index aeff2a23db5d..02bc4604b5a0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java @@ -344,7 +344,7 @@ private void logByLogManager(final Consumer infoFunction) { .schedule( PipeRealtimeDataRegionHybridExtractor.class, Integer.MAX_VALUE, - PipeConfig.getInstance().getPipeDegradeMaxLogIntervalRounds(), + PipeConfig.getInstance().getPipeDegradeLogIntervalRounds(), 1) .ifPresent(infoFunction); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index 37f8678a4118..2d8eac1dba03 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -262,7 +262,7 @@ public class CommonConfig { private int pipeTsFilePinMaxLogIntervalRounds = 90; private int pipeWalPinMaxLogNumPerRound = 10; private int pipeWalPinMaxLogIntervalRounds = 90; - private int pipeDegradeMaxLogIntervalRounds = 100; + private int pipeDegradeLogIntervalRounds = 100; private boolean pipeMemoryManagementEnabled = true; private long pipeMemoryAllocateRetryIntervalMs = 1000; @@ -1091,12 +1091,12 @@ public void setPipeWalPinMaxLogIntervalRounds(int pipeWalPinMaxLogIntervalRounds this.pipeWalPinMaxLogIntervalRounds = pipeWalPinMaxLogIntervalRounds; } - public int getPipeDegradeMaxLogIntervalRounds() { - return pipeDegradeMaxLogIntervalRounds; + public int getPipeDegradeLogIntervalRounds() { + return pipeDegradeLogIntervalRounds; } - public void setPipeDegradeMaxLogIntervalRounds(int pipeDegradeMaxLogIntervalRounds) { - this.pipeDegradeMaxLogIntervalRounds = pipeDegradeMaxLogIntervalRounds; + public void setPipeDegradeLogIntervalRounds(int pipeDegradeLogIntervalRounds) { + this.pipeDegradeLogIntervalRounds = pipeDegradeLogIntervalRounds; } public boolean getPipeMemoryManagementEnabled() { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java index c3683b277987..a1c1b3055f3f 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java @@ -540,11 +540,11 @@ private void loadPipeProps(TrimProperties properties) { properties.getProperty( "pipe_wal_pin_max_log_interval_rounds", String.valueOf(config.getPipeWalPinMaxLogIntervalRounds())))); - config.setPipeDegradeMaxLogIntervalRounds( + config.setPipeDegradeLogIntervalRounds( Integer.parseInt( properties.getProperty( - "pipe_degrade_max_log_interval_rounds", - String.valueOf(config.getPipeDegradeMaxLogIntervalRounds())))); + "pipe_degrade_log_interval_rounds", + String.valueOf(config.getPipeDegradeLogIntervalRounds())))); config.setPipeMemoryManagementEnabled( Boolean.parseBoolean( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java index f3d519fc4dcd..273af2e86a03 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java @@ -280,8 +280,8 @@ public int getPipeWalPinMaxLogIntervalRounds() { return COMMON_CONFIG.getPipeWalPinMaxLogIntervalRounds(); } - public int getPipeDegradeMaxLogIntervalRounds() { - return COMMON_CONFIG.getPipeDegradeMaxLogIntervalRounds(); + public int getPipeDegradeLogIntervalRounds() { + return COMMON_CONFIG.getPipeDegradeLogIntervalRounds(); } /////////////////////////////// Memory /////////////////////////////// @@ -458,7 +458,7 @@ public void printAllConfigs() { LOGGER.info("PipeTsFilePinMaxLogIntervalRounds: {}", getPipeTsFilePinMaxLogIntervalRounds()); LOGGER.info("PipeWalPinMaxLogNumPerRound: {}", getPipeWalPinMaxLogNumPerRound()); LOGGER.info("PipeWalPinMaxLogIntervalRounds: {}", getPipeWalPinMaxLogIntervalRounds()); - LOGGER.info("PipeDegradeMaxLogIntervalRounds: {}", getPipeDegradeMaxLogIntervalRounds()); + LOGGER.info("PipeDegradeMaxLogIntervalRounds: {}", getPipeDegradeLogIntervalRounds()); LOGGER.info("PipeMemoryManagementEnabled: {}", getPipeMemoryManagementEnabled()); LOGGER.info("PipeMemoryAllocateMaxRetries: {}", getPipeMemoryAllocateMaxRetries());