Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipe: Added log for hybrid extractor degrade #14612

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
Expand All @@ -41,6 +40,7 @@
import org.slf4j.LoggerFactory;

import java.util.Objects;
import java.util.function.Consumer;

public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegionExtractor {

Expand Down Expand Up @@ -222,42 +222,131 @@ private boolean canNotUseTabletAnyMore() {
}

private boolean isPipeTaskCurrentlyRestarted() {
return PipeDataNodeAgent.task().isPipeTaskCurrentlyRestarted(pipeName);
if (PipeDataNodeAgent.task().isPipeTaskCurrentlyRestarted(pipeName)) {
logByLogManager(
l -> l.info("{} can not use tablet anymore because it's currently restarted.", pipeName));
return true;
}
return false;
}

private boolean mayWalSizeReachThrottleThreshold() {
return 3 * WALManager.getInstance().getTotalDiskUsage()
> IoTDBDescriptor.getInstance().getConfig().getThrottleThreshold();
final long walDiskUsage = WALManager.getInstance().getTotalDiskUsage();
final long walThrottleThreshold = WALManager.getInstance().getThrottleThreshold();
if (3 * walDiskUsage > walThrottleThreshold) {
logByLogManager(
l ->
l.info(
"{} can not use tablet anymore because the total WAL disk usage {} is larger than the 1/3 of the WAL throttle threshold {}.",
pipeName,
walDiskUsage,
walThrottleThreshold));
return true;
}
return false;
}

private boolean mayMemTablePinnedCountReachDangerousThreshold() {
return PipeDataNodeResourceManager.wal().getPinnedWalCount()
>= PipeConfig.getInstance().getPipeMaxAllowedPinnedMemTableCount();
final int pinnedWalCount = PipeDataNodeResourceManager.wal().getPinnedWalCount();
final int pipeMaxAllowedPinnedMemTableCount =
PipeConfig.getInstance().getPipeMaxAllowedPinnedMemTableCount();
if (pinnedWalCount >= pipeMaxAllowedPinnedMemTableCount) {
logByLogManager(
l ->
l.info(
"{} can not use tablet anymore because the pinned wal count {} is larger than the pipe max allowed pinned wal count {}.",
pipeName,
pinnedWalCount,
pipeMaxAllowedPinnedMemTableCount));
return true;
}
return false;
}

private boolean isHistoricalTsFileEventCountExceededLimit() {
final IoTDBDataRegionExtractor extractor =
PipeDataRegionExtractorMetrics.getInstance().getExtractorMap().get(getTaskID());
return Objects.nonNull(extractor)
&& extractor.getHistoricalTsFileInsertionEventCount()
>= PipeConfig.getInstance().getPipeMaxAllowedHistoricalTsFilePerDataRegion();
if (Objects.isNull(extractor)) {
return false;
}
final int historicalTsFileInsertionEventCount =
extractor.getHistoricalTsFileInsertionEventCount();
final int pipeMaxAllowedHistoricalTsFilePerDataRegion =
PipeConfig.getInstance().getPipeMaxAllowedHistoricalTsFilePerDataRegion();
if (historicalTsFileInsertionEventCount >= pipeMaxAllowedHistoricalTsFilePerDataRegion) {
logByLogManager(
l ->
l.info(
"{} can not use tablet anymore because the historical tsFile event count {} is larger than the pipe max allowed historical tsFile count per data region {}.",
pipeName,
historicalTsFileInsertionEventCount,
pipeMaxAllowedHistoricalTsFilePerDataRegion));
return true;
}
return false;
}

private boolean isRealtimeTsFileEventCountExceededLimit() {
return pendingQueue.getTsFileInsertionEventCount()
>= PipeConfig.getInstance().getPipeMaxAllowedPendingTsFileEpochPerDataRegion();
final int tsFileInsertionEventCount = pendingQueue.getTsFileInsertionEventCount();
final int pipeMaxAllowedPendingTsFileEpochPerDataRegion =
PipeConfig.getInstance().getPipeMaxAllowedPendingTsFileEpochPerDataRegion();
if (tsFileInsertionEventCount >= pipeMaxAllowedPendingTsFileEpochPerDataRegion) {
logByLogManager(
l ->
l.info(
"{} can not use tablet anymore because the realtime tsFile event count {} is larger than the pipe max allowed pending tsfile epoch count {}.",
pipeName,
tsFileInsertionEventCount,
pipeMaxAllowedPendingTsFileEpochPerDataRegion));
return true;
}
return false;
}

private boolean mayTsFileLinkedCountReachDangerousThreshold() {
return PipeDataNodeResourceManager.tsfile().getLinkedTsfileCount()
>= PipeConfig.getInstance().getPipeMaxAllowedLinkedTsFileCount();
final int linkedTsFileCount = PipeDataNodeResourceManager.tsfile().getLinkedTsFileCount();
final long pipeMaxAllowedLinkedTsFileCount =
PipeConfig.getInstance().getPipeMaxAllowedLinkedTsFileCount();
if (linkedTsFileCount >= pipeMaxAllowedLinkedTsFileCount) {
logByLogManager(
l ->
l.info(
"{} can not use tablet anymore because the linked tsFile count {} is larger than the pipe max allowed linked tsfile count {}.",
pipeName,
linkedTsFileCount,
pipeMaxAllowedLinkedTsFileCount));
return true;
}
return false;
}

private boolean mayInsertNodeMemoryReachDangerousThreshold() {
return 3
* PipeDataNodeAgent.task().getFloatingMemoryUsageInByte(pipeName)
* PipeDataNodeAgent.task().getPipeCount()
>= 2 * PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes();
final long floatingMemoryUsage =
PipeDataNodeAgent.task().getFloatingMemoryUsageInByte(pipeName);
final int pipeCount = PipeDataNodeAgent.task().getPipeCount();
final long freeMemoryInBytes = PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes();
if (3 * floatingMemoryUsage * pipeCount >= 2 * freeMemoryInBytes) {
logByLogManager(
l ->
l.info(
"{} can not use tablet anymore because pipeCount({}) * floatingMemoryUsage of this pipe ({}) is larger than 2/3 if the pipe max allowed free memory in bytes {}.",
pipeName,
pipeCount,
floatingMemoryUsage,
freeMemoryInBytes));
return true;
}
return false;
}

private void logByLogManager(final Consumer<Logger> infoFunction) {
PipeDataNodeResourceManager.log()
.schedule(
PipeRealtimeDataRegionHybridExtractor.class,
Integer.MAX_VALUE,
PipeConfig.getInstance().getPipeDegradeLogIntervalRounds(),
1)
.ifPresent(infoFunction);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void bindTo(final AbstractMetricService metricService) {
Metric.PIPE_LINKED_TSFILE_COUNT.toString(),
MetricLevel.IMPORTANT,
PipeDataNodeResourceManager.tsfile(),
PipeTsFileResourceManager::getLinkedTsfileCount);
PipeTsFileResourceManager::getLinkedTsFileCount);
// phantom reference count
metricService.createAutoGauge(
Metric.PIPE_PHANTOM_REFERENCE_COUNT.toString(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ public void unpinTsFileResource(final TsFileResource resource) throws IOExceptio
}
}

public int getLinkedTsfileCount() {
public int getLinkedTsFileCount() {
return hardlinkOrCopiedFileToPipeTsFileResourceMap.size();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ public class CommonConfig {
private int pipeTsFilePinMaxLogIntervalRounds = 90;
private int pipeWalPinMaxLogNumPerRound = 10;
private int pipeWalPinMaxLogIntervalRounds = 90;
private int pipeDegradeLogIntervalRounds = 100;

private boolean pipeMemoryManagementEnabled = true;
private long pipeMemoryAllocateRetryIntervalMs = 1000;
Expand Down Expand Up @@ -1090,6 +1091,14 @@ public void setPipeWalPinMaxLogIntervalRounds(int pipeWalPinMaxLogIntervalRounds
this.pipeWalPinMaxLogIntervalRounds = pipeWalPinMaxLogIntervalRounds;
}

public int getPipeDegradeLogIntervalRounds() {
return pipeDegradeLogIntervalRounds;
}

public void setPipeDegradeLogIntervalRounds(int pipeDegradeLogIntervalRounds) {
this.pipeDegradeLogIntervalRounds = pipeDegradeLogIntervalRounds;
}

public boolean getPipeMemoryManagementEnabled() {
return pipeMemoryManagementEnabled;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,11 @@ private void loadPipeProps(TrimProperties properties) {
properties.getProperty(
"pipe_wal_pin_max_log_interval_rounds",
String.valueOf(config.getPipeWalPinMaxLogIntervalRounds()))));
config.setPipeDegradeLogIntervalRounds(
Integer.parseInt(
properties.getProperty(
"pipe_degrade_log_interval_rounds",
String.valueOf(config.getPipeDegradeLogIntervalRounds()))));

config.setPipeMemoryManagementEnabled(
Boolean.parseBoolean(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,10 @@ public int getPipeWalPinMaxLogIntervalRounds() {
return COMMON_CONFIG.getPipeWalPinMaxLogIntervalRounds();
}

public int getPipeDegradeLogIntervalRounds() {
return COMMON_CONFIG.getPipeDegradeLogIntervalRounds();
}

/////////////////////////////// Memory ///////////////////////////////

public boolean getPipeMemoryManagementEnabled() {
Expand Down Expand Up @@ -454,6 +458,7 @@ public void printAllConfigs() {
LOGGER.info("PipeTsFilePinMaxLogIntervalRounds: {}", getPipeTsFilePinMaxLogIntervalRounds());
LOGGER.info("PipeWalPinMaxLogNumPerRound: {}", getPipeWalPinMaxLogNumPerRound());
LOGGER.info("PipeWalPinMaxLogIntervalRounds: {}", getPipeWalPinMaxLogIntervalRounds());
LOGGER.info("PipeDegradeMaxLogIntervalRounds: {}", getPipeDegradeLogIntervalRounds());

LOGGER.info("PipeMemoryManagementEnabled: {}", getPipeMemoryManagementEnabled());
LOGGER.info("PipeMemoryAllocateMaxRetries: {}", getPipeMemoryAllocateMaxRetries());
Expand Down
Loading