diff --git a/packages/cli/src/concurrency/__tests__/concurrency-control.service.test.ts b/packages/cli/src/concurrency/__tests__/concurrency-control.service.test.ts index 6511ae4d035a0..a20c1769da323 100644 --- a/packages/cli/src/concurrency/__tests__/concurrency-control.service.test.ts +++ b/packages/cli/src/concurrency/__tests__/concurrency-control.service.test.ts @@ -1,6 +1,7 @@ import { mock } from 'jest-mock-extended'; import type { WorkflowExecuteMode as ExecutionMode } from 'n8n-workflow'; +import type { ConcurrencyQueueType } from '@/concurrency/concurrency-control.service'; import { CLOUD_TEMP_PRODUCTION_LIMIT, CLOUD_TEMP_REPORTABLE_THRESHOLDS, @@ -24,61 +25,71 @@ describe('ConcurrencyControlService', () => { afterEach(() => { config.set('executions.concurrency.productionLimit', -1); + config.set('executions.concurrency.evaluationLimit', -1); config.set('executions.mode', 'integrated'); jest.clearAllMocks(); }); describe('constructor', () => { - it('should be enabled if production cap is positive', () => { - /** - * Arrange - */ - config.set('executions.concurrency.productionLimit', 1); - - /** - * Act - */ - const service = new ConcurrencyControlService( - logger, - executionRepository, - telemetry, - eventService, - ); - - /** - * Assert - */ - // @ts-expect-error Private property - expect(service.isEnabled).toBe(true); - // @ts-expect-error Private property - expect(service.productionQueue).toBeDefined(); - }); - - it('should throw if production cap is 0', () => { - /** - * Arrange - */ - config.set('executions.concurrency.productionLimit', 0); + it.each(['production', 'evaluation'])( + 'should be enabled if %s cap is positive', + (type: ConcurrencyQueueType) => { + /** + * Arrange + */ + config.set(`executions.concurrency.${type}Limit`, 1); - try { /** * Act */ - new ConcurrencyControlService(logger, executionRepository, telemetry, eventService); - } catch (error) { + const service = new ConcurrencyControlService( + logger, + executionRepository, + telemetry, + eventService, + ); + /** * Assert */ - expect(error).toBeInstanceOf(InvalidConcurrencyLimitError); - } - }); + // @ts-expect-error Private property + expect(service.isEnabled).toBe(true); + // @ts-expect-error Private property + expect(service.queues.get(type)).toBeDefined(); + // @ts-expect-error Private property + expect(service.queues.size).toBe(1); + }, + ); + + it.each(['production', 'evaluation'])( + 'should throw if %s cap is 0', + (type: ConcurrencyQueueType) => { + /** + * Arrange + */ + config.set(`executions.concurrency.${type}Limit`, 0); + + try { + /** + * Act + */ + new ConcurrencyControlService(logger, executionRepository, telemetry, eventService); + } catch (error) { + /** + * Assert + */ + expect(error).toBeInstanceOf(InvalidConcurrencyLimitError); + } + }, + ); - it('should be disabled if production cap is -1', () => { + it('should be disabled if both production and evaluation caps are -1', () => { /** * Arrange */ config.set('executions.concurrency.productionLimit', -1); + config.set('executions.concurrency.evaluationLimit', -1); /** * Act @@ -97,28 +108,31 @@ describe('ConcurrencyControlService', () => { expect(service.isEnabled).toBe(false); }); - it('should be disabled if production cap is lower than -1', () => { - /** - * Arrange - */ - config.set('executions.concurrency.productionLimit', -2); + it.each(['production', 'evaluation'])( + 'should be disabled if %s cap is lower than -1', + (type: ConcurrencyQueueType) => { + /** + * Arrange + */ + config.set(`executions.concurrency.${type}Limit`, -2); - /** - * Act - */ - const service = new ConcurrencyControlService( - logger, - executionRepository, - telemetry, - eventService, - ); + /** + * Act + */ + const service = new ConcurrencyControlService( + logger, + executionRepository, + telemetry, + eventService, + ); - /** - * Act - */ - // @ts-expect-error Private property - expect(service.isEnabled).toBe(false); - }); + /** + * Act + */ + // @ts-expect-error Private property + expect(service.isEnabled).toBe(false); + }, + ); it('should be disabled on queue mode', () => { /** @@ -203,6 +217,31 @@ describe('ConcurrencyControlService', () => { */ expect(enqueueSpy).toHaveBeenCalled(); }); + + it('should enqueue on evaluation mode', async () => { + /** + * Arrange + */ + config.set('executions.concurrency.evaluationLimit', 1); + + const service = new ConcurrencyControlService( + logger, + executionRepository, + telemetry, + eventService, + ); + const enqueueSpy = jest.spyOn(ConcurrencyQueue.prototype, 'enqueue'); + + /** + * Act + */ + await service.throttle({ mode: 'evaluation', executionId: '1' }); + + /** + * Assert + */ + expect(enqueueSpy).toHaveBeenCalled(); + }); }); describe('release', () => { @@ -258,6 +297,31 @@ describe('ConcurrencyControlService', () => { */ expect(dequeueSpy).toHaveBeenCalled(); }); + + it('should dequeue on evaluation mode', () => { + /** + * Arrange + */ + config.set('executions.concurrency.evaluationLimit', 1); + + const service = new ConcurrencyControlService( + logger, + executionRepository, + telemetry, + eventService, + ); + const dequeueSpy = jest.spyOn(ConcurrencyQueue.prototype, 'dequeue'); + + /** + * Act + */ + service.release({ mode: 'evaluation' }); + + /** + * Assert + */ + expect(dequeueSpy).toHaveBeenCalled(); + }); }); describe('remove', () => { @@ -316,43 +380,125 @@ describe('ConcurrencyControlService', () => { expect(removeSpy).toHaveBeenCalled(); }, ); + + it('should remove an execution on evaluation mode', () => { + /** + * Arrange + */ + config.set('executions.concurrency.evaluationLimit', 1); + + const service = new ConcurrencyControlService( + logger, + executionRepository, + telemetry, + eventService, + ); + const removeSpy = jest.spyOn(ConcurrencyQueue.prototype, 'remove'); + + /** + * Act + */ + service.remove({ mode: 'evaluation', executionId: '1' }); + + /** + * Assert + */ + expect(removeSpy).toHaveBeenCalled(); + }); }); describe('removeAll', () => { - it('should remove all executions from the production queue', async () => { + it.each(['production', 'evaluation'])( + 'should remove all executions from the %s queue', + async (type: ConcurrencyQueueType) => { + /** + * Arrange + */ + config.set(`executions.concurrency.${type}Limit`, 2); + + const service = new ConcurrencyControlService( + logger, + executionRepository, + telemetry, + eventService, + ); + + jest + .spyOn(ConcurrencyQueue.prototype, 'getAll') + .mockReturnValueOnce(new Set(['1', '2', '3'])); + + const removeSpy = jest.spyOn(ConcurrencyQueue.prototype, 'remove'); + + /** + * Act + */ + await service.removeAll({ + '1': mock(), + '2': mock(), + '3': mock(), + }); + + /** + * Assert + */ + expect(removeSpy).toHaveBeenNthCalledWith(1, '1'); + expect(removeSpy).toHaveBeenNthCalledWith(2, '2'); + expect(removeSpy).toHaveBeenNthCalledWith(3, '3'); + }, + ); + }); + + describe('get queue', () => { + it('should choose the production queue', async () => { /** * Arrange */ config.set('executions.concurrency.productionLimit', 2); + config.set('executions.concurrency.evaluationLimit', 2); + /** + * Act + */ const service = new ConcurrencyControlService( logger, executionRepository, telemetry, eventService, ); + // @ts-expect-error Private property + const queue = service.getQueue('webhook'); - jest - .spyOn(ConcurrencyQueue.prototype, 'getAll') - .mockReturnValueOnce(new Set(['1', '2', '3'])); + /** + * Assert + */ + // @ts-expect-error Private property + expect(queue).toEqual(service.queues.get('production')); + }); - const removeSpy = jest.spyOn(ConcurrencyQueue.prototype, 'remove'); + it('should choose the evaluation queue', async () => { + /** + * Arrange + */ + config.set('executions.concurrency.productionLimit', 2); + config.set('executions.concurrency.evaluationLimit', 2); /** * Act */ - await service.removeAll({ - '1': mock(), - '2': mock(), - '3': mock(), - }); + const service = new ConcurrencyControlService( + logger, + executionRepository, + telemetry, + eventService, + ); + // @ts-expect-error Private property + const queue = service.getQueue('evaluation'); /** * Assert */ - expect(removeSpy).toHaveBeenNthCalledWith(1, '1'); - expect(removeSpy).toHaveBeenNthCalledWith(2, '2'); - expect(removeSpy).toHaveBeenNthCalledWith(3, '3'); + // @ts-expect-error Private property + expect(queue).toEqual(service.queues.get('evaluation')); }); }); }); @@ -388,6 +534,32 @@ describe('ConcurrencyControlService', () => { */ expect(enqueueSpy).not.toHaveBeenCalled(); }); + + it('should do nothing for evaluation executions', async () => { + /** + * Arrange + */ + config.set('executions.concurrency.evaluationLimit', -1); + + const service = new ConcurrencyControlService( + logger, + executionRepository, + telemetry, + eventService, + ); + const enqueueSpy = jest.spyOn(ConcurrencyQueue.prototype, 'enqueue'); + + /** + * Act + */ + await service.throttle({ mode: 'evaluation', executionId: '1' }); + await service.throttle({ mode: 'evaluation', executionId: '2' }); + + /** + * Assert + */ + expect(enqueueSpy).not.toHaveBeenCalled(); + }); }); describe('release', () => { @@ -415,6 +587,31 @@ describe('ConcurrencyControlService', () => { */ expect(dequeueSpy).not.toHaveBeenCalled(); }); + + it('should do nothing for evaluation executions', () => { + /** + * Arrange + */ + config.set('executions.concurrency.evaluationLimit', -1); + + const service = new ConcurrencyControlService( + logger, + executionRepository, + telemetry, + eventService, + ); + const dequeueSpy = jest.spyOn(ConcurrencyQueue.prototype, 'dequeue'); + + /** + * Act + */ + service.release({ mode: 'evaluation' }); + + /** + * Assert + */ + expect(dequeueSpy).not.toHaveBeenCalled(); + }); }); describe('remove', () => { @@ -442,6 +639,31 @@ describe('ConcurrencyControlService', () => { */ expect(removeSpy).not.toHaveBeenCalled(); }); + + it('should do nothing for evaluation executions', () => { + /** + * Arrange + */ + config.set('executions.concurrency.evaluationLimit', -1); + + const service = new ConcurrencyControlService( + logger, + executionRepository, + telemetry, + eventService, + ); + const removeSpy = jest.spyOn(ConcurrencyQueue.prototype, 'remove'); + + /** + * Act + */ + service.remove({ mode: 'evaluation', executionId: '1' }); + + /** + * Assert + */ + expect(removeSpy).not.toHaveBeenCalled(); + }); }); }); @@ -470,14 +692,17 @@ describe('ConcurrencyControlService', () => { * Act */ // @ts-expect-error Private property - service.productionQueue.emit('concurrency-check', { + service.queues.get('production').emit('concurrency-check', { capacity: CLOUD_TEMP_PRODUCTION_LIMIT - threshold, }); /** * Assert */ - expect(telemetry.track).toHaveBeenCalledWith('User hit concurrency limit', { threshold }); + expect(telemetry.track).toHaveBeenCalledWith('User hit concurrency limit', { + threshold, + concurrencyQueue: 'production', + }); }, ); @@ -500,7 +725,7 @@ describe('ConcurrencyControlService', () => { * Act */ // @ts-expect-error Private property - service.productionQueue.emit('concurrency-check', { + service.queues.get('production').emit('concurrency-check', { capacity: CLOUD_TEMP_PRODUCTION_LIMIT - threshold, }); @@ -532,7 +757,7 @@ describe('ConcurrencyControlService', () => { * Act */ // @ts-expect-error Private property - service.productionQueue.emit('concurrency-check', { + service.queues.get('production').emit('concurrency-check', { capacity: CLOUD_TEMP_PRODUCTION_LIMIT - threshold, }); diff --git a/packages/cli/src/concurrency/concurrency-control.service.ts b/packages/cli/src/concurrency/concurrency-control.service.ts index 6088d0f4c3bdd..1984148c292c3 100644 --- a/packages/cli/src/concurrency/concurrency-control.service.ts +++ b/packages/cli/src/concurrency/concurrency-control.service.ts @@ -1,4 +1,5 @@ import { Service } from '@n8n/di'; +import { capitalize } from 'lodash'; import { Logger } from 'n8n-core'; import type { WorkflowExecuteMode as ExecutionMode } from 'n8n-workflow'; @@ -15,13 +16,15 @@ import { ConcurrencyQueue } from './concurrency-queue'; export const CLOUD_TEMP_PRODUCTION_LIMIT = 999; export const CLOUD_TEMP_REPORTABLE_THRESHOLDS = [5, 10, 20, 50, 100, 200]; +export type ConcurrencyQueueType = 'production' | 'evaluation'; + @Service() export class ConcurrencyControlService { private isEnabled: boolean; - private readonly productionLimit: number; + private readonly limits: Map; - private readonly productionQueue: ConcurrencyQueue; + private readonly queues: Map; private readonly limitsToReport = CLOUD_TEMP_REPORTABLE_THRESHOLDS.map( (t) => CLOUD_TEMP_PRODUCTION_LIMIT - t, @@ -35,52 +38,74 @@ export class ConcurrencyControlService { ) { this.logger = this.logger.scoped('concurrency'); - this.productionLimit = config.getEnv('executions.concurrency.productionLimit'); + this.limits = new Map([ + ['production', config.getEnv('executions.concurrency.productionLimit')], + ['evaluation', config.getEnv('executions.concurrency.evaluationLimit')], + ]); - if (this.productionLimit === 0) { - throw new InvalidConcurrencyLimitError(this.productionLimit); - } + this.limits.forEach((limit, type) => { + if (limit === 0) { + throw new InvalidConcurrencyLimitError(limit); + } - if (this.productionLimit < -1) { - this.productionLimit = -1; - } + if (limit < -1) { + this.limits.set(type, -1); + } + }); - if (this.productionLimit === -1 || config.getEnv('executions.mode') === 'queue') { + if ( + Array.from(this.limits.values()).every((limit) => limit === -1) || + config.getEnv('executions.mode') === 'queue' + ) { this.isEnabled = false; return; } - this.productionQueue = new ConcurrencyQueue(this.productionLimit); + this.queues = new Map(); + this.limits.forEach((limit, type) => { + if (limit > 0) { + this.queues.set(type, new ConcurrencyQueue(limit)); + } + }); this.logInit(); this.isEnabled = true; - this.productionQueue.on('concurrency-check', ({ capacity }) => { - if (this.shouldReport(capacity)) { - this.telemetry.track('User hit concurrency limit', { - threshold: CLOUD_TEMP_PRODUCTION_LIMIT - capacity, - }); - } - }); - - this.productionQueue.on('execution-throttled', ({ executionId }) => { - this.logger.debug('Execution throttled', { executionId }); - this.eventService.emit('execution-throttled', { executionId }); - }); - - this.productionQueue.on('execution-released', async (executionId) => { - this.logger.debug('Execution released', { executionId }); + this.queues.forEach((queue, type) => { + queue.on('concurrency-check', ({ capacity }) => { + if (this.shouldReport(capacity)) { + this.telemetry.track('User hit concurrency limit', { + threshold: CLOUD_TEMP_PRODUCTION_LIMIT - capacity, + concurrencyQueue: type, + }); + } + }); + + queue.on('execution-throttled', ({ executionId }) => { + this.logger.debug('Execution throttled', { executionId, type }); + this.eventService.emit('execution-throttled', { executionId, type }); + }); + + queue.on('execution-released', (executionId) => { + this.logger.debug('Execution released', { executionId, type }); + }); }); } /** - * Check whether an execution is in the production queue. + * Check whether an execution is in any of the queues. */ has(executionId: string) { if (!this.isEnabled) return false; - return this.productionQueue.getAll().has(executionId); + for (const queue of this.queues.values()) { + if (queue.has(executionId)) { + return true; + } + } + + return false; } /** @@ -89,16 +114,16 @@ export class ConcurrencyControlService { async throttle({ mode, executionId }: { mode: ExecutionMode; executionId: string }) { if (!this.isEnabled || this.isUnlimited(mode)) return; - await this.productionQueue.enqueue(executionId); + await this.getQueue(mode)?.enqueue(executionId); } /** - * Release capacity back so the next execution in the production queue can proceed. + * Release capacity back so the next execution in the queue can proceed. */ release({ mode }: { mode: ExecutionMode }) { if (!this.isEnabled || this.isUnlimited(mode)) return; - this.productionQueue.dequeue(); + this.getQueue(mode)?.dequeue(); } /** @@ -107,7 +132,7 @@ export class ConcurrencyControlService { remove({ mode, executionId }: { mode: ExecutionMode; executionId: string }) { if (!this.isEnabled || this.isUnlimited(mode)) return; - this.productionQueue.remove(executionId); + this.getQueue(mode)?.remove(executionId); } /** @@ -118,11 +143,13 @@ export class ConcurrencyControlService { async removeAll(activeExecutions: { [executionId: string]: IExecutingWorkflowData }) { if (!this.isEnabled) return; - const enqueuedProductionIds = this.productionQueue.getAll(); + this.queues.forEach((queue) => { + const enqueuedExecutionIds = queue.getAll(); - for (const id of enqueuedProductionIds) { - this.productionQueue.remove(id); - } + for (const id of enqueuedExecutionIds) { + queue.remove(id); + } + }); const executionIds = Object.entries(activeExecutions) .filter(([_, execution]) => execution.status === 'new' && execution.responsePromise) @@ -146,15 +173,28 @@ export class ConcurrencyControlService { private logInit() { this.logger.debug('Enabled'); - this.logger.debug( - [ - 'Production execution concurrency is', - this.productionLimit === -1 ? 'unlimited' : 'limited to ' + this.productionLimit.toString(), - ].join(' '), - ); + this.limits.forEach((limit, type) => { + this.logger.debug( + [ + `${capitalize(type)} execution concurrency is`, + limit === -1 ? 'unlimited' : 'limited to ' + limit.toString(), + ].join(' '), + ); + }); } private isUnlimited(mode: ExecutionMode) { + return this.getQueue(mode) === undefined; + } + + private shouldReport(capacity: number) { + return config.getEnv('deployment.type') === 'cloud' && this.limitsToReport.includes(capacity); + } + + /** + * Get the concurrency queue based on the execution mode. + */ + private getQueue(mode: ExecutionMode) { if ( mode === 'error' || mode === 'integrated' || @@ -163,15 +203,13 @@ export class ConcurrencyControlService { mode === 'manual' || mode === 'retry' ) { - return true; + return undefined; } - if (mode === 'webhook' || mode === 'trigger') return this.productionLimit === -1; + if (mode === 'webhook' || mode === 'trigger') return this.queues.get('production'); - throw new UnknownExecutionModeError(mode); - } + if (mode === 'evaluation') return this.queues.get('evaluation'); - private shouldReport(capacity: number) { - return config.getEnv('deployment.type') === 'cloud' && this.limitsToReport.includes(capacity); + throw new UnknownExecutionModeError(mode); } } diff --git a/packages/cli/src/concurrency/concurrency-queue.ts b/packages/cli/src/concurrency/concurrency-queue.ts index 900018889aab8..eac9e478d106a 100644 --- a/packages/cli/src/concurrency/concurrency-queue.ts +++ b/packages/cli/src/concurrency/concurrency-queue.ts @@ -58,6 +58,10 @@ export class ConcurrencyQueue extends TypedEmitter { return new Set(this.queue.map((item) => item.executionId)); } + has(executionId: string) { + return this.queue.some((item) => item.executionId === executionId); + } + private resolveNext() { const item = this.queue.shift(); diff --git a/packages/cli/src/config/schema.ts b/packages/cli/src/config/schema.ts index 15c3a59969789..0e7f747fba3aa 100644 --- a/packages/cli/src/config/schema.ts +++ b/packages/cli/src/config/schema.ts @@ -35,6 +35,12 @@ export const schema = { default: -1, env: 'N8N_CONCURRENCY_PRODUCTION_LIMIT', }, + evaluationLimit: { + doc: 'Max evaluation executions allowed to run concurrently.', + format: Number, + default: -1, + env: 'N8N_CONCURRENCY_EVALUATION_LIMIT', + }, }, // A Workflow times out and gets canceled after this time (seconds). diff --git a/packages/cli/src/events/__tests__/log-streaming-event-relay.test.ts b/packages/cli/src/events/__tests__/log-streaming-event-relay.test.ts index 4727c8ef722fa..17295092106aa 100644 --- a/packages/cli/src/events/__tests__/log-streaming-event-relay.test.ts +++ b/packages/cli/src/events/__tests__/log-streaming-event-relay.test.ts @@ -934,6 +934,7 @@ describe('LogStreamingEventRelay', () => { it('should log on `execution-throttled` event', () => { const event: RelayEventMap['execution-throttled'] = { executionId: 'exec123456', + type: 'production', }; eventService.emit('execution-throttled', event); @@ -942,6 +943,7 @@ describe('LogStreamingEventRelay', () => { eventName: 'n8n.execution.throttled', payload: { executionId: 'exec123456', + type: 'production', }, }); }); diff --git a/packages/cli/src/events/maps/relay.event-map.ts b/packages/cli/src/events/maps/relay.event-map.ts index 0b21454f3bcdd..b174c66587404 100644 --- a/packages/cli/src/events/maps/relay.event-map.ts +++ b/packages/cli/src/events/maps/relay.event-map.ts @@ -6,6 +6,7 @@ import type { IWorkflowExecutionDataProcess, } from 'n8n-workflow'; +import type { ConcurrencyQueueType } from '@/concurrency/concurrency-control.service'; import type { AuthProviderType } from '@/databases/entities/auth-identity'; import type { GlobalRole, User } from '@/databases/entities/user'; import type { IWorkflowDb } from '@/interfaces'; @@ -337,6 +338,7 @@ export type RelayEventMap = { 'execution-throttled': { executionId: string; + type: ConcurrencyQueueType; }; 'execution-started-during-bootup': { diff --git a/packages/cli/src/events/relays/log-streaming.event-relay.ts b/packages/cli/src/events/relays/log-streaming.event-relay.ts index b048b09a8303d..76b578451dd5e 100644 --- a/packages/cli/src/events/relays/log-streaming.event-relay.ts +++ b/packages/cli/src/events/relays/log-streaming.event-relay.ts @@ -385,10 +385,10 @@ export class LogStreamingEventRelay extends EventRelay { // #region Execution - private executionThrottled({ executionId }: RelayEventMap['execution-throttled']) { + private executionThrottled({ executionId, type }: RelayEventMap['execution-throttled']) { void this.eventBus.sendExecutionEvent({ eventName: 'n8n.execution.throttled', - payload: { executionId }, + payload: { executionId, type }, }); } diff --git a/packages/editor-ui/src/components/executions/ConcurrentExecutionsHeader.vue b/packages/editor-ui/src/components/executions/ConcurrentExecutionsHeader.vue index 0cf3c220ffd35..6c4b6915078c6 100644 --- a/packages/editor-ui/src/components/executions/ConcurrentExecutionsHeader.vue +++ b/packages/editor-ui/src/components/executions/ConcurrentExecutionsHeader.vue @@ -1,6 +1,8 @@