Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): Separate concurrency limits for production and evaluation executions (no-changelog) #12387

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

134 changes: 86 additions & 48 deletions packages/cli/src/concurrency/concurrency-control.service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Service } from '@n8n/di';
import { capitalize } from 'lodash';
import { Logger } from 'n8n-core';
import type { WorkflowExecuteMode as ExecutionMode } from 'n8n-workflow';

Expand All @@ -15,13 +16,15 @@ import { ConcurrencyQueue } from './concurrency-queue';
export const CLOUD_TEMP_PRODUCTION_LIMIT = 999;
export const CLOUD_TEMP_REPORTABLE_THRESHOLDS = [5, 10, 20, 50, 100, 200];

export type ConcurrencyQueueType = 'production' | 'evaluation';

@Service()
export class ConcurrencyControlService {
private isEnabled: boolean;

private readonly productionLimit: number;
private readonly limits: Map<ConcurrencyQueueType, number>;

private readonly productionQueue: ConcurrencyQueue;
private readonly queues: Map<ConcurrencyQueueType, ConcurrencyQueue>;

private readonly limitsToReport = CLOUD_TEMP_REPORTABLE_THRESHOLDS.map(
(t) => CLOUD_TEMP_PRODUCTION_LIMIT - t,
Expand All @@ -35,52 +38,74 @@ export class ConcurrencyControlService {
) {
this.logger = this.logger.scoped('concurrency');

this.productionLimit = config.getEnv('executions.concurrency.productionLimit');
this.limits = new Map([
['production', config.getEnv('executions.concurrency.productionLimit')],
['evaluation', config.getEnv('executions.concurrency.evaluationLimit')],
]);

if (this.productionLimit === 0) {
throw new InvalidConcurrencyLimitError(this.productionLimit);
}
this.limits.forEach((limit, type) => {
if (limit === 0) {
throw new InvalidConcurrencyLimitError(limit);
}

if (this.productionLimit < -1) {
this.productionLimit = -1;
}
if (limit < -1) {
this.limits.set(type, -1);
}
});

if (this.productionLimit === -1 || config.getEnv('executions.mode') === 'queue') {
if (
Array.from(this.limits.values()).every((limit) => limit === -1) ||
config.getEnv('executions.mode') === 'queue'
) {
this.isEnabled = false;
return;
}

this.productionQueue = new ConcurrencyQueue(this.productionLimit);
this.queues = new Map();
this.limits.forEach((limit, type) => {
if (limit > 0) {
this.queues.set(type, new ConcurrencyQueue(limit));
}
});

this.logInit();

this.isEnabled = true;

this.productionQueue.on('concurrency-check', ({ capacity }) => {
if (this.shouldReport(capacity)) {
this.telemetry.track('User hit concurrency limit', {
threshold: CLOUD_TEMP_PRODUCTION_LIMIT - capacity,
});
}
});

this.productionQueue.on('execution-throttled', ({ executionId }) => {
this.logger.debug('Execution throttled', { executionId });
this.eventService.emit('execution-throttled', { executionId });
});

this.productionQueue.on('execution-released', async (executionId) => {
this.logger.debug('Execution released', { executionId });
this.queues.forEach((queue, type) => {
queue.on('concurrency-check', ({ capacity }) => {
if (this.shouldReport(capacity)) {
this.telemetry.track('User hit concurrency limit', {
threshold: CLOUD_TEMP_PRODUCTION_LIMIT - capacity,
concurrencyQueue: type,
});
}
});

queue.on('execution-throttled', ({ executionId }) => {
this.logger.debug('Execution throttled', { executionId, type });
this.eventService.emit('execution-throttled', { executionId, type });
});

queue.on('execution-released', (executionId) => {
this.logger.debug('Execution released', { executionId, type });
});
});
}

/**
* Check whether an execution is in the production queue.
* Check whether an execution is in any of the queues.
*/
has(executionId: string) {
if (!this.isEnabled) return false;

return this.productionQueue.getAll().has(executionId);
for (const queue of this.queues.values()) {
if (queue.has(executionId)) {
return true;
}
}

return false;
}

/**
Expand All @@ -89,16 +114,16 @@ export class ConcurrencyControlService {
async throttle({ mode, executionId }: { mode: ExecutionMode; executionId: string }) {
if (!this.isEnabled || this.isUnlimited(mode)) return;

await this.productionQueue.enqueue(executionId);
await this.getQueue(mode)?.enqueue(executionId);
}

/**
* Release capacity back so the next execution in the production queue can proceed.
* Release capacity back so the next execution in the queue can proceed.
*/
release({ mode }: { mode: ExecutionMode }) {
if (!this.isEnabled || this.isUnlimited(mode)) return;

this.productionQueue.dequeue();
this.getQueue(mode)?.dequeue();
}

/**
Expand All @@ -107,7 +132,7 @@ export class ConcurrencyControlService {
remove({ mode, executionId }: { mode: ExecutionMode; executionId: string }) {
if (!this.isEnabled || this.isUnlimited(mode)) return;

this.productionQueue.remove(executionId);
this.getQueue(mode)?.remove(executionId);
}

/**
Expand All @@ -118,11 +143,13 @@ export class ConcurrencyControlService {
async removeAll(activeExecutions: { [executionId: string]: IExecutingWorkflowData }) {
if (!this.isEnabled) return;

const enqueuedProductionIds = this.productionQueue.getAll();
this.queues.forEach((queue) => {
const enqueuedExecutionIds = queue.getAll();

for (const id of enqueuedProductionIds) {
this.productionQueue.remove(id);
}
for (const id of enqueuedExecutionIds) {
queue.remove(id);
}
});

const executionIds = Object.entries(activeExecutions)
.filter(([_, execution]) => execution.status === 'new' && execution.responsePromise)
Expand All @@ -146,15 +173,28 @@ export class ConcurrencyControlService {
private logInit() {
this.logger.debug('Enabled');

this.logger.debug(
[
'Production execution concurrency is',
this.productionLimit === -1 ? 'unlimited' : 'limited to ' + this.productionLimit.toString(),
].join(' '),
);
this.limits.forEach((limit, type) => {
this.logger.debug(
[
`${capitalize(type)} execution concurrency is`,
limit === -1 ? 'unlimited' : 'limited to ' + limit.toString(),
].join(' '),
);
});
}

private isUnlimited(mode: ExecutionMode) {
return this.getQueue(mode) === undefined;
}

private shouldReport(capacity: number) {
return config.getEnv('deployment.type') === 'cloud' && this.limitsToReport.includes(capacity);
}

/**
* Get the concurrency queue based on the execution mode.
*/
private getQueue(mode: ExecutionMode) {
if (
mode === 'error' ||
mode === 'integrated' ||
Expand All @@ -163,15 +203,13 @@ export class ConcurrencyControlService {
mode === 'manual' ||
mode === 'retry'
) {
return true;
return undefined;
}

if (mode === 'webhook' || mode === 'trigger') return this.productionLimit === -1;
if (mode === 'webhook' || mode === 'trigger') return this.queues.get('production');

throw new UnknownExecutionModeError(mode);
}
if (mode === 'evaluation') return this.queues.get('evaluation');

private shouldReport(capacity: number) {
return config.getEnv('deployment.type') === 'cloud' && this.limitsToReport.includes(capacity);
throw new UnknownExecutionModeError(mode);
}
}
4 changes: 4 additions & 0 deletions packages/cli/src/concurrency/concurrency-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ export class ConcurrencyQueue extends TypedEmitter<ConcurrencyEvents> {
return new Set(this.queue.map((item) => item.executionId));
}

has(executionId: string) {
return this.queue.some((item) => item.executionId === executionId);
}

private resolveNext() {
const item = this.queue.shift();

Expand Down
6 changes: 6 additions & 0 deletions packages/cli/src/config/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ export const schema = {
default: -1,
env: 'N8N_CONCURRENCY_PRODUCTION_LIMIT',
},
evaluationLimit: {
doc: 'Max evaluation executions allowed to run concurrently.',
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add the other details as well? They all apply.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was not sure about the queue mode here, as the concurrency control service is disabled when n8n is being run in 'queue' mode. Isn't some different way to limit concurrency applies in queue mode for workers?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In queue mode, concurrency control uses the exact same env but is delegated to Bull.

format: Number,
default: -1,
env: 'N8N_CONCURRENCY_EVALUATION_LIMIT',
burivuhster marked this conversation as resolved.
Show resolved Hide resolved
},
},

// A Workflow times out and gets canceled after this time (seconds).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -934,6 +934,7 @@ describe('LogStreamingEventRelay', () => {
it('should log on `execution-throttled` event', () => {
const event: RelayEventMap['execution-throttled'] = {
executionId: 'exec123456',
type: 'production',
};

eventService.emit('execution-throttled', event);
Expand All @@ -942,6 +943,7 @@ describe('LogStreamingEventRelay', () => {
eventName: 'n8n.execution.throttled',
payload: {
executionId: 'exec123456',
type: 'production',
},
});
});
Expand Down
2 changes: 2 additions & 0 deletions packages/cli/src/events/maps/relay.event-map.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import type {
IWorkflowExecutionDataProcess,
} from 'n8n-workflow';

import type { ConcurrencyQueueType } from '@/concurrency/concurrency-control.service';
import type { AuthProviderType } from '@/databases/entities/auth-identity';
import type { GlobalRole, User } from '@/databases/entities/user';
import type { IWorkflowDb } from '@/interfaces';
Expand Down Expand Up @@ -337,6 +338,7 @@ export type RelayEventMap = {

'execution-throttled': {
executionId: string;
type: ConcurrencyQueueType;
};

'execution-started-during-bootup': {
Expand Down
4 changes: 2 additions & 2 deletions packages/cli/src/events/relays/log-streaming.event-relay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -385,10 +385,10 @@ export class LogStreamingEventRelay extends EventRelay {

// #region Execution

private executionThrottled({ executionId }: RelayEventMap['execution-throttled']) {
private executionThrottled({ executionId, type }: RelayEventMap['execution-throttled']) {
void this.eventBus.sendExecutionEvent({
eventName: 'n8n.execution.throttled',
payload: { executionId },
payload: { executionId, type },
});
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
<script lang="ts" setup>
import { computed } from 'vue';
import { useI18n } from '@/composables/useI18n';
import { WORKFLOW_EVALUATION_EXPERIMENT } from '@/constants';
import { usePostHog } from '@/stores/posthog.store';

const props = defineProps<{
runningExecutionsCount: number;
Expand All @@ -14,14 +16,22 @@ const emit = defineEmits<{

const i18n = useI18n();

const tooltipText = computed(() =>
i18n.baseText('executionsList.activeExecutions.tooltip', {
const posthogStore = usePostHog();

const tooltipText = computed(() => {
let text = i18n.baseText('executionsList.activeExecutions.tooltip', {
interpolate: {
running: props.runningExecutionsCount,
cap: props.concurrencyCap,
},
}),
);
});

if (posthogStore.isFeatureEnabled(WORKFLOW_EVALUATION_EXPERIMENT)) {
text += '\n' + i18n.baseText('executionsList.activeExecutions.evaluationNote');
}

return text;
});

const headerText = computed(() => {
if (props.runningExecutionsCount === 0) {
Expand All @@ -38,6 +48,7 @@ const headerText = computed(() => {

<template>
<div data-test-id="concurrent-executions-header">
<n8n-text>{{ headerText }}</n8n-text>
<n8n-tooltip>
<template #content>
<div :class="$style.tooltip">
Expand All @@ -60,9 +71,8 @@ const headerText = computed(() => {
>
</div>
</template>
<font-awesome-icon icon="info-circle" class="mr-2xs" />
<font-awesome-icon icon="info-circle" class="ml-2xs" />
</n8n-tooltip>
<n8n-text>{{ headerText }}</n8n-text>
</div>
</template>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,15 @@ const isAnnotationEnabled = computed(
() => settingsStore.isEnterpriseFeatureEnabled[EnterpriseEditionFeature.AdvancedExecutionFilters],
);

/**
* Calculate the number of executions counted towards the production executions concurrency limit.
* Evaluation executions are not counted towards this limit and the evaluation limit isn't shown in the UI.
*/
const runningExecutionsCount = computed(() => {
return props.executions.filter((execution) => execution.status === 'running').length;
return props.executions.filter(
(execution) =>
execution.status === 'running' && ['webhook', 'trigger'].includes(execution.mode),
ivov marked this conversation as resolved.
Show resolved Hide resolved
).length;
});

watch(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,15 @@ const executionListRef = ref<HTMLElement | null>(null);

const workflowPermissions = computed(() => getResourcePermissions(props.workflow?.scopes).workflow);

/**
* Calculate the number of executions counted towards the production executions concurrency limit.
* Evaluation executions are not counted towards this limit and the evaluation limit isn't shown in the UI.
*/
const runningExecutionsCount = computed(() => {
return props.executions.filter((execution) => execution.status === 'running').length;
return props.executions.filter(
(execution) =>
execution.status === 'running' && ['webhook', 'trigger'].includes(execution.mode),
).length;
});

watch(
Expand Down
1 change: 1 addition & 0 deletions packages/editor-ui/src/plugins/i18n/locales/en.json
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,7 @@
"executionsList.activeExecutions.none": "No active executions",
"executionsList.activeExecutions.header": "{running}/{cap} active executions",
"executionsList.activeExecutions.tooltip": "Current active executions: {running} out of {cap}. This instance is limited to {cap} concurrent production executions.",
"executionsList.activeExecutions.evaluationNote": "Evaluation runs appear in the list of executions but do not count towards your execution concurrency.",
ivov marked this conversation as resolved.
Show resolved Hide resolved
"executionsList.allWorkflows": "All Workflows",
"executionsList.anyStatus": "Any Status",
"executionsList.autoRefresh": "Auto refresh",
Expand Down
Loading