From 1caf41a34a274ede091e7dc8c4621f8f7982b53d Mon Sep 17 00:00:00 2001 From: Eugene Molodkin Date: Tue, 24 Dec 2024 14:44:57 +0100 Subject: [PATCH 01/12] wip: refactor concurrency control service --- .../concurrency-control.service.ts | 66 +++++++++++++++++-- packages/cli/src/config/schema.ts | 6 ++ 2 files changed, 68 insertions(+), 4 deletions(-) diff --git a/packages/cli/src/concurrency/concurrency-control.service.ts b/packages/cli/src/concurrency/concurrency-control.service.ts index ede6cf899748b..1aced615a8669 100644 --- a/packages/cli/src/concurrency/concurrency-control.service.ts +++ b/packages/cli/src/concurrency/concurrency-control.service.ts @@ -19,10 +19,18 @@ export const CLOUD_TEMP_REPORTABLE_THRESHOLDS = [5, 10, 20, 50, 100, 200]; export class ConcurrencyControlService { private isEnabled: boolean; + // private readonly limits: Map; + private readonly productionLimit: number; + private readonly evaluationLimit: number; + + // private readonly queues: Map; + private readonly productionQueue: ConcurrencyQueue; + private readonly evaluationQueue: ConcurrencyQueue; + private readonly limitsToReport = CLOUD_TEMP_REPORTABLE_THRESHOLDS.map( (t) => CLOUD_TEMP_PRODUCTION_LIMIT - t, ); @@ -37,14 +45,24 @@ export class ConcurrencyControlService { this.productionLimit = config.getEnv('executions.concurrency.productionLimit'); + this.evaluationLimit = config.getEnv('executions.concurrency.evaluationLimit'); + if (this.productionLimit === 0) { throw new InvalidConcurrencyLimitError(this.productionLimit); } + if (this.evaluationLimit === 0) { + throw new InvalidConcurrencyLimitError(this.evaluationLimit); + } + if (this.productionLimit < -1) { this.productionLimit = -1; } + if (this.evaluationLimit < -1) { + this.evaluationLimit = -1; + } + if (this.productionLimit === -1 || config.getEnv('executions.mode') === 'queue') { this.isEnabled = false; return; @@ -52,6 +70,8 @@ export class ConcurrencyControlService { this.productionQueue = new ConcurrencyQueue(this.productionLimit); + this.evaluationQueue = new ConcurrencyQueue(this.evaluationLimit); + this.logInit(); this.isEnabled = true; @@ -80,7 +100,10 @@ export class ConcurrencyControlService { has(executionId: string) { if (!this.isEnabled) return false; - return this.productionQueue.getAll().has(executionId); + return ( + this.productionQueue.getAll().has(executionId) || + this.evaluationQueue.getAll().has(executionId) + ); } /** @@ -89,7 +112,11 @@ export class ConcurrencyControlService { async throttle({ mode, executionId }: { mode: ExecutionMode; executionId: string }) { if (!this.isEnabled || this.isUnlimited(mode)) return; - await this.productionQueue.enqueue(executionId); + if (mode === 'evaluation') { + await this.evaluationQueue.enqueue(executionId); + } else { + await this.productionQueue.enqueue(executionId); + } } /** @@ -98,7 +125,11 @@ export class ConcurrencyControlService { release({ mode }: { mode: ExecutionMode }) { if (!this.isEnabled || this.isUnlimited(mode)) return; - this.productionQueue.dequeue(); + if (mode === 'evaluation') { + this.evaluationQueue.dequeue(); + } else { + this.productionQueue.dequeue(); + } } /** @@ -107,7 +138,11 @@ export class ConcurrencyControlService { remove({ mode, executionId }: { mode: ExecutionMode; executionId: string }) { if (!this.isEnabled || this.isUnlimited(mode)) return; - this.productionQueue.remove(executionId); + if (mode === 'evaluation') { + this.evaluationQueue.remove(executionId); + } else { + this.productionQueue.remove(executionId); + } } /** @@ -124,6 +159,12 @@ export class ConcurrencyControlService { this.productionQueue.remove(id); } + const enqueuedEvaluationIds = this.evaluationQueue.getAll(); + + for (const id of enqueuedEvaluationIds) { + this.evaluationQueue.remove(id); + } + const executionIds = Object.entries(activeExecutions) .filter(([_, execution]) => execution.status === 'new' && execution.responsePromise) .map(([executionId, _]) => executionId); @@ -152,6 +193,13 @@ export class ConcurrencyControlService { this.productionLimit === -1 ? 'unlimited' : 'limited to ' + this.productionLimit.toString(), ].join(' '), ); + + this.logger.debug( + [ + 'Evaluation execution concurrency is', + this.productionLimit === -1 ? 'unlimited' : 'limited to ' + this.evaluationLimit.toString(), + ].join(' '), + ); } private isUnlimited(mode: ExecutionMode) { @@ -168,10 +216,20 @@ export class ConcurrencyControlService { if (mode === 'webhook' || mode === 'trigger') return this.productionLimit === -1; + if (mode === 'evaluation') return this.evaluationLimit === -1; + throw new UnknownExecutionModeError(mode); } private shouldReport(capacity: number) { return config.getEnv('deployment.type') === 'cloud' && this.limitsToReport.includes(capacity); } + + // private getQueue(mode: ExecutionMode) { + // if (['production', 'evaluation'].includes(mode)) { + // return this.queues.get(mode); + // } + // + // throw new UnknownExecutionModeError(mode); + // } } diff --git a/packages/cli/src/config/schema.ts b/packages/cli/src/config/schema.ts index e8d28cb782e7c..20d03f07d58b1 100644 --- a/packages/cli/src/config/schema.ts +++ b/packages/cli/src/config/schema.ts @@ -35,6 +35,12 @@ export const schema = { default: -1, env: 'N8N_CONCURRENCY_PRODUCTION_LIMIT', }, + evaluationLimit: { + doc: 'Max evaluation executions allowed to run concurrently. Default is `1`.', + format: Number, + default: 1, + env: 'N8N_CONCURRENCY_EVALUATION_LIMIT', + }, }, // A Workflow times out and gets canceled after this time (seconds). From 3d63aa11a556679f3d995556e1d05c15bec50e68 Mon Sep 17 00:00:00 2001 From: Eugene Molodkin Date: Fri, 27 Dec 2024 12:14:31 +0100 Subject: [PATCH 02/12] wip: Add logging --- .../cli/src/concurrency/concurrency-control.service.ts | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/packages/cli/src/concurrency/concurrency-control.service.ts b/packages/cli/src/concurrency/concurrency-control.service.ts index 1aced615a8669..35f219bd7df7e 100644 --- a/packages/cli/src/concurrency/concurrency-control.service.ts +++ b/packages/cli/src/concurrency/concurrency-control.service.ts @@ -92,6 +92,15 @@ export class ConcurrencyControlService { this.productionQueue.on('execution-released', async (executionId) => { this.logger.debug('Execution released', { executionId }); }); + + this.evaluationQueue.on('execution-throttled', ({ executionId }) => { + this.logger.debug('Evaluation execution throttled', { executionId }); + this.eventService.emit('execution-throttled', { executionId }); + }); + + this.evaluationQueue.on('execution-released', async (executionId) => { + this.logger.debug('Evaluation execution released', { executionId }); + }); } /** From 65763573767d4f22a930da52f963918a37ddb9e8 Mon Sep 17 00:00:00 2001 From: Eugene Molodkin Date: Fri, 27 Dec 2024 14:51:34 +0100 Subject: [PATCH 03/12] wip: Refactor concurrency control service to support multiple types of concurrency limits --- .../concurrency-control.service.test.ts | 371 ++++++++++++++---- .../concurrency-control.service.ts | 199 +++++----- packages/cli/src/config/schema.ts | 2 +- .../cli/src/events/maps/relay.event-map.ts | 2 + 4 files changed, 388 insertions(+), 186 deletions(-) diff --git a/packages/cli/src/concurrency/__tests__/concurrency-control.service.test.ts b/packages/cli/src/concurrency/__tests__/concurrency-control.service.test.ts index 6511ae4d035a0..1b7df31473156 100644 --- a/packages/cli/src/concurrency/__tests__/concurrency-control.service.test.ts +++ b/packages/cli/src/concurrency/__tests__/concurrency-control.service.test.ts @@ -1,6 +1,7 @@ import { mock } from 'jest-mock-extended'; import type { WorkflowExecuteMode as ExecutionMode } from 'n8n-workflow'; +import type { ConcurrencyType } from '@/concurrency/concurrency-control.service'; import { CLOUD_TEMP_PRODUCTION_LIMIT, CLOUD_TEMP_REPORTABLE_THRESHOLDS, @@ -24,61 +25,69 @@ describe('ConcurrencyControlService', () => { afterEach(() => { config.set('executions.concurrency.productionLimit', -1); + config.set('executions.concurrency.evaluationLimit', -1); config.set('executions.mode', 'integrated'); jest.clearAllMocks(); }); describe('constructor', () => { - it('should be enabled if production cap is positive', () => { - /** - * Arrange - */ - config.set('executions.concurrency.productionLimit', 1); - - /** - * Act - */ - const service = new ConcurrencyControlService( - logger, - executionRepository, - telemetry, - eventService, - ); - - /** - * Assert - */ - // @ts-expect-error Private property - expect(service.isEnabled).toBe(true); - // @ts-expect-error Private property - expect(service.productionQueue).toBeDefined(); - }); - - it('should throw if production cap is 0', () => { - /** - * Arrange - */ - config.set('executions.concurrency.productionLimit', 0); + it.each(['production', 'evaluation'])( + 'should be enabled if %s cap is positive', + (type: ConcurrencyType) => { + /** + * Arrange + */ + config.set(`executions.concurrency.${type}Limit`, 1); - try { /** * Act */ - new ConcurrencyControlService(logger, executionRepository, telemetry, eventService); - } catch (error) { + const service = new ConcurrencyControlService( + logger, + executionRepository, + telemetry, + eventService, + ); + /** * Assert */ - expect(error).toBeInstanceOf(InvalidConcurrencyLimitError); - } - }); + // @ts-expect-error Private property + expect(service.isEnabled).toBe(true); + // @ts-expect-error Private property + expect(service.queues.get(type)).toBeDefined(); + }, + ); + + it.each(['production', 'evaluation'])( + 'should throw if %s cap is 0', + (type: ConcurrencyType) => { + /** + * Arrange + */ + config.set(`executions.concurrency.${type}Limit`, 0); + + try { + /** + * Act + */ + new ConcurrencyControlService(logger, executionRepository, telemetry, eventService); + } catch (error) { + /** + * Assert + */ + expect(error).toBeInstanceOf(InvalidConcurrencyLimitError); + } + }, + ); - it('should be disabled if production cap is -1', () => { + it('should be disabled if both production and evaluation caps are -1', () => { /** * Arrange */ config.set('executions.concurrency.productionLimit', -1); + config.set('executions.concurrency.evaluationLimit', -1); /** * Act @@ -97,28 +106,31 @@ describe('ConcurrencyControlService', () => { expect(service.isEnabled).toBe(false); }); - it('should be disabled if production cap is lower than -1', () => { - /** - * Arrange - */ - config.set('executions.concurrency.productionLimit', -2); + it.each(['production', 'evaluation'])( + 'should be disabled if %s cap is lower than -1', + (type: ConcurrencyType) => { + /** + * Arrange + */ + config.set(`executions.concurrency.${type}Limit`, -2); - /** - * Act - */ - const service = new ConcurrencyControlService( - logger, - executionRepository, - telemetry, - eventService, - ); + /** + * Act + */ + const service = new ConcurrencyControlService( + logger, + executionRepository, + telemetry, + eventService, + ); - /** - * Act - */ - // @ts-expect-error Private property - expect(service.isEnabled).toBe(false); - }); + /** + * Act + */ + // @ts-expect-error Private property + expect(service.isEnabled).toBe(false); + }, + ); it('should be disabled on queue mode', () => { /** @@ -203,6 +215,31 @@ describe('ConcurrencyControlService', () => { */ expect(enqueueSpy).toHaveBeenCalled(); }); + + it('should enqueue on evaluation mode', async () => { + /** + * Arrange + */ + config.set('executions.concurrency.evaluationLimit', 1); + + const service = new ConcurrencyControlService( + logger, + executionRepository, + telemetry, + eventService, + ); + const enqueueSpy = jest.spyOn(ConcurrencyQueue.prototype, 'enqueue'); + + /** + * Act + */ + await service.throttle({ mode: 'evaluation', executionId: '1' }); + + /** + * Assert + */ + expect(enqueueSpy).toHaveBeenCalled(); + }); }); describe('release', () => { @@ -258,6 +295,31 @@ describe('ConcurrencyControlService', () => { */ expect(dequeueSpy).toHaveBeenCalled(); }); + + it('should dequeue on evaluation mode', () => { + /** + * Arrange + */ + config.set('executions.concurrency.evaluationLimit', 1); + + const service = new ConcurrencyControlService( + logger, + executionRepository, + telemetry, + eventService, + ); + const dequeueSpy = jest.spyOn(ConcurrencyQueue.prototype, 'dequeue'); + + /** + * Act + */ + service.release({ mode: 'evaluation' }); + + /** + * Assert + */ + expect(dequeueSpy).toHaveBeenCalled(); + }); }); describe('remove', () => { @@ -316,43 +378,125 @@ describe('ConcurrencyControlService', () => { expect(removeSpy).toHaveBeenCalled(); }, ); + + it('should remove an execution on evaluation mode', () => { + /** + * Arrange + */ + config.set('executions.concurrency.evaluationLimit', 1); + + const service = new ConcurrencyControlService( + logger, + executionRepository, + telemetry, + eventService, + ); + const removeSpy = jest.spyOn(ConcurrencyQueue.prototype, 'remove'); + + /** + * Act + */ + service.remove({ mode: 'evaluation', executionId: '1' }); + + /** + * Assert + */ + expect(removeSpy).toHaveBeenCalled(); + }); }); describe('removeAll', () => { - it('should remove all executions from the production queue', async () => { + it.each(['production', 'evaluation'])( + 'should remove all executions from the %s queue', + async (type: ConcurrencyType) => { + /** + * Arrange + */ + config.set(`executions.concurrency.${type}Limit`, 2); + + const service = new ConcurrencyControlService( + logger, + executionRepository, + telemetry, + eventService, + ); + + jest + .spyOn(ConcurrencyQueue.prototype, 'getAll') + .mockReturnValueOnce(new Set(['1', '2', '3'])); + + const removeSpy = jest.spyOn(ConcurrencyQueue.prototype, 'remove'); + + /** + * Act + */ + await service.removeAll({ + '1': mock(), + '2': mock(), + '3': mock(), + }); + + /** + * Assert + */ + expect(removeSpy).toHaveBeenNthCalledWith(1, '1'); + expect(removeSpy).toHaveBeenNthCalledWith(2, '2'); + expect(removeSpy).toHaveBeenNthCalledWith(3, '3'); + }, + ); + }); + + describe('get queue', () => { + it('should choose the production queue', async () => { /** * Arrange */ config.set('executions.concurrency.productionLimit', 2); + config.set('executions.concurrency.evaluationLimit', 2); + /** + * Act + */ const service = new ConcurrencyControlService( logger, executionRepository, telemetry, eventService, ); + // @ts-expect-error Private property + const queue = service.getQueue('webhook'); - jest - .spyOn(ConcurrencyQueue.prototype, 'getAll') - .mockReturnValueOnce(new Set(['1', '2', '3'])); + /** + * Assert + */ + // @ts-expect-error Private property + expect(queue).toEqual(service.queues.get('production')); + }); - const removeSpy = jest.spyOn(ConcurrencyQueue.prototype, 'remove'); + it('should choose the evaluation queue', async () => { + /** + * Arrange + */ + config.set('executions.concurrency.productionLimit', 2); + config.set('executions.concurrency.evaluationLimit', 2); /** * Act */ - await service.removeAll({ - '1': mock(), - '2': mock(), - '3': mock(), - }); + const service = new ConcurrencyControlService( + logger, + executionRepository, + telemetry, + eventService, + ); + // @ts-expect-error Private property + const queue = service.getQueue('evaluation'); /** * Assert */ - expect(removeSpy).toHaveBeenNthCalledWith(1, '1'); - expect(removeSpy).toHaveBeenNthCalledWith(2, '2'); - expect(removeSpy).toHaveBeenNthCalledWith(3, '3'); + // @ts-expect-error Private property + expect(queue).toEqual(service.queues.get('evaluation')); }); }); }); @@ -388,6 +532,32 @@ describe('ConcurrencyControlService', () => { */ expect(enqueueSpy).not.toHaveBeenCalled(); }); + + it('should do nothing for evaluation executions', async () => { + /** + * Arrange + */ + config.set('executions.concurrency.evaluationLimit', -1); + + const service = new ConcurrencyControlService( + logger, + executionRepository, + telemetry, + eventService, + ); + const enqueueSpy = jest.spyOn(ConcurrencyQueue.prototype, 'enqueue'); + + /** + * Act + */ + await service.throttle({ mode: 'evaluation', executionId: '1' }); + await service.throttle({ mode: 'evaluation', executionId: '2' }); + + /** + * Assert + */ + expect(enqueueSpy).not.toHaveBeenCalled(); + }); }); describe('release', () => { @@ -415,6 +585,31 @@ describe('ConcurrencyControlService', () => { */ expect(dequeueSpy).not.toHaveBeenCalled(); }); + + it('should do nothing for evaluation executions', () => { + /** + * Arrange + */ + config.set('executions.concurrency.evaluationLimit', -1); + + const service = new ConcurrencyControlService( + logger, + executionRepository, + telemetry, + eventService, + ); + const dequeueSpy = jest.spyOn(ConcurrencyQueue.prototype, 'dequeue'); + + /** + * Act + */ + service.release({ mode: 'evaluation' }); + + /** + * Assert + */ + expect(dequeueSpy).not.toHaveBeenCalled(); + }); }); describe('remove', () => { @@ -442,6 +637,31 @@ describe('ConcurrencyControlService', () => { */ expect(removeSpy).not.toHaveBeenCalled(); }); + + it('should do nothing for evaluation executions', () => { + /** + * Arrange + */ + config.set('executions.concurrency.evaluationLimit', -1); + + const service = new ConcurrencyControlService( + logger, + executionRepository, + telemetry, + eventService, + ); + const removeSpy = jest.spyOn(ConcurrencyQueue.prototype, 'remove'); + + /** + * Act + */ + service.remove({ mode: 'evaluation', executionId: '1' }); + + /** + * Assert + */ + expect(removeSpy).not.toHaveBeenCalled(); + }); }); }); @@ -470,14 +690,17 @@ describe('ConcurrencyControlService', () => { * Act */ // @ts-expect-error Private property - service.productionQueue.emit('concurrency-check', { + service.queues.get('production').emit('concurrency-check', { capacity: CLOUD_TEMP_PRODUCTION_LIMIT - threshold, }); /** * Assert */ - expect(telemetry.track).toHaveBeenCalledWith('User hit concurrency limit', { threshold }); + expect(telemetry.track).toHaveBeenCalledWith('User hit concurrency limit', { + threshold, + concurrencyType: 'production', + }); }, ); @@ -500,7 +723,7 @@ describe('ConcurrencyControlService', () => { * Act */ // @ts-expect-error Private property - service.productionQueue.emit('concurrency-check', { + service.queues.get('production').emit('concurrency-check', { capacity: CLOUD_TEMP_PRODUCTION_LIMIT - threshold, }); @@ -532,7 +755,7 @@ describe('ConcurrencyControlService', () => { * Act */ // @ts-expect-error Private property - service.productionQueue.emit('concurrency-check', { + service.queues.get('production').emit('concurrency-check', { capacity: CLOUD_TEMP_PRODUCTION_LIMIT - threshold, }); diff --git a/packages/cli/src/concurrency/concurrency-control.service.ts b/packages/cli/src/concurrency/concurrency-control.service.ts index 35f219bd7df7e..18991b40e96e7 100644 --- a/packages/cli/src/concurrency/concurrency-control.service.ts +++ b/packages/cli/src/concurrency/concurrency-control.service.ts @@ -1,3 +1,4 @@ +import { capitalize } from 'lodash'; import { Logger } from 'n8n-core'; import type { WorkflowExecuteMode as ExecutionMode } from 'n8n-workflow'; import { Service } from 'typedi'; @@ -15,21 +16,15 @@ import { ConcurrencyQueue } from './concurrency-queue'; export const CLOUD_TEMP_PRODUCTION_LIMIT = 999; export const CLOUD_TEMP_REPORTABLE_THRESHOLDS = [5, 10, 20, 50, 100, 200]; +export type ConcurrencyType = 'production' | 'evaluation'; + @Service() export class ConcurrencyControlService { private isEnabled: boolean; - // private readonly limits: Map; - - private readonly productionLimit: number; - - private readonly evaluationLimit: number; - - // private readonly queues: Map; + private readonly limits: Map; - private readonly productionQueue: ConcurrencyQueue; - - private readonly evaluationQueue: ConcurrencyQueue; + private readonly queues: Map; private readonly limitsToReport = CLOUD_TEMP_REPORTABLE_THRESHOLDS.map( (t) => CLOUD_TEMP_PRODUCTION_LIMIT - t, @@ -43,76 +38,66 @@ export class ConcurrencyControlService { ) { this.logger = this.logger.scoped('concurrency'); - this.productionLimit = config.getEnv('executions.concurrency.productionLimit'); - - this.evaluationLimit = config.getEnv('executions.concurrency.evaluationLimit'); + this.limits = new Map([ + ['production', config.getEnv('executions.concurrency.productionLimit')], + ['evaluation', config.getEnv('executions.concurrency.evaluationLimit')], + ]); - if (this.productionLimit === 0) { - throw new InvalidConcurrencyLimitError(this.productionLimit); - } - - if (this.evaluationLimit === 0) { - throw new InvalidConcurrencyLimitError(this.evaluationLimit); - } - - if (this.productionLimit < -1) { - this.productionLimit = -1; - } + this.limits.forEach((limit, type) => { + if (limit === 0) { + throw new InvalidConcurrencyLimitError(limit); + } - if (this.evaluationLimit < -1) { - this.evaluationLimit = -1; - } + if (limit < -1) { + this.limits.set(type, -1); + } + }); - if (this.productionLimit === -1 || config.getEnv('executions.mode') === 'queue') { + if ( + Array.from(this.limits.values()).every((limit) => limit === -1) || + config.getEnv('executions.mode') === 'queue' + ) { this.isEnabled = false; return; } - this.productionQueue = new ConcurrencyQueue(this.productionLimit); - - this.evaluationQueue = new ConcurrencyQueue(this.evaluationLimit); + this.queues = new Map(); + this.limits.forEach((limit, type) => { + this.queues.set(type, new ConcurrencyQueue(limit)); + }); this.logInit(); this.isEnabled = true; - this.productionQueue.on('concurrency-check', ({ capacity }) => { - if (this.shouldReport(capacity)) { - this.telemetry.track('User hit concurrency limit', { - threshold: CLOUD_TEMP_PRODUCTION_LIMIT - capacity, - }); - } - }); - - this.productionQueue.on('execution-throttled', ({ executionId }) => { - this.logger.debug('Execution throttled', { executionId }); - this.eventService.emit('execution-throttled', { executionId }); - }); - - this.productionQueue.on('execution-released', async (executionId) => { - this.logger.debug('Execution released', { executionId }); - }); - - this.evaluationQueue.on('execution-throttled', ({ executionId }) => { - this.logger.debug('Evaluation execution throttled', { executionId }); - this.eventService.emit('execution-throttled', { executionId }); - }); - - this.evaluationQueue.on('execution-released', async (executionId) => { - this.logger.debug('Evaluation execution released', { executionId }); + this.queues.forEach((queue, type) => { + queue.on('concurrency-check', ({ capacity }) => { + if (this.shouldReport(capacity)) { + this.telemetry.track('User hit concurrency limit', { + threshold: CLOUD_TEMP_PRODUCTION_LIMIT - capacity, + concurrencyType: type, + }); + } + }); + + queue.on('execution-throttled', ({ executionId }) => { + this.logger.debug('Execution throttled', { executionId, type }); + this.eventService.emit('execution-throttled', { executionId, type }); + }); + + queue.on('execution-released', async (executionId) => { + this.logger.debug('Execution released', { executionId, type }); + }); }); } /** - * Check whether an execution is in the production queue. + * Check whether an execution is in any of the queues. */ has(executionId: string) { if (!this.isEnabled) return false; - return ( - this.productionQueue.getAll().has(executionId) || - this.evaluationQueue.getAll().has(executionId) - ); + return Array.from(this.queues.values()).some((queue) => queue.getAll().has(executionId)); } /** @@ -121,23 +106,23 @@ export class ConcurrencyControlService { async throttle({ mode, executionId }: { mode: ExecutionMode; executionId: string }) { if (!this.isEnabled || this.isUnlimited(mode)) return; - if (mode === 'evaluation') { - await this.evaluationQueue.enqueue(executionId); - } else { - await this.productionQueue.enqueue(executionId); + const queue = this.getQueue(mode); + + if (queue) { + await queue.enqueue(executionId); } } /** - * Release capacity back so the next execution in the production queue can proceed. + * Release capacity back so the next execution in the queue can proceed. */ release({ mode }: { mode: ExecutionMode }) { if (!this.isEnabled || this.isUnlimited(mode)) return; - if (mode === 'evaluation') { - this.evaluationQueue.dequeue(); - } else { - this.productionQueue.dequeue(); + const queue = this.getQueue(mode); + + if (queue) { + queue.dequeue(); } } @@ -147,10 +132,10 @@ export class ConcurrencyControlService { remove({ mode, executionId }: { mode: ExecutionMode; executionId: string }) { if (!this.isEnabled || this.isUnlimited(mode)) return; - if (mode === 'evaluation') { - this.evaluationQueue.remove(executionId); - } else { - this.productionQueue.remove(executionId); + const queue = this.getQueue(mode); + + if (queue) { + queue.remove(executionId); } } @@ -162,17 +147,13 @@ export class ConcurrencyControlService { async removeAll(activeExecutions: { [executionId: string]: IExecutingWorkflowData }) { if (!this.isEnabled) return; - const enqueuedProductionIds = this.productionQueue.getAll(); - - for (const id of enqueuedProductionIds) { - this.productionQueue.remove(id); - } + this.queues.forEach((queue) => { + const enqueuedExecutionIds = queue.getAll(); - const enqueuedEvaluationIds = this.evaluationQueue.getAll(); - - for (const id of enqueuedEvaluationIds) { - this.evaluationQueue.remove(id); - } + for (const id of enqueuedExecutionIds) { + queue.remove(id); + } + }); const executionIds = Object.entries(activeExecutions) .filter(([_, execution]) => execution.status === 'new' && execution.responsePromise) @@ -196,22 +177,30 @@ export class ConcurrencyControlService { private logInit() { this.logger.debug('Enabled'); - this.logger.debug( - [ - 'Production execution concurrency is', - this.productionLimit === -1 ? 'unlimited' : 'limited to ' + this.productionLimit.toString(), - ].join(' '), - ); - - this.logger.debug( - [ - 'Evaluation execution concurrency is', - this.productionLimit === -1 ? 'unlimited' : 'limited to ' + this.evaluationLimit.toString(), - ].join(' '), - ); + this.limits.forEach((limit, type) => { + this.logger.debug( + [ + `${capitalize(type)} execution concurrency is`, + limit === -1 ? 'unlimited' : 'limited to ' + limit.toString(), + ].join(' '), + ); + }); } private isUnlimited(mode: ExecutionMode) { + const queue = this.getQueue(mode); + + return queue === undefined; + } + + private shouldReport(capacity: number) { + return config.getEnv('deployment.type') === 'cloud' && this.limitsToReport.includes(capacity); + } + + /** + * Get the concurrency queue based on the execution mode. + */ + private getQueue(mode: ExecutionMode) { if ( mode === 'error' || mode === 'integrated' || @@ -220,25 +209,13 @@ export class ConcurrencyControlService { mode === 'manual' || mode === 'retry' ) { - return true; + return undefined; } - if (mode === 'webhook' || mode === 'trigger') return this.productionLimit === -1; + if (mode === 'webhook' || mode === 'trigger') return this.queues.get('production'); - if (mode === 'evaluation') return this.evaluationLimit === -1; + if (mode === 'evaluation') return this.queues.get('evaluation'); throw new UnknownExecutionModeError(mode); } - - private shouldReport(capacity: number) { - return config.getEnv('deployment.type') === 'cloud' && this.limitsToReport.includes(capacity); - } - - // private getQueue(mode: ExecutionMode) { - // if (['production', 'evaluation'].includes(mode)) { - // return this.queues.get(mode); - // } - // - // throw new UnknownExecutionModeError(mode); - // } } diff --git a/packages/cli/src/config/schema.ts b/packages/cli/src/config/schema.ts index 20d03f07d58b1..cf75da716f557 100644 --- a/packages/cli/src/config/schema.ts +++ b/packages/cli/src/config/schema.ts @@ -38,7 +38,7 @@ export const schema = { evaluationLimit: { doc: 'Max evaluation executions allowed to run concurrently. Default is `1`.', format: Number, - default: 1, + default: -1, env: 'N8N_CONCURRENCY_EVALUATION_LIMIT', }, }, diff --git a/packages/cli/src/events/maps/relay.event-map.ts b/packages/cli/src/events/maps/relay.event-map.ts index 0e725645715e2..4794bdedbf553 100644 --- a/packages/cli/src/events/maps/relay.event-map.ts +++ b/packages/cli/src/events/maps/relay.event-map.ts @@ -12,6 +12,7 @@ import type { GlobalRole, User } from '@/databases/entities/user'; import type { IWorkflowDb } from '@/interfaces'; import type { AiEventMap } from './ai.event-map'; +import { ConcurrencyType } from '@/concurrency/concurrency-control.service'; export type UserLike = { id: string; @@ -338,6 +339,7 @@ export type RelayEventMap = { 'execution-throttled': { executionId: string; + type: ConcurrencyType; }; 'execution-started-during-bootup': { From 795b2961e7b77c0807611ce0540ccf7ac157b460 Mon Sep 17 00:00:00 2001 From: Eugene Molodkin Date: Fri, 27 Dec 2024 16:10:41 +0100 Subject: [PATCH 04/12] wip: Exclude evaluation executions from counting towards limit in UI --- .../components/executions/global/GlobalExecutionsList.vue | 5 ++++- .../executions/workflow/WorkflowExecutionsSidebar.vue | 5 ++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/packages/editor-ui/src/components/executions/global/GlobalExecutionsList.vue b/packages/editor-ui/src/components/executions/global/GlobalExecutionsList.vue index 01b72dde508b6..5795d22db6401 100644 --- a/packages/editor-ui/src/components/executions/global/GlobalExecutionsList.vue +++ b/packages/editor-ui/src/components/executions/global/GlobalExecutionsList.vue @@ -74,7 +74,10 @@ const isAnnotationEnabled = computed( ); const runningExecutionsCount = computed(() => { - return props.executions.filter((execution) => execution.status === 'running').length; + return props.executions.filter( + (execution) => + execution.status === 'running' && ['webhook', 'trigger'].includes(execution.mode), + ).length; }); watch( diff --git a/packages/editor-ui/src/components/executions/workflow/WorkflowExecutionsSidebar.vue b/packages/editor-ui/src/components/executions/workflow/WorkflowExecutionsSidebar.vue index b43f67708d1f4..cf2544f98d692 100644 --- a/packages/editor-ui/src/components/executions/workflow/WorkflowExecutionsSidebar.vue +++ b/packages/editor-ui/src/components/executions/workflow/WorkflowExecutionsSidebar.vue @@ -55,7 +55,10 @@ const executionListRef = ref(null); const workflowPermissions = computed(() => getResourcePermissions(props.workflow?.scopes).workflow); const runningExecutionsCount = computed(() => { - return props.executions.filter((execution) => execution.status === 'running').length; + return props.executions.filter( + (execution) => + execution.status === 'running' && ['webhook', 'trigger'].includes(execution.mode), + ).length; }); watch( From fc9be62eb86931dc95e5cd3cc520858825051c73 Mon Sep 17 00:00:00 2001 From: Eugene Molodkin Date: Fri, 27 Dec 2024 16:23:26 +0100 Subject: [PATCH 05/12] wip: fix concurrency queues initialization --- .../concurrency/__tests__/concurrency-control.service.test.ts | 2 ++ packages/cli/src/concurrency/concurrency-control.service.ts | 4 +++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/packages/cli/src/concurrency/__tests__/concurrency-control.service.test.ts b/packages/cli/src/concurrency/__tests__/concurrency-control.service.test.ts index 1b7df31473156..769d8a0132a09 100644 --- a/packages/cli/src/concurrency/__tests__/concurrency-control.service.test.ts +++ b/packages/cli/src/concurrency/__tests__/concurrency-control.service.test.ts @@ -57,6 +57,8 @@ describe('ConcurrencyControlService', () => { expect(service.isEnabled).toBe(true); // @ts-expect-error Private property expect(service.queues.get(type)).toBeDefined(); + // @ts-expect-error Private property + expect(service.queues.size).toBe(1); }, ); diff --git a/packages/cli/src/concurrency/concurrency-control.service.ts b/packages/cli/src/concurrency/concurrency-control.service.ts index 18991b40e96e7..2d58c9289ba0a 100644 --- a/packages/cli/src/concurrency/concurrency-control.service.ts +++ b/packages/cli/src/concurrency/concurrency-control.service.ts @@ -63,7 +63,9 @@ export class ConcurrencyControlService { this.queues = new Map(); this.limits.forEach((limit, type) => { - this.queues.set(type, new ConcurrencyQueue(limit)); + if (limit > 0) { + this.queues.set(type, new ConcurrencyQueue(limit)); + } }); this.logInit(); From 99e9d282ee55c30d781840d5db0ce65d206911d0 Mon Sep 17 00:00:00 2001 From: Eugene Molodkin Date: Mon, 6 Jan 2025 14:44:55 +0100 Subject: [PATCH 06/12] wip: minor change --- packages/cli/src/config/schema.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/cli/src/config/schema.ts b/packages/cli/src/config/schema.ts index cf75da716f557..ad5075797641f 100644 --- a/packages/cli/src/config/schema.ts +++ b/packages/cli/src/config/schema.ts @@ -36,7 +36,7 @@ export const schema = { env: 'N8N_CONCURRENCY_PRODUCTION_LIMIT', }, evaluationLimit: { - doc: 'Max evaluation executions allowed to run concurrently. Default is `1`.', + doc: 'Max evaluation executions allowed to run concurrently.', format: Number, default: -1, env: 'N8N_CONCURRENCY_EVALUATION_LIMIT', From ec4318a88a9563d340ef46c25612e0002b15d13d Mon Sep 17 00:00:00 2001 From: Eugene Molodkin Date: Mon, 6 Jan 2025 17:21:23 +0100 Subject: [PATCH 07/12] wip: fix log streaming type mismatch --- .../src/events/__tests__/log-streaming-event-relay.test.ts | 2 ++ packages/cli/src/events/relays/log-streaming.event-relay.ts | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/packages/cli/src/events/__tests__/log-streaming-event-relay.test.ts b/packages/cli/src/events/__tests__/log-streaming-event-relay.test.ts index 4727c8ef722fa..17295092106aa 100644 --- a/packages/cli/src/events/__tests__/log-streaming-event-relay.test.ts +++ b/packages/cli/src/events/__tests__/log-streaming-event-relay.test.ts @@ -934,6 +934,7 @@ describe('LogStreamingEventRelay', () => { it('should log on `execution-throttled` event', () => { const event: RelayEventMap['execution-throttled'] = { executionId: 'exec123456', + type: 'production', }; eventService.emit('execution-throttled', event); @@ -942,6 +943,7 @@ describe('LogStreamingEventRelay', () => { eventName: 'n8n.execution.throttled', payload: { executionId: 'exec123456', + type: 'production', }, }); }); diff --git a/packages/cli/src/events/relays/log-streaming.event-relay.ts b/packages/cli/src/events/relays/log-streaming.event-relay.ts index b048b09a8303d..76b578451dd5e 100644 --- a/packages/cli/src/events/relays/log-streaming.event-relay.ts +++ b/packages/cli/src/events/relays/log-streaming.event-relay.ts @@ -385,10 +385,10 @@ export class LogStreamingEventRelay extends EventRelay { // #region Execution - private executionThrottled({ executionId }: RelayEventMap['execution-throttled']) { + private executionThrottled({ executionId, type }: RelayEventMap['execution-throttled']) { void this.eventBus.sendExecutionEvent({ eventName: 'n8n.execution.throttled', - payload: { executionId }, + payload: { executionId, type }, }); } From 1fe16646eb2dfd60b4a6435d59cef01fe491b59f Mon Sep 17 00:00:00 2001 From: Eugene Molodkin Date: Tue, 7 Jan 2025 15:21:47 +0100 Subject: [PATCH 08/12] wip: fix formatting --- packages/cli/src/concurrency/concurrency-control.service.ts | 2 +- packages/cli/src/events/maps/relay.event-map.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/cli/src/concurrency/concurrency-control.service.ts b/packages/cli/src/concurrency/concurrency-control.service.ts index 35ac95f34be51..f5ba27e6b0f32 100644 --- a/packages/cli/src/concurrency/concurrency-control.service.ts +++ b/packages/cli/src/concurrency/concurrency-control.service.ts @@ -1,5 +1,5 @@ -import { capitalize } from 'lodash'; import { Service } from '@n8n/di'; +import { capitalize } from 'lodash'; import { Logger } from 'n8n-core'; import type { WorkflowExecuteMode as ExecutionMode } from 'n8n-workflow'; diff --git a/packages/cli/src/events/maps/relay.event-map.ts b/packages/cli/src/events/maps/relay.event-map.ts index 4794bdedbf553..3d70ef2959d5f 100644 --- a/packages/cli/src/events/maps/relay.event-map.ts +++ b/packages/cli/src/events/maps/relay.event-map.ts @@ -6,13 +6,13 @@ import type { IWorkflowExecutionDataProcess, } from 'n8n-workflow'; +import type { ConcurrencyType } from '@/concurrency/concurrency-control.service'; import type { AuthProviderType } from '@/databases/entities/auth-identity'; import type { ProjectRole } from '@/databases/entities/project-relation'; import type { GlobalRole, User } from '@/databases/entities/user'; import type { IWorkflowDb } from '@/interfaces'; import type { AiEventMap } from './ai.event-map'; -import { ConcurrencyType } from '@/concurrency/concurrency-control.service'; export type UserLike = { id: string; From ac54d349e9a1bdb07256e57cbf37adb5ccbe4b93 Mon Sep 17 00:00:00 2001 From: Eugene Molodkin Date: Wed, 8 Jan 2025 13:10:44 +0100 Subject: [PATCH 09/12] wip: added a note about evaluation executions concurrency limit --- .../executions/ConcurrentExecutionsHeader.vue | 22 ++++++++++++++----- .../src/plugins/i18n/locales/en.json | 1 + 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/packages/editor-ui/src/components/executions/ConcurrentExecutionsHeader.vue b/packages/editor-ui/src/components/executions/ConcurrentExecutionsHeader.vue index 0cf3c220ffd35..6c4b6915078c6 100644 --- a/packages/editor-ui/src/components/executions/ConcurrentExecutionsHeader.vue +++ b/packages/editor-ui/src/components/executions/ConcurrentExecutionsHeader.vue @@ -1,6 +1,8 @@