diff --git a/package.json b/package.json index 735e215..3ec77b3 100644 --- a/package.json +++ b/package.json @@ -54,7 +54,6 @@ "@semantic-release/git": "^10.0.1", "@semantic-release/github": "^11.0.1", "@semantic-release/release-notes-generator": "^11.0.7", - "@types/amqplib": "^0.10.7", "@types/jest": "^30.0.0", "@types/node": "^22.0.0", "eslint": "^9.29.0", @@ -69,7 +68,7 @@ }, "dependencies": { "ajv": "^8.17.1", - "amqplib": "^0.10.8" + "rabbitmq-client": "^5.0.0" }, "release": { "branches": [ diff --git a/src/core/RunMQ.ts b/src/core/RunMQ.ts index f752366..f7a302c 100644 --- a/src/core/RunMQ.ts +++ b/src/core/RunMQ.ts @@ -1,10 +1,9 @@ -import {RunMQProcessorConfiguration, RunMQConnectionConfig, RunMQPublisher, RunMQMessageContent} from "@src/types"; +import {RunMQProcessorConfiguration, RunMQConnectionConfig, RunMQPublisher, RunMQMessageContent, AMQPChannel} from "@src/types"; import {RunMQException} from "@src/core/exceptions/RunMQException"; -import {AmqplibClient} from "@src/core/clients/AmqplibClient"; +import {RabbitMQClientAdapter} from "@src/core/clients/RabbitMQClientAdapter"; import {Exceptions} from "@src/core/exceptions/Exceptions"; import {RunMQUtils} from "@src/core/utils/RunMQUtils"; import {Constants, DEFAULTS} from "@src/core/constants"; -import {Channel} from "amqplib"; import {RunMQConsumerCreator} from "@src/core/consumer/RunMQConsumerCreator"; import {ConsumerConfiguration} from "@src/core/consumer/ConsumerConfiguration"; import {RunMQLogger} from "@src/core/logging/RunMQLogger"; @@ -14,12 +13,12 @@ import {RabbitMQMessage} from "@src/core/message/RabbitMQMessage"; import {RabbitMQMessageProperties} from "@src/core/message/RabbitMQMessageProperties"; export class RunMQ { - private readonly amqplibClient: AmqplibClient; + private readonly client: RabbitMQClientAdapter; private readonly config: RunMQConnectionConfig; private publisher: RunMQPublisher | undefined private readonly logger: RunMQLogger private retryAttempts: number = 0; - private defaultChannel: Channel | undefined; + private defaultChannel: AMQPChannel | undefined; private constructor(config: RunMQConnectionConfig, logger: RunMQLogger) { this.logger = logger; @@ -28,7 +27,7 @@ export class RunMQ { reconnectDelay: config.reconnectDelay ?? DEFAULTS.RECONNECT_DELAY, maxReconnectAttempts: config.maxReconnectAttempts ?? DEFAULTS.MAX_RECONNECT_ATTEMPTS, }; - this.amqplibClient = new AmqplibClient(this.config); + this.client = new RabbitMQClientAdapter(this.config); } /** @@ -51,7 +50,7 @@ export class RunMQ { * @param processor The function that will process the incoming messages */ public async process>(topic: string, config: RunMQProcessorConfiguration, processor: (message: RunMQMessageContent) => Promise) { - const consumer = new RunMQConsumerCreator(this.defaultChannel!, this.amqplibClient, this.logger, this.config.management); + const consumer = new RunMQConsumerCreator(this.client, this.logger, this.config.management); await consumer.createConsumer(new ConsumerConfiguration(topic, config, processor)) } @@ -62,14 +61,14 @@ export class RunMQ { * @param correlationId (Optional) A unique identifier for correlating messages; if not provided, a new UUID will be generated */ public publish(topic: string, message: Record, correlationId: string = RunMQUtils.generateUUID()): void { - if (!this.publisher) { + if (!this.publisher || !this.defaultChannel) { throw new RunMQException(Exceptions.NOT_INITIALIZED, {}); } RunMQUtils.assertRecord(message); this.publisher.publish(topic, RabbitMQMessage.from( message, - this.defaultChannel!, + this.defaultChannel, new RabbitMQMessageProperties(RunMQUtils.generateUUID(), correlationId) ) ); @@ -85,7 +84,7 @@ export class RunMQ { */ public async disconnect(): Promise { try { - await this.amqplibClient.disconnect(); + await this.client.disconnect(); } catch (error) { throw new RunMQException( Exceptions.CONNECTION_NOT_ESTABLISHED, @@ -100,7 +99,7 @@ export class RunMQ { * Checks if the connection is currently active. */ public isActive(): boolean { - return this.amqplibClient.isActive(); + return this.client.isActive(); } private async connectWithRetry(): Promise { @@ -109,7 +108,7 @@ export class RunMQ { while (this.retryAttempts < maxAttempts) { try { - await this.amqplibClient.connect(); + await this.client.connect(); this.logger.log('Successfully connected to RabbitMQ'); this.retryAttempts = 0; return; @@ -134,7 +133,7 @@ export class RunMQ { } private async initialize(): Promise { - this.defaultChannel = await this.amqplibClient.getChannel(); + this.defaultChannel = await this.client.getDefaultChannel(); await this.defaultChannel.assertExchange(Constants.ROUTER_EXCHANGE_NAME, 'direct', {durable: true}); await this.defaultChannel.assertExchange(Constants.DEAD_LETTER_ROUTER_EXCHANGE_NAME, 'direct', {durable: true}); this.publisher = new RunMQPublisherCreator(this.logger).createPublisher(); diff --git a/src/core/clients/AmqplibClient.ts b/src/core/clients/AmqplibClient.ts deleted file mode 100644 index dd2b4bd..0000000 --- a/src/core/clients/AmqplibClient.ts +++ /dev/null @@ -1,67 +0,0 @@ -import * as amqp from "amqplib"; -import {RunMQException} from "@src/core/exceptions/RunMQException"; -import {Exceptions} from "@src/core/exceptions/Exceptions"; -import {Channel, ChannelModel} from "amqplib"; -import {AMQPClient, RunMQConnectionConfig} from "@src/types"; - -export class AmqplibClient implements AMQPClient { - private channelModel: ChannelModel | undefined; - private isConnected: boolean = false; - - constructor(private config: RunMQConnectionConfig) { - this.config = config - } - - public async connect(): Promise { - try { - if (this.isConnected && this.channelModel) { - return this.channelModel; - } - - this.channelModel = await amqp.connect(this.config.url); - this.isConnected = true; - - if (this.isConnected) { - this.channelModel.on('error', () => { - // TODO:: handle error (reconnect logic?) - this.isConnected = false; - }); - - this.channelModel.on('close', () => { - this.isConnected = false; - }); - } - return this.channelModel; - } catch (error) { - this.isConnected = false; - throw new RunMQException( - Exceptions.CONNECTION_NOT_ESTABLISHED, - { - error: error instanceof Error ? error.message : JSON.stringify(error) - } - ); - } - } - public async getChannel(): Promise { - return await (await this.connect()).createChannel() - } - public async disconnect(): Promise { - try { - if (this.channelModel && this.isConnected) { - await this.channelModel.close(); - this.isConnected = false; - } - } catch (error) { - throw new RunMQException( - Exceptions.CONNECTION_NOT_ESTABLISHED, - { - error: error instanceof Error ? error.message : String(error) - } - ); - } - } - - public isActive(): boolean { - return this.isConnected && this.channelModel !== undefined; - } -} \ No newline at end of file diff --git a/src/core/clients/RabbitMQClientAdapter.ts b/src/core/clients/RabbitMQClientAdapter.ts new file mode 100644 index 0000000..766fa40 --- /dev/null +++ b/src/core/clients/RabbitMQClientAdapter.ts @@ -0,0 +1,137 @@ +import {Connection, Channel} from "rabbitmq-client"; +import {RunMQException} from "@src/core/exceptions/RunMQException"; +import {Exceptions} from "@src/core/exceptions/Exceptions"; +import {AMQPChannel, AMQPClient, RunMQConnectionConfig} from "@src/types"; +import {RabbitMQClientChannel} from "@src/core/clients/RabbitMQClientChannel"; + +/** + * AMQPClient implementation using rabbitmq-client library. + * Leverages built-in reconnection, retry, and robustness features. + */ +export class RabbitMQClientAdapter implements AMQPClient { + private connection: Connection | undefined; + private defaultChannel: AMQPChannel | undefined; + private isConnected: boolean = false; + private acquiredChannels: Channel[] = []; + + constructor(private config: RunMQConnectionConfig) {} + + public async connect(): Promise { + try { + if (this.connection && this.isConnected) { + return this.connection; + } + + // Close any existing connection that might be in a bad state + if (this.connection) { + try { + await this.connection.close(); + } catch { + // Ignore close errors + } + this.connection = undefined; + } + + this.connection = new Connection({ + url: this.config.url, + // Disable automatic retries - we handle retries at RunMQ level + retryLow: 100, + retryHigh: 200, + connectionTimeout: 5000, + }); + + // Set up event handlers before waiting for connection + this.connection.on('error', (err) => { + console.error('RabbitMQ connection error:', err); + this.isConnected = false; + }); + + this.connection.on('connection', () => { + this.isConnected = true; + }); + + this.connection.on('connection.blocked', (reason) => { + console.warn('RabbitMQ connection blocked:', reason); + }); + + this.connection.on('connection.unblocked', () => { + console.info('RabbitMQ connection unblocked'); + }); + + // Wait for connection with timeout + // The second parameter (true) disables auto-close on timeout + await this.connection.onConnect(5000, true); + this.isConnected = true; + + return this.connection; + } catch (error) { + this.isConnected = false; + // Clean up the connection on failure + if (this.connection) { + try { + this.connection.close(); + } catch { + // Ignore + } + this.connection = undefined; + } + throw new RunMQException( + Exceptions.CONNECTION_NOT_ESTABLISHED, + { + error: error instanceof Error ? error.message : JSON.stringify(error) + } + ); + } + } + + public async getChannel(): Promise { + const connection = await this.connect(); + const rawChannel = await connection.acquire(); + // Track the channel so we can close it on disconnect + this.acquiredChannels.push(rawChannel); + return new RabbitMQClientChannel(rawChannel); + } + + public async getDefaultChannel(): Promise { + if (!this.defaultChannel) { + this.defaultChannel = await this.getChannel(); + } + return this.defaultChannel; + } + + public async disconnect(): Promise { + // Reset state first + const conn = this.connection; + const channels = this.acquiredChannels; + + this.connection = undefined; + this.defaultChannel = undefined; + this.isConnected = false; + this.acquiredChannels = []; + + // Close all acquired channels first + for (const channel of channels) { + try { + if (channel.active) { + await channel.close(); + } + } catch { + // Ignore errors - channel might already be closed + } + } + + // Now close the connection + if (conn) { + try { + await conn.close(); + } catch { + // Ignore errors - connection might already be closed + } + } + } + + public isActive(): boolean { + return this.connection !== undefined && this.isConnected; + } +} + diff --git a/src/core/clients/RabbitMQClientChannel.ts b/src/core/clients/RabbitMQClientChannel.ts new file mode 100644 index 0000000..c710309 --- /dev/null +++ b/src/core/clients/RabbitMQClientChannel.ts @@ -0,0 +1,207 @@ +import type {Channel, AsyncMessage} from "rabbitmq-client"; +import { + AMQPChannel, + AMQPConsumeInfo, + AMQPConsumeOptions, + AMQPExchangeInfo, + AMQPExchangeOptions, + AMQPPublishOptions, + AMQPQueueInfo, + AMQPQueueOptions, + ConsumeMessage +} from "@src/types"; + +/** + * Wrapper around rabbitmq-client Channel that implements the AMQPChannel interface. + * Provides a library-agnostic abstraction over channel operations. + */ +export class RabbitMQClientChannel implements AMQPChannel { + constructor(private readonly channel: Channel) {} + + async assertQueue(queue: string, options?: AMQPQueueOptions): Promise { + const args: Record = {}; + if (options?.deadLetterExchange) args['x-dead-letter-exchange'] = options.deadLetterExchange; + if (options?.deadLetterRoutingKey) args['x-dead-letter-routing-key'] = options.deadLetterRoutingKey; + if (options?.messageTtl) args['x-message-ttl'] = options.messageTtl; + if (options?.arguments) Object.assign(args, options.arguments); + + const result = await this.channel.queueDeclare({ + queue, + durable: options?.durable, + exclusive: options?.exclusive, + autoDelete: options?.autoDelete, + arguments: Object.keys(args).length > 0 ? args : undefined, + }); + return { + queue: result.queue, + messageCount: result.messageCount, + consumerCount: result.consumerCount, + }; + } + + async checkQueue(queue: string): Promise { + const result = await this.channel.queueDeclare({ + queue, + passive: true, + }); + return { + queue: result.queue, + messageCount: result.messageCount, + consumerCount: result.consumerCount, + }; + } + + async deleteQueue(queue: string, options?: { ifUnused?: boolean; ifEmpty?: boolean }): Promise<{ messageCount: number }> { + const result = await this.channel.queueDelete({ + queue, + ifUnused: options?.ifUnused, + ifEmpty: options?.ifEmpty, + }); + return { + messageCount: result.messageCount, + }; + } + + async assertExchange(exchange: string, type: string, options?: AMQPExchangeOptions): Promise { + const args: Record = {}; + if (options?.alternateExchange) args['alternate-exchange'] = options.alternateExchange; + if (options?.arguments) Object.assign(args, options.arguments); + + await this.channel.exchangeDeclare({ + exchange, + type, + durable: options?.durable, + internal: options?.internal, + autoDelete: options?.autoDelete, + arguments: Object.keys(args).length > 0 ? args : undefined, + }); + return { + exchange, + }; + } + + async checkExchange(exchange: string): Promise { + await this.channel.exchangeDeclare({ + exchange, + passive: true, + }); + return { + exchange, + }; + } + + async deleteExchange(exchange: string, options?: { ifUnused?: boolean }): Promise { + await this.channel.exchangeDelete({ + exchange, + ifUnused: options?.ifUnused, + }); + } + + async bindQueue(queue: string, source: string, pattern: string, args?: Record): Promise { + await this.channel.queueBind({ + queue, + exchange: source, + routingKey: pattern, + arguments: args, + }); + } + + publish(exchange: string, routingKey: string, content: Buffer, options?: AMQPPublishOptions): boolean { + this.channel.basicPublish({ + exchange, + routingKey, + correlationId: options?.correlationId, + messageId: options?.messageId, + headers: options?.headers, + durable: options?.persistent, + expiration: options?.expiration?.toString(), + contentType: options?.contentType, + contentEncoding: options?.contentEncoding, + priority: options?.priority, + replyTo: options?.replyTo, + timestamp: options?.timestamp, + type: options?.type, + userId: options?.userId, + appId: options?.appId, + }, content); + return true; + } + + async consume( + queue: string, + onMessage: (msg: ConsumeMessage | null) => void, + options?: AMQPConsumeOptions + ): Promise { + const result = await this.channel.basicConsume({ + queue, + consumerTag: options?.consumerTag, + noLocal: options?.noLocal, + noAck: options?.noAck, + exclusive: options?.exclusive, + arguments: options?.arguments, + }, (msg: AsyncMessage) => { + // Convert rabbitmq-client message format to our ConsumeMessage format + const body = msg.body; + const content = Buffer.isBuffer(body) ? body : + typeof body === 'string' ? Buffer.from(body) : + Buffer.from(JSON.stringify(body)); + + const consumeMessage: ConsumeMessage = { + content, + fields: { + consumerTag: msg.consumerTag, + deliveryTag: msg.deliveryTag, + redelivered: msg.redelivered, + exchange: msg.exchange, + routingKey: msg.routingKey, + }, + properties: { + contentType: msg.contentType, + contentEncoding: msg.contentEncoding, + headers: msg.headers || {}, + deliveryMode: msg.durable ? 2 : 1, + priority: msg.priority, + correlationId: msg.correlationId, + replyTo: msg.replyTo, + expiration: msg.expiration, + messageId: msg.messageId, + timestamp: msg.timestamp, + type: msg.type, + userId: msg.userId, + appId: msg.appId, + }, + }; + onMessage(consumeMessage); + }); + return { + consumerTag: result.consumerTag, + }; + } + + ack(message: ConsumeMessage, allUpTo?: boolean): void { + this.channel.basicAck({ + deliveryTag: message.fields.deliveryTag, + multiple: allUpTo, + }); + } + + nack(message: ConsumeMessage, allUpTo?: boolean, requeue?: boolean): void { + this.channel.basicNack({ + deliveryTag: message.fields.deliveryTag, + multiple: allUpTo, + requeue, + }); + } + + async prefetch(count: number, global?: boolean): Promise { + await this.channel.basicQos({ + prefetchCount: count, + global, + }); + } + + async close(): Promise { + await this.channel.close(); + } +} + diff --git a/src/core/consumer/RunMQConsumerCreator.ts b/src/core/consumer/RunMQConsumerCreator.ts index 047bf4d..224301f 100644 --- a/src/core/consumer/RunMQConsumerCreator.ts +++ b/src/core/consumer/RunMQConsumerCreator.ts @@ -1,4 +1,3 @@ -import {Channel} from "amqplib"; import {ConsumerConfiguration} from "@src/core/consumer/ConsumerConfiguration"; import {Constants, DEFAULTS} from "@src/core/constants"; import {RabbitMQMessage} from "@src/core/message/RabbitMQMessage"; @@ -14,7 +13,7 @@ import {RunMQLogger} from "@src/core/logging/RunMQLogger"; import {DefaultDeserializer} from "@src/core/serializers/deserializer/DefaultDeserializer"; import {ConsumerCreatorUtils} from "@src/core/consumer/ConsumerCreatorUtils"; import {RunMQPublisherCreator} from "@src/core/publisher/RunMQPublisherCreator"; -import {AMQPClient, RabbitMQManagementConfig} from "@src/types"; +import {AMQPChannel, AMQPClient, RabbitMQManagementConfig} from "@src/types"; import {RunMQTTLPolicyManager} from "@src/core/management/Policies/RunMQTTLPolicyManager"; import {RunMQException} from "@src/core/exceptions/RunMQException"; import {Exceptions} from "@src/core/exceptions/Exceptions"; @@ -23,7 +22,6 @@ export class RunMQConsumerCreator { private ttlPolicyManager: RunMQTTLPolicyManager; constructor( - private defaultChannel: Channel, private client: AMQPClient, private logger: RunMQLogger, managementConfig?: RabbitMQManagementConfig @@ -80,12 +78,14 @@ export class RunMQConsumerCreator { private async assertQueues(consumerConfiguration: ConsumerConfiguration) { - await this.defaultChannel.assertQueue(consumerConfiguration.processorConfig.name, { + const defaultChannel = await this.client.getDefaultChannel(); + + await defaultChannel.assertQueue(consumerConfiguration.processorConfig.name, { durable: true, deadLetterExchange: Constants.DEAD_LETTER_ROUTER_EXCHANGE_NAME, deadLetterRoutingKey: consumerConfiguration.processorConfig.name }); - await this.defaultChannel.assertQueue(ConsumerCreatorUtils.getDLQTopicName(consumerConfiguration.processorConfig.name), { + await defaultChannel.assertQueue(ConsumerCreatorUtils.getDLQTopicName(consumerConfiguration.processorConfig.name), { durable: true, deadLetterExchange: Constants.ROUTER_EXCHANGE_NAME, deadLetterRoutingKey: consumerConfiguration.processorConfig.name @@ -97,7 +97,7 @@ export class RunMQConsumerCreator { const policiesForTTL = consumerConfiguration.processorConfig.usePoliciesForDelay ?? false; if (!policiesForTTL) { - await this.defaultChannel.assertQueue(retryDelayQueueName, { + await defaultChannel.assertQueue(retryDelayQueueName, { durable: true, deadLetterExchange: Constants.ROUTER_EXCHANGE_NAME, messageTtl: messageDelay, @@ -110,7 +110,7 @@ export class RunMQConsumerCreator { messageDelay ); if (result) { - await this.defaultChannel.assertQueue(retryDelayQueueName, { + await defaultChannel.assertQueue(retryDelayQueueName, { durable: true, deadLetterExchange: Constants.ROUTER_EXCHANGE_NAME }); @@ -126,29 +126,31 @@ export class RunMQConsumerCreator { private async bindQueues(consumerConfiguration: ConsumerConfiguration) { - await this.defaultChannel.bindQueue( + const defaultChannel = await this.client.getDefaultChannel(); + + await defaultChannel.bindQueue( consumerConfiguration.processorConfig.name, Constants.ROUTER_EXCHANGE_NAME, consumerConfiguration.topic ); - await this.defaultChannel.bindQueue( + await defaultChannel.bindQueue( consumerConfiguration.processorConfig.name, Constants.ROUTER_EXCHANGE_NAME, consumerConfiguration.processorConfig.name ); - await this.defaultChannel.bindQueue( + await defaultChannel.bindQueue( ConsumerCreatorUtils.getRetryDelayTopicName(consumerConfiguration.processorConfig.name), Constants.DEAD_LETTER_ROUTER_EXCHANGE_NAME, consumerConfiguration.processorConfig.name ); - await this.defaultChannel.bindQueue( + await defaultChannel.bindQueue( ConsumerCreatorUtils.getDLQTopicName(consumerConfiguration.processorConfig.name), Constants.DEAD_LETTER_ROUTER_EXCHANGE_NAME, ConsumerCreatorUtils.getDLQTopicName(consumerConfiguration.processorConfig.name) ); } - private async getProcessorChannel(): Promise { + private async getProcessorChannel(): Promise { return await this.client.getChannel() } } \ No newline at end of file diff --git a/src/core/consumer/processors/RunMQFailedMessageRejecterProcessor.ts b/src/core/consumer/processors/RunMQFailedMessageRejecterProcessor.ts index 3d72191..3b1d6d8 100644 --- a/src/core/consumer/processors/RunMQFailedMessageRejecterProcessor.ts +++ b/src/core/consumer/processors/RunMQFailedMessageRejecterProcessor.ts @@ -9,7 +9,7 @@ export class RunMQFailedMessageRejecterProcessor implements RunMQConsumer { try { return await this.consumer.consume(message); } catch { - message.channel.nack(message.amqpMessage!, false, false); + message.nack(false); return false; } } diff --git a/src/core/consumer/processors/RunMQRetriesCheckerProcessor.ts b/src/core/consumer/processors/RunMQRetriesCheckerProcessor.ts index 059693a..7fa0e66 100644 --- a/src/core/consumer/processors/RunMQRetriesCheckerProcessor.ts +++ b/src/core/consumer/processors/RunMQRetriesCheckerProcessor.ts @@ -50,7 +50,7 @@ export class RunMQRetriesCheckerProcessor implements RunMQConsumer { private acknowledgeMessage(message: RabbitMQMessage) { try { - message.channel.ack(message.amqpMessage!, false); + message.ack(); } catch (e) { const error = new Error("A message acknowledge failed after publishing to final dead letter"); this.logger.error(error.message, {cause: e instanceof Error ? e.message : String(e)}); diff --git a/src/core/consumer/processors/RunMQSucceededMessageAcknowledgerProcessor.ts b/src/core/consumer/processors/RunMQSucceededMessageAcknowledgerProcessor.ts index fd9a0b5..c034c0f 100644 --- a/src/core/consumer/processors/RunMQSucceededMessageAcknowledgerProcessor.ts +++ b/src/core/consumer/processors/RunMQSucceededMessageAcknowledgerProcessor.ts @@ -8,7 +8,7 @@ export class RunMQSucceededMessageAcknowledgerProcessor implements RunMQConsumer public async consume(message: RabbitMQMessage) { const result = await this.consumer.consume(message); if (result) { - message.channel.ack(message.amqpMessage!) + message.ack(); } return result; } diff --git a/src/core/message/AmqpMessage.ts b/src/core/message/AmqpMessage.ts index b0a017a..c97844b 100644 --- a/src/core/message/AmqpMessage.ts +++ b/src/core/message/AmqpMessage.ts @@ -1 +1,4 @@ -export type AMQPMessage = import('amqplib').Message | null; \ No newline at end of file +import {ConsumeMessage} from "@src/types"; + +export type AMQPMessage = ConsumeMessage | null; + diff --git a/src/core/message/RabbitMQMessage.ts b/src/core/message/RabbitMQMessage.ts index 37335bd..e837293 100644 --- a/src/core/message/RabbitMQMessage.ts +++ b/src/core/message/RabbitMQMessage.ts @@ -1,21 +1,40 @@ -import {Channel} from "amqplib"; import {RunMQUtils} from "@src/core/utils/RunMQUtils"; import {RabbitMQMessageProperties} from "@src/core/message/RabbitMQMessageProperties"; import {AMQPMessage} from "@src/core/message/AmqpMessage"; +import {AMQPChannel} from "@src/types"; export class RabbitMQMessage { constructor( readonly message: any, readonly id: string = RunMQUtils.generateUUID(), readonly correlationId: string = RunMQUtils.generateUUID(), - readonly channel: Channel, + readonly channel: AMQPChannel, readonly amqpMessage: AMQPMessage = null, readonly headers: Record = {}) { } + /** + * Acknowledges the message. + */ + ack(): void { + if (this.amqpMessage) { + this.channel.ack(this.amqpMessage); + } + } + + /** + * Negatively acknowledges the message. + * @param requeue - Whether to requeue the message (default: false) + */ + nack(requeue: boolean = false): void { + if (this.amqpMessage) { + this.channel.nack(this.amqpMessage, false, requeue); + } + } + static from( messageData: Record, - channel: Channel, + channel: AMQPChannel, props: RabbitMQMessageProperties, amqpMessage: AMQPMessage = null ): RabbitMQMessage { diff --git a/src/types/index.ts b/src/types/index.ts index 37ceff7..15c0c5f 100644 --- a/src/types/index.ts +++ b/src/types/index.ts @@ -1,14 +1,194 @@ import {RabbitMQMessage} from "@src/core/message/RabbitMQMessage"; -import {Channel, ChannelModel} from "amqplib"; + +/** + * Represents a message consumed from a queue. + * Library-agnostic format compatible with both amqplib and rabbitmq-client. + */ +export interface ConsumeMessage { + content: Buffer; + fields: { + consumerTag: string; + deliveryTag: number; + redelivered: boolean; + exchange: string; + routingKey: string; + }; + properties: { + contentType?: string; + contentEncoding?: string; + headers?: Record; + deliveryMode?: number; + priority?: number; + correlationId?: string; + replyTo?: string; + expiration?: string; + messageId?: string; + timestamp?: number; + type?: string; + userId?: string; + appId?: string; + }; +} + +/** + * Options for asserting a queue. + */ +export interface AMQPQueueOptions { + durable?: boolean; + deadLetterExchange?: string; + deadLetterRoutingKey?: string; + messageTtl?: number; + exclusive?: boolean; + autoDelete?: boolean; + arguments?: Record; +} + +/** + * Options for asserting an exchange. + */ +export interface AMQPExchangeOptions { + durable?: boolean; + internal?: boolean; + autoDelete?: boolean; + alternateExchange?: string; + arguments?: Record; +} + +/** + * Options for publishing a message. + */ +export interface AMQPPublishOptions { + correlationId?: string; + messageId?: string; + headers?: Record; + persistent?: boolean; + expiration?: string | number; + contentType?: string; + contentEncoding?: string; + priority?: number; + replyTo?: string; + timestamp?: number; + type?: string; + userId?: string; + appId?: string; +} + +/** + * Options for consuming messages. + */ +export interface AMQPConsumeOptions { + consumerTag?: string; + noLocal?: boolean; + noAck?: boolean; + exclusive?: boolean; + priority?: number; + arguments?: Record; +} + +/** + * Result of asserting a queue. + */ +export interface AMQPQueueInfo { + queue: string; + messageCount: number; + consumerCount: number; +} + +/** + * Result of asserting an exchange. + */ +export interface AMQPExchangeInfo { + exchange: string; +} + +/** + * Result of starting a consumer. + */ +export interface AMQPConsumeInfo { + consumerTag: string; +} + +/** + * Abstraction over AMQP channel operations. + * Decouples the application from specific AMQP client libraries (e.g., amqplib). + */ +export interface AMQPChannel { + /** + * Asserts a queue exists, creating it if necessary. + */ + assertQueue(queue: string, options?: AMQPQueueOptions): Promise; + + /** + * Checks if a queue exists. + */ + checkQueue(queue: string): Promise; + + /** + * Deletes a queue. + */ + deleteQueue(queue: string, options?: { ifUnused?: boolean; ifEmpty?: boolean }): Promise<{ messageCount: number }>; + + /** + * Asserts an exchange exists, creating it if necessary. + */ + assertExchange(exchange: string, type: string, options?: AMQPExchangeOptions): Promise; + + /** + * Checks if an exchange exists. + */ + checkExchange(exchange: string): Promise; + + /** + * Deletes an exchange. + */ + deleteExchange(exchange: string, options?: { ifUnused?: boolean }): Promise; + + /** + * Binds a queue to an exchange with a routing pattern. + */ + bindQueue(queue: string, source: string, pattern: string, args?: Record): Promise; + + /** + * Publishes a message to an exchange. + */ + publish(exchange: string, routingKey: string, content: Buffer, options?: AMQPPublishOptions): boolean; + + /** + * Starts consuming messages from a queue. + */ + consume(queue: string, onMessage: (msg: ConsumeMessage | null) => void, options?: AMQPConsumeOptions): Promise; + + /** + * Acknowledges a message. + */ + ack(message: ConsumeMessage, allUpTo?: boolean): void; + + /** + * Negatively acknowledges a message. + */ + nack(message: ConsumeMessage, allUpTo?: boolean, requeue?: boolean): void; + + /** + * Sets the prefetch count for the channel. + */ + prefetch(count: number, global?: boolean): Promise; + + /** + * Closes the channel. + */ + close(): Promise; +} export interface AMQPClient { - connect(): Promise; + connect(): Promise; + + getChannel(): Promise; - getChannel(): Promise + getDefaultChannel(): Promise; - disconnect(): Promise + disconnect(): Promise; - isActive(): boolean + isActive(): boolean; } export interface RunMQConnectionConfig { diff --git a/tests/e2e/AmqplibClient.e2e.test.ts b/tests/e2e/RabbitMQClientAdapter.e2e.test.ts similarity index 78% rename from tests/e2e/AmqplibClient.e2e.test.ts rename to tests/e2e/RabbitMQClientAdapter.e2e.test.ts index 8b3617b..09d3822 100644 --- a/tests/e2e/AmqplibClient.e2e.test.ts +++ b/tests/e2e/RabbitMQClientAdapter.e2e.test.ts @@ -1,21 +1,21 @@ -import {AmqplibClient} from '@src/core/clients/AmqplibClient'; +import {RabbitMQClientAdapter} from '@src/core/clients/RabbitMQClientAdapter'; import {Exceptions} from '@src/core/exceptions/Exceptions'; import {RunMQConnectionConfigExample} from "@tests/Examples/RunMQConnectionConfigExample"; -describe('AmqplibClient E2E Tests', () => { +describe('RabbitMQClientAdapter E2E Tests', () => { const validConfig = RunMQConnectionConfigExample.valid(); const invalidConfig = RunMQConnectionConfigExample.invalid(); describe('connection management', () => { it('should connect successfully to RabbitMQ', async () => { - const client = new AmqplibClient(validConfig); + const client = new RabbitMQClientAdapter(validConfig); await client.connect(); expect(client.isActive()).toBe(true); await client.disconnect(); }, 10000); it('should not reconnect if already connected', async () => { - const client = new AmqplibClient(validConfig); + const client = new RabbitMQClientAdapter(validConfig); await client.connect(); expect(client.isActive()).toBe(true); @@ -26,7 +26,7 @@ describe('AmqplibClient E2E Tests', () => { }, 10000); it('should throw RunMQException on invalid connection', async () => { - const client = new AmqplibClient(invalidConfig); + const client = new RabbitMQClientAdapter(invalidConfig); await expect(client.connect()).rejects.toMatchObject({ exception: Exceptions.CONNECTION_NOT_ESTABLISHED, @@ -37,7 +37,7 @@ describe('AmqplibClient E2E Tests', () => { }, 10000); it('should disconnect successfully', async () => { - const client = new AmqplibClient(validConfig); + const client = new RabbitMQClientAdapter(validConfig); await client.connect(); expect(client.isActive()).toBe(true); @@ -47,7 +47,7 @@ describe('AmqplibClient E2E Tests', () => { }, 10000); it('should handle multiple disconnects gracefully', async () => { - const client = new AmqplibClient(validConfig); + const client = new RabbitMQClientAdapter(validConfig); await client.connect(); await client.disconnect(); @@ -59,8 +59,8 @@ describe('AmqplibClient E2E Tests', () => { }); describe('channel management', () => { - it('should return different channel its time is requested', async () => { - const client = new AmqplibClient(validConfig); + it('should return different channel each time is requested', async () => { + const client = new RabbitMQClientAdapter(validConfig); await client.connect(); const channel = await client.getChannel(); const channel2 = await client.getChannel(); diff --git a/tests/e2e/RunMQ.e2e.test.ts b/tests/e2e/RunMQ.e2e.test.ts index 2bd1221..41e3f17 100644 --- a/tests/e2e/RunMQ.e2e.test.ts +++ b/tests/e2e/RunMQ.e2e.test.ts @@ -1,7 +1,7 @@ import {RunMQ} from '@src/core/RunMQ'; import {RunMQException} from '@src/core/exceptions/RunMQException'; import {Exceptions} from '@src/core/exceptions/Exceptions'; -import {AmqplibClient} from "@src/core/clients/AmqplibClient"; +import {RabbitMQClientAdapter} from "@src/core/clients/RabbitMQClientAdapter"; import {Constants} from "@src/core/constants"; import {ChannelTestHelpers} from "@tests/helpers/ChannelTestHelpers"; import {LoggerTestHelpers} from "@tests/helpers/LoggerTestHelpers"; @@ -42,7 +42,7 @@ describe('RunMQ E2E Tests', () => { attempts: invalidConfig.maxReconnectAttempts } }) - }, 20000); + }, 30000); it('should connect after temporary network issues', async () => { await expect(RunMQ.start(invalidConfig, MockedRunMQLogger)).rejects.toThrow(RunMQException); @@ -83,7 +83,7 @@ describe('RunMQ E2E Tests', () => { describe('Initialization', () => { it('Should create the default router exchange on initialization', async () => { const runMQ = await RunMQ.start(validConfig, MockedRunMQLogger); - const testingConnection = new AmqplibClient(validConfig); + const testingConnection = new RabbitMQClientAdapter(validConfig); const channel = await testingConnection.getChannel(); await channel.checkExchange(Constants.ROUTER_EXCHANGE_NAME); await channel.deleteExchange(Constants.ROUTER_EXCHANGE_NAME); @@ -101,7 +101,7 @@ describe('RunMQ E2E Tests', () => { describe('processing', () => { it('Should end up in DLQ when message is not meeting the schema validation', async () => { const configuration = RunMQProcessorConfigurationExample.simpleNoSchema() - const testingConnection = new AmqplibClient(validConfig); + const testingConnection = new RabbitMQClientAdapter(validConfig); const channel = await testingConnection.getChannel(); await ChannelTestHelpers.deleteQueue(channel, configuration.name); @@ -132,7 +132,7 @@ describe('RunMQ E2E Tests', () => { describe('publishing', () => { it('should publish and consume a message successfully', async () => { const configuration = RunMQProcessorConfigurationExample.simpleNoSchema() - const testingConnection = new AmqplibClient(validConfig); + const testingConnection = new RabbitMQClientAdapter(validConfig); const channel = await testingConnection.getChannel(); await ChannelTestHelpers.deleteQueue(channel, configuration.name); diff --git a/tests/e2e/RunMQ.processing.e2e.test.ts b/tests/e2e/RunMQ.processing.e2e.test.ts index 91055ac..135fa43 100644 --- a/tests/e2e/RunMQ.processing.e2e.test.ts +++ b/tests/e2e/RunMQ.processing.e2e.test.ts @@ -1,5 +1,5 @@ import {RunMQ} from '@src/core/RunMQ'; -import {AmqplibClient} from "@src/core/clients/AmqplibClient"; +import {RabbitMQClientAdapter} from "@src/core/clients/RabbitMQClientAdapter"; import {Constants} from "@src/core/constants"; import {ChannelTestHelpers} from "@tests/helpers/ChannelTestHelpers"; import {ConsumerCreatorUtils} from "@src/core/consumer/ConsumerCreatorUtils"; @@ -19,7 +19,7 @@ describe('RunMQ E2E Tests', () => { jest.clearAllMocks(); }); - const testingConnection = new AmqplibClient(validConfig); + const testingConnection = new RabbitMQClientAdapter(validConfig); describe('processing behaviours', () => { it('Should process the message correctly given valid RunMQMessage structure without schema', async () => { diff --git a/tests/e2e/RunMQ.publisher.e2e.test.ts b/tests/e2e/RunMQ.publisher.e2e.test.ts index cf985c2..9d6e5db 100644 --- a/tests/e2e/RunMQ.publisher.e2e.test.ts +++ b/tests/e2e/RunMQ.publisher.e2e.test.ts @@ -1,5 +1,5 @@ import {RunMQ} from '@src/core/RunMQ'; -import {AmqplibClient} from "@src/core/clients/AmqplibClient"; +import {RabbitMQClientAdapter} from "@src/core/clients/RabbitMQClientAdapter"; import {Constants} from "@src/core/constants"; import {ChannelTestHelpers} from "@tests/helpers/ChannelTestHelpers"; import {ConsumerCreatorUtils} from "@src/core/consumer/ConsumerCreatorUtils"; @@ -19,7 +19,7 @@ describe('RunMQ Publisher E2E Tests', () => { describe('publish functionality', () => { it('should publish message successfully to the correct queue', async () => { - const testingConnection = new AmqplibClient(validConfig); + const testingConnection = new RabbitMQClientAdapter(validConfig); const channel = await testingConnection.getChannel(); await ChannelTestHelpers.deleteQueue(channel, configuration.name); @@ -45,7 +45,7 @@ describe('RunMQ Publisher E2E Tests', () => { }, 15000); it('should publish multiple messages successfully', async () => { - const testingConnection = new AmqplibClient(validConfig); + const testingConnection = new RabbitMQClientAdapter(validConfig); const channel = await testingConnection.getChannel(); await ChannelTestHelpers.deleteQueue(channel, configuration.name); @@ -73,7 +73,7 @@ describe('RunMQ Publisher E2E Tests', () => { }, 15000); it('should handle publishing to non-existent topic gracefully', async () => { - const testingConnection = new AmqplibClient(validConfig); + const testingConnection = new RabbitMQClientAdapter(validConfig); const channel = await testingConnection.getChannel(); await ChannelTestHelpers.deleteQueue(channel, configuration.name); @@ -98,7 +98,7 @@ describe('RunMQ Publisher E2E Tests', () => { }, 15000); it('should publish message with proper RunMQMessage structure', async () => { - const testingConnection = new AmqplibClient(validConfig); + const testingConnection = new RabbitMQClientAdapter(validConfig); const channel = await testingConnection.getChannel(); await ChannelTestHelpers.deleteQueue(channel, configuration.name); diff --git a/tests/mocks/MockedAMQPChannel.ts b/tests/mocks/MockedAMQPChannel.ts new file mode 100644 index 0000000..e46936d --- /dev/null +++ b/tests/mocks/MockedAMQPChannel.ts @@ -0,0 +1,62 @@ +import { + AMQPChannel, + AMQPConsumeInfo, + AMQPConsumeOptions, + AMQPExchangeInfo, + AMQPExchangeOptions, + AMQPPublishOptions, + AMQPQueueInfo, + AMQPQueueOptions, + ConsumeMessage +} from "@src/types"; + +export class MockedAMQPChannel implements AMQPChannel { + assertQueue = jest.fn, [string, AMQPQueueOptions?]>().mockResolvedValue({ + queue: 'test-queue', + messageCount: 0, + consumerCount: 0 + }); + + checkQueue = jest.fn, [string]>().mockResolvedValue({ + queue: 'test-queue', + messageCount: 0, + consumerCount: 0 + }); + + deleteQueue = jest.fn, [string, { ifUnused?: boolean; ifEmpty?: boolean }?]>().mockResolvedValue({ + messageCount: 0 + }); + + assertExchange = jest.fn, [string, string, AMQPExchangeOptions?]>().mockResolvedValue({ + exchange: 'test-exchange' + }); + + checkExchange = jest.fn, [string]>().mockResolvedValue({ + exchange: 'test-exchange' + }); + + deleteExchange = jest.fn, [string, { ifUnused?: boolean }?]>().mockResolvedValue(); + + bindQueue = jest.fn, [string, string, string, Record?]>().mockResolvedValue(); + + publish = jest.fn().mockReturnValue(true); + + consume = jest.fn, [string, (msg: ConsumeMessage | null) => void, AMQPConsumeOptions?]>().mockResolvedValue({ + consumerTag: 'test-consumer-tag' + }); + + ack = jest.fn(); + + nack = jest.fn(); + + prefetch = jest.fn, [number, boolean?]>().mockResolvedValue(); + + close = jest.fn, []>().mockResolvedValue(); +} + +export class MockedAMQPChannelWithAcknowledgeFailure extends MockedAMQPChannel { + ack = jest.fn().mockImplementation(() => { + throw new Error("Acknowledgement failed"); + }); +} + diff --git a/tests/mocks/MockedAMQPClient.ts b/tests/mocks/MockedAMQPClient.ts index 8c6bde0..b9a8add 100644 --- a/tests/mocks/MockedAMQPClient.ts +++ b/tests/mocks/MockedAMQPClient.ts @@ -1,13 +1,13 @@ -import {AMQPClient} from "@src/types"; -import {Channel} from "amqplib"; +import {AMQPChannel, AMQPClient} from "@src/types"; export class MockedAMQPClient implements AMQPClient { - constructor(private channel: Channel) { + constructor(private channel: AMQPChannel) { } public connect = jest.fn(); - public getChannel = jest.fn().mockResolvedValue(this.channel) + public getChannel = jest.fn().mockResolvedValue(this.channel); + public getDefaultChannel = jest.fn().mockResolvedValue(this.channel); public disconnect = jest.fn(); public isActive = jest.fn(); } \ No newline at end of file diff --git a/tests/mocks/MockedAmqpMessage.ts b/tests/mocks/MockedAmqpMessage.ts index 6c36734..066b502 100644 --- a/tests/mocks/MockedAmqpMessage.ts +++ b/tests/mocks/MockedAmqpMessage.ts @@ -1,8 +1,14 @@ -import { Message } from "amqplib"; import jest from 'jest-mock'; +import {ConsumeMessage} from "@src/types"; export const MockedAmqpMessage = { content: Buffer.from('mocked message'), - fields: {}, + fields: { + consumerTag: 'test-consumer-tag', + deliveryTag: 1, + redelivered: false, + exchange: 'test-exchange', + routingKey: 'test-routing-key', + }, properties: {}, -} as unknown as jest.Mocked; \ No newline at end of file +} as unknown as jest.Mocked; diff --git a/tests/mocks/MockedRabbitMQMessage.ts b/tests/mocks/MockedRabbitMQMessage.ts index 86f2af1..8788e97 100644 --- a/tests/mocks/MockedRabbitMQMessage.ts +++ b/tests/mocks/MockedRabbitMQMessage.ts @@ -1,21 +1,21 @@ import {RunMQUtils} from "@src/core/utils/RunMQUtils"; import {MessageExample} from "@tests/Examples/MessageExample"; -import {MockedRabbitMQChannel} from "@tests/mocks/MockedRabbitMQChannel"; +import {MockedAMQPChannel} from "@tests/mocks/MockedAMQPChannel"; import {RabbitMQMessage} from "@src/core/message/RabbitMQMessage"; import {MockedAmqpMessage} from "@tests/mocks/MockedAmqpMessage"; -import {Channel} from "amqplib"; +import {AMQPChannel} from "@src/types"; export const MockedRabbitMQMessage = new RabbitMQMessage( MessageExample.person(), RunMQUtils.generateUUID(), RunMQUtils.generateUUID(), - new MockedRabbitMQChannel(), + new MockedAMQPChannel(), MockedAmqpMessage, {} ) export function mockedRabbitMQMessageWithChannelAndMessage( - channel: MockedRabbitMQChannel, + channel: AMQPChannel, message: any, id: string, correlationId: string @@ -35,7 +35,7 @@ export function mockedRabbitMQMessageWithDeathCount(count: number) { MessageExample.person(), RunMQUtils.generateUUID(), RunMQUtils.generateUUID(), - new MockedRabbitMQChannel(), + new MockedAMQPChannel(), MockedAmqpMessage, { "x-death": [ @@ -49,7 +49,7 @@ export function mockedRabbitMQMessageWithDeathCount(count: number) { } -export function mockedRabbitMQMessageWithChannelAndDeathCount(channel: Channel, count: number) { +export function mockedRabbitMQMessageWithChannelAndDeathCount(channel: AMQPChannel, count: number) { return new RabbitMQMessage( MessageExample.person(), RunMQUtils.generateUUID(), diff --git a/tests/unit/core/RunMQ.test.ts b/tests/unit/core/RunMQ.test.ts index cf69d1c..14b9e39 100644 --- a/tests/unit/core/RunMQ.test.ts +++ b/tests/unit/core/RunMQ.test.ts @@ -1,38 +1,39 @@ import {RunMQ} from '@src/core/RunMQ'; -import {AmqplibClient} from '@src/core/clients/AmqplibClient'; +import {RabbitMQClientAdapter} from '@src/core/clients/RabbitMQClientAdapter'; import {RunMQException} from '@src/core/exceptions/RunMQException'; import {Exceptions} from '@src/core/exceptions/Exceptions'; import {RunMQUtils} from '@src/core/utils/RunMQUtils'; import {RunMQConsumerCreator} from '@src/core/consumer/RunMQConsumerCreator'; -import {Channel} from 'amqplib'; import {Constants} from '@src/core/constants'; import {RunMQConnectionConfigExample} from "@tests/Examples/RunMQConnectionConfigExample"; import {RunMQProcessorConfigurationExample} from "@tests/Examples/RunMQProcessorConfigurationExample"; -import {MockedRabbitMQChannel} from "@tests/mocks/MockedRabbitMQChannel"; +import {MockedAMQPChannel} from "@tests/mocks/MockedAMQPChannel"; import {MessageExample} from "@tests/Examples/MessageExample"; import {MockedRunMQLogger} from "@tests/mocks/MockedRunMQLogger"; import {RunMQPublisherCreator} from "@src/core/publisher/RunMQPublisherCreator"; +import {AMQPChannel} from "@src/types"; -jest.mock('@src/core/clients/AmqplibClient'); +jest.mock('@src/core/clients/RabbitMQClientAdapter'); jest.mock('@src/core/utils/RunMQUtils'); jest.mock('@src/core/consumer/RunMQConsumerCreator'); jest.mock('@src/core/publisher/RunMQPublisherCreator'); describe('RunMQ Unit Tests', () => { - const mockChannel = new MockedRabbitMQChannel(); + const mockChannel = new MockedAMQPChannel(); const validConfig = RunMQConnectionConfigExample.valid(); - const setupSuccessfulAmqplibClientMock = () => { - const mockAmqplibClient = AmqplibClient as jest.MockedClass; - mockAmqplibClient.prototype.connect.mockResolvedValue({} as any); - mockAmqplibClient.prototype.getChannel.mockResolvedValue(mockChannel as Channel); - return mockAmqplibClient; + const setupSuccessfulClientMock = () => { + const mockClient = RabbitMQClientAdapter as jest.MockedClass; + mockClient.prototype.connect.mockResolvedValue({} as any); + mockClient.prototype.getChannel.mockResolvedValue(mockChannel as AMQPChannel); + mockClient.prototype.getDefaultChannel.mockResolvedValue(mockChannel as AMQPChannel); + return mockClient; }; - const setupFailingAmqplibClientMock = (error: Error) => { - const mockAmqplibClient = AmqplibClient as jest.MockedClass; - mockAmqplibClient.prototype.connect.mockRejectedValue(error); - return mockAmqplibClient; + const setupFailingClientMock = (error: Error) => { + const mockClient = RabbitMQClientAdapter as jest.MockedClass; + mockClient.prototype.connect.mockRejectedValue(error); + return mockClient; }; const setupPublisherMock = () => { @@ -53,13 +54,13 @@ describe('RunMQ Unit Tests', () => { describe('start', () => { it('should create instance and connect successfully', async () => { - const mockAmqplibClient = setupSuccessfulAmqplibClientMock(); + const mockClient = setupSuccessfulClientMock(); await RunMQ.start(validConfig); - expect(mockAmqplibClient).toHaveBeenCalledWith(validConfig); - expect(mockAmqplibClient.prototype.connect).toHaveBeenCalled(); - expect(mockAmqplibClient.prototype.getChannel).toHaveBeenCalled(); + expect(mockClient).toHaveBeenCalledWith(validConfig); + expect(mockClient.prototype.connect).toHaveBeenCalled(); + expect(mockClient.prototype.getDefaultChannel).toHaveBeenCalled(); expect(mockChannel.assertExchange).toHaveBeenCalledWith( Constants.ROUTER_EXCHANGE_NAME, 'direct', @@ -74,11 +75,11 @@ describe('RunMQ Unit Tests', () => { it('should use default config values when not provided', async () => { const minimalConfig = {url: RunMQConnectionConfigExample.valid().url}; - const mockAmqplibClient = setupSuccessfulAmqplibClientMock(); + const mockClient = setupSuccessfulClientMock(); await RunMQ.start(minimalConfig); - expect(mockAmqplibClient).toHaveBeenCalledWith({ + expect(mockClient).toHaveBeenCalledWith({ ...minimalConfig, reconnectDelay: 5000, maxReconnectAttempts: 5 @@ -86,16 +87,17 @@ describe('RunMQ Unit Tests', () => { }); it('should retry connection on failure', async () => { - const mockAmqplibClient = AmqplibClient as jest.MockedClass; - mockAmqplibClient.prototype.connect + const mockClient = RabbitMQClientAdapter as jest.MockedClass; + mockClient.prototype.connect .mockRejectedValueOnce(new Error('Connection failed')) .mockRejectedValueOnce(new Error('Connection failed')) .mockResolvedValueOnce({} as any); - mockAmqplibClient.prototype.getChannel.mockResolvedValue(mockChannel as Channel); + mockClient.prototype.getChannel.mockResolvedValue(mockChannel as AMQPChannel); + mockClient.prototype.getDefaultChannel.mockResolvedValue(mockChannel as AMQPChannel); await RunMQ.start(validConfig, MockedRunMQLogger); - expect(mockAmqplibClient.prototype.connect).toHaveBeenCalledTimes(3); + expect(mockClient.prototype.connect).toHaveBeenCalledTimes(3); expect(RunMQUtils.delay).toHaveBeenCalledTimes(2); expect(RunMQUtils.delay).toHaveBeenCalledWith(100); expect(MockedRunMQLogger.error).toHaveBeenCalledTimes(4); @@ -103,7 +105,7 @@ describe('RunMQ Unit Tests', () => { }); it('should throw exception after max retry attempts', async () => { - setupFailingAmqplibClientMock(new Error('Connection failed')); + setupFailingClientMock(new Error('Connection failed')); await expect(RunMQ.start(validConfig)).rejects.toThrow(RunMQException); await expect(RunMQ.start(validConfig)).rejects.toMatchObject({ @@ -115,7 +117,7 @@ describe('RunMQ Unit Tests', () => { describe('process', () => { it('should create consumer with correct configuration', async () => { - setupSuccessfulAmqplibClientMock(); + setupSuccessfulClientMock(); const mockConsumerCreator = setupConsumerMock(); const runMQ = await RunMQ.start(validConfig); @@ -125,8 +127,7 @@ describe('RunMQ Unit Tests', () => { await runMQ.process('test.topic', processorConfig, processor); expect(mockConsumerCreator).toHaveBeenCalledWith( - mockChannel, - expect.any(AmqplibClient), + expect.any(RabbitMQClientAdapter), expect.any(Object), undefined ); @@ -142,7 +143,7 @@ describe('RunMQ Unit Tests', () => { describe('producer', () => { it('should throw error if message is not a valid record', async () => { - setupSuccessfulAmqplibClientMock(); + setupSuccessfulClientMock(); const runMQ = await RunMQ.start(validConfig); expect(() => { @@ -151,7 +152,7 @@ describe('RunMQ Unit Tests', () => { }); it('should publish message correctly if valid record', async () => { - setupSuccessfulAmqplibClientMock(); + setupSuccessfulClientMock(); const {mockPublisher} = setupPublisherMock(); const runMQ = await RunMQ.start(validConfig); @@ -163,18 +164,18 @@ describe('RunMQ Unit Tests', () => { describe('disconnect', () => { it('should disconnect successfully', async () => { - const mockAmqplibClient = setupSuccessfulAmqplibClientMock(); - mockAmqplibClient.prototype.disconnect.mockResolvedValue(); + const mockClient = setupSuccessfulClientMock(); + mockClient.prototype.disconnect.mockResolvedValue(); const runMQ = await RunMQ.start(validConfig); await runMQ.disconnect(); - expect(mockAmqplibClient.prototype.disconnect).toHaveBeenCalled(); + expect(mockClient.prototype.disconnect).toHaveBeenCalled(); }); it('should throw exception on disconnect error', async () => { - const mockAmqplibClient = setupSuccessfulAmqplibClientMock(); - mockAmqplibClient.prototype.disconnect.mockRejectedValue(new Error('Disconnect failed')); + const mockClient = setupSuccessfulClientMock(); + mockClient.prototype.disconnect.mockRejectedValue(new Error('Disconnect failed')); const runMQ = await RunMQ.start(validConfig); @@ -188,14 +189,14 @@ describe('RunMQ Unit Tests', () => { describe('isActive', () => { it('should return client active status', async () => { - const mockAmqplibClient = setupSuccessfulAmqplibClientMock(); - mockAmqplibClient.prototype.isActive.mockReturnValue(true); + const mockClient = setupSuccessfulClientMock(); + mockClient.prototype.isActive.mockReturnValue(true); const runMQ = await RunMQ.start(validConfig); const isActive = runMQ.isActive(); expect(isActive).toBe(true); - expect(mockAmqplibClient.prototype.isActive).toHaveBeenCalled(); + expect(mockClient.prototype.isActive).toHaveBeenCalled(); }); }); }); \ No newline at end of file diff --git a/tests/unit/core/clients/AmqplibClient.test.ts b/tests/unit/core/clients/AmqplibClient.test.ts deleted file mode 100644 index 2c2f774..0000000 --- a/tests/unit/core/clients/AmqplibClient.test.ts +++ /dev/null @@ -1,175 +0,0 @@ -import * as amqp from 'amqplib'; -import {AmqplibClient} from '@src/core/clients/AmqplibClient'; -import {RunMQException} from '@src/core/exceptions/RunMQException'; -import {Exceptions} from '@src/core/exceptions/Exceptions'; -import {Channel, ChannelModel} from 'amqplib'; - -jest.mock('amqplib'); - -describe('AmqplibClient Unit Tests', () => { - const validConfig = { - url: 'amqp://test:test@localhost:5672' - }; - - let mockConnection: Partial; - let mockChannel: Partial; - - beforeEach(() => { - jest.clearAllMocks(); - - mockChannel = { - close: jest.fn() - }; - - mockConnection = { - createChannel: jest.fn().mockResolvedValue(mockChannel), - close: jest.fn().mockResolvedValue(undefined), - on: jest.fn() - }; - - (amqp.connect as jest.Mock).mockResolvedValue(mockConnection); - }); - - describe('connect', () => { - it('should connect successfully and set up event handlers', async () => { - const client = new AmqplibClient(validConfig); - const connection = await client.connect(); - - expect(amqp.connect).toHaveBeenCalledWith(validConfig.url); - expect(connection).toBe(mockConnection); - expect(client.isActive()).toBe(true); - expect(mockConnection.on).toHaveBeenCalledWith('error', expect.any(Function)); - expect(mockConnection.on).toHaveBeenCalledWith('close', expect.any(Function)); - }); - - it('should return existing connection if already connected', async () => { - const client = new AmqplibClient(validConfig); - - const firstConnection = await client.connect(); - const secondConnection = await client.connect(); - - expect(firstConnection).toBe(secondConnection); - expect(amqp.connect).toHaveBeenCalledTimes(1); - }); - - it('should throw RunMQException on connection failure', async () => { - (amqp.connect as jest.Mock).mockRejectedValue(new Error('Connection failed')); - - const client = new AmqplibClient(validConfig); - - await expect(client.connect()).rejects.toThrow(RunMQException); - - await expect(client.connect()).rejects.toMatchObject({ - exception: Exceptions.CONNECTION_NOT_ESTABLISHED, - details: { - error: 'Connection failed' - } - }); - - expect(client.isActive()).toBe(false); - }); - - it('should handle error event and set isConnected to false', async () => { - const client = new AmqplibClient(validConfig); - await client.connect(); - - const errorHandler = (mockConnection.on as jest.Mock).mock.calls.find( - call => call[0] === 'error' - )[1]; - - errorHandler(new Error('Connection error')); - - expect(client.isActive()).toBe(false); - }); - - it('should handle close event and set isConnected to false', async () => { - const client = new AmqplibClient(validConfig); - await client.connect(); - - const closeHandler = (mockConnection.on as jest.Mock).mock.calls.find( - call => call[0] === 'close' - )[1]; - - closeHandler(); - - expect(client.isActive()).toBe(false); - }); - }); - - describe('getChannel', () => { - it('should create and return a channel', async () => { - const client = new AmqplibClient(validConfig); - const channel = await client.getChannel(); - - expect(channel).toBe(mockChannel); - expect(mockConnection.createChannel).toHaveBeenCalled(); - expect(amqp.connect).toHaveBeenCalledWith(validConfig.url); - }); - - it('should connect first if not connected', async () => { - const client = new AmqplibClient(validConfig); - const channel = await client.getChannel(); - - expect(amqp.connect).toHaveBeenCalled(); - expect(mockConnection.createChannel).toHaveBeenCalled(); - expect(channel).toBe(mockChannel); - }); - }); - - describe('disconnect', () => { - it('should disconnect successfully when connected', async () => { - const client = new AmqplibClient(validConfig); - await client.connect(); - await client.disconnect(); - - expect(mockConnection.close).toHaveBeenCalled(); - expect(client.isActive()).toBe(false); - }); - - it('should do nothing if not connected', async () => { - const client = new AmqplibClient(validConfig); - await client.disconnect(); - - expect(mockConnection.close).not.toHaveBeenCalled(); - }); - - it('should throw RunMQException on disconnect failure', async () => { - (mockConnection.close as jest.Mock).mockRejectedValue(new Error('Disconnect failed')); - - const client = new AmqplibClient(validConfig); - await client.connect(); - - await expect(client.disconnect()).rejects.toThrow(RunMQException); - - await expect(client.disconnect()).rejects.toMatchObject({ - exception: Exceptions.CONNECTION_NOT_ESTABLISHED, - details: { - error: 'Disconnect failed' - } - }); - }); - }); - - describe('isActive', () => { - it('should return true when connected with channelModel', async () => { - const client = new AmqplibClient(validConfig); - await client.connect(); - - expect(client.isActive()).toBe(true); - }); - - it('should return false when not connected', () => { - const client = new AmqplibClient(validConfig); - - expect(client.isActive()).toBe(false); - }); - - it('should return false after disconnect', async () => { - const client = new AmqplibClient(validConfig); - await client.connect(); - await client.disconnect(); - - expect(client.isActive()).toBe(false); - }); - }); -}); \ No newline at end of file diff --git a/tests/unit/core/consumer/RunMQConsumerCreator.test.ts b/tests/unit/core/consumer/RunMQConsumerCreator.test.ts index 5d8f6d5..68c18a0 100644 --- a/tests/unit/core/consumer/RunMQConsumerCreator.test.ts +++ b/tests/unit/core/consumer/RunMQConsumerCreator.test.ts @@ -1,6 +1,6 @@ import {ConsumerConfiguration} from '@src/core/consumer/ConsumerConfiguration'; import {Constants, DEFAULTS} from '@src/core/constants'; -import {MockedRabbitMQChannel} from "@tests/mocks/MockedRabbitMQChannel"; +import {MockedAMQPChannel} from "@tests/mocks/MockedAMQPChannel"; import {RunMQProcessorConfigurationExample} from "@tests/Examples/RunMQProcessorConfigurationExample"; import {ConsumerConfigurationExample} from "@tests/Examples/ConsumerConfigurationExample"; import {RunMQConsumerCreator} from "@src/core/consumer/RunMQConsumerCreator"; @@ -12,7 +12,7 @@ import {RunMQException} from "@src/core/exceptions/RunMQException"; jest.mock('@src/core/management/Policies/RunMQTTLPolicyManager'); describe('RunMQConsumerCreator Unit Tests', () => { - const mockedChannel = new MockedRabbitMQChannel(); + const mockedChannel = new MockedAMQPChannel(); const mockedClient = new MockedAMQPClient(mockedChannel); const mockTTLPolicyManager = { initialize: jest.fn(), @@ -43,7 +43,7 @@ describe('RunMQConsumerCreator Unit Tests', () => { jest.mocked(RunMQTTLPolicyManager).mockImplementation(() => mockTTLPolicyManager as any); mockTTLPolicyManager.initialize.mockResolvedValue(undefined); mockTTLPolicyManager.apply.mockResolvedValue(true); - consumerCreator = new RunMQConsumerCreator(mockedChannel, mockedClient, MockedRunMQLogger, undefined); + consumerCreator = new RunMQConsumerCreator(mockedClient, MockedRunMQLogger, undefined); }); describe('createConsumer', () => { diff --git a/tests/unit/core/consumer/processors/RunMQFailedMessageRejecterProcessor.test.ts b/tests/unit/core/consumer/processors/RunMQFailedMessageRejecterProcessor.test.ts index 19b1f85..887f560 100644 --- a/tests/unit/core/consumer/processors/RunMQFailedMessageRejecterProcessor.test.ts +++ b/tests/unit/core/consumer/processors/RunMQFailedMessageRejecterProcessor.test.ts @@ -1,15 +1,23 @@ import {RunMQFailedMessageRejecterProcessor} from "@src/core/consumer/processors/RunMQFailedMessageRejecterProcessor"; -import {MockedRabbitMQMessage} from "@tests/mocks/MockedRabbitMQMessage"; import {MockedThrowableRabbitMQConsumer} from "@tests/mocks/MockedRunMQConsumer"; +import {RabbitMQMessage} from "@src/core/message/RabbitMQMessage"; describe('RunMQFailedMessageRejecterProcessor', () => { const consumer = new MockedThrowableRabbitMQConsumer() - const rabbitMQMessage = MockedRabbitMQMessage - it("should nack message with allUpTO false and requeue false when consumer throws", async () => { + const mockMessage = { + ack: jest.fn(), + nack: jest.fn() + } as unknown as jest.Mocked; + + beforeEach(() => { + jest.clearAllMocks(); + }); + + it("should nack message with requeue false when consumer throws", async () => { const processor = new RunMQFailedMessageRejecterProcessor(consumer) - const result = await processor.consume(rabbitMQMessage) + const result = await processor.consume(mockMessage) expect(result).toBe(false) - expect(rabbitMQMessage.channel.nack).toHaveBeenCalledWith(rabbitMQMessage.amqpMessage, false, false) + expect(mockMessage.nack).toHaveBeenCalledWith(false) }); }) \ No newline at end of file diff --git a/tests/unit/core/consumer/processors/RunMQRetriesCheckerProcessor.test.ts b/tests/unit/core/consumer/processors/RunMQRetriesCheckerProcessor.test.ts index f0e6c72..5759445 100644 --- a/tests/unit/core/consumer/processors/RunMQRetriesCheckerProcessor.test.ts +++ b/tests/unit/core/consumer/processors/RunMQRetriesCheckerProcessor.test.ts @@ -8,7 +8,7 @@ import { mockedRabbitMQMessageWithDeathCount } from "@tests/mocks/MockedRabbitMQMessage"; import {MockedRabbitMQPublisher} from "@tests/mocks/MockedRunMQPublisher"; -import {MockedRabbitMQChannelWithAcknowledgeFailure} from "@tests/mocks/MockedRabbitMQChannel"; +import {MockedAMQPChannelWithAcknowledgeFailure} from "@tests/mocks/MockedAMQPChannel"; describe('RunMQRetriesCheckerProcessor', () => { const consumer = new MockedThrowableRabbitMQConsumer() @@ -38,7 +38,7 @@ describe('RunMQRetriesCheckerProcessor', () => { ConsumerCreatorUtils.getDLQTopicName(processorConfig.name), message ) - expect(message.channel.ack).toHaveBeenCalledWith(message.amqpMessage, false); + expect(message.channel.ack).toHaveBeenCalledWith(message.amqpMessage); }) }) @@ -48,7 +48,7 @@ describe('RunMQRetriesCheckerProcessor - acknowledgeMessage', () => { it("should throw error if acknowledge message failed", async () => { const message = mockedRabbitMQMessageWithChannelAndDeathCount( - new MockedRabbitMQChannelWithAcknowledgeFailure(), + new MockedAMQPChannelWithAcknowledgeFailure(), 2 ) const runMQPublisher = new MockedRabbitMQPublisher() diff --git a/tests/unit/core/consumer/processors/RunMQSucceededMessageAcknowledgerProcessor.test.ts b/tests/unit/core/consumer/processors/RunMQSucceededMessageAcknowledgerProcessor.test.ts index ff7f3f5..e252a64 100644 --- a/tests/unit/core/consumer/processors/RunMQSucceededMessageAcknowledgerProcessor.test.ts +++ b/tests/unit/core/consumer/processors/RunMQSucceededMessageAcknowledgerProcessor.test.ts @@ -10,9 +10,8 @@ import { describe('RunMQSucceededMessageAcknowledgerProcessor', () => { const message = { - channel: { - ack: jest.fn() - } + ack: jest.fn(), + nack: jest.fn() } as unknown as jest.Mocked; beforeEach(() => { @@ -24,7 +23,7 @@ describe('RunMQSucceededMessageAcknowledgerProcessor', () => { const processor = new RunMQSucceededMessageAcknowledgerProcessor(successfulConsumer) const result = await processor.consume(message) expect(result).toBe(true) - expect(message.channel.ack).toHaveBeenCalledWith(message.message) + expect(message.ack).toHaveBeenCalled() }); it("should return false and not ack message when result is false", async () => { @@ -32,7 +31,7 @@ describe('RunMQSucceededMessageAcknowledgerProcessor', () => { const processor = new RunMQSucceededMessageAcknowledgerProcessor(failedConsumer) const result = await processor.consume(message) expect(result).toBe(false) - expect(message.channel.ack).not.toHaveBeenCalled() + expect(message.ack).not.toHaveBeenCalled() }) it("should rethrow when consumer throws", async () => { diff --git a/tests/unit/core/publisher/producers/RunMQBaseProducer.test.ts b/tests/unit/core/publisher/producers/RunMQBaseProducer.test.ts index 1675481..4c61fde 100644 --- a/tests/unit/core/publisher/producers/RunMQBaseProducer.test.ts +++ b/tests/unit/core/publisher/producers/RunMQBaseProducer.test.ts @@ -1,8 +1,6 @@ import {RunMQBaseProducer} from '@src/core/publisher/producers/RunMQBaseProducer'; import {RunMQMessage, RunMQMessageMeta} from '@src/core/message/RunMQMessage'; import {Constants} from '@src/core/constants'; -import {RabbitMQMessage} from "@src/core/message/RabbitMQMessage"; -import {RabbitMQMessageProperties} from "@src/core/message/RabbitMQMessageProperties"; import {MockedRabbitMQChannel} from "@tests/mocks/MockedRabbitMQChannel"; import {MockedRabbitMQMessage, mockedRabbitMQMessageWithChannelAndMessage} from "@tests/mocks/MockedRabbitMQMessage"; import {MockedDefaultSerializer} from "@tests/mocks/MockedDefaultSerializer";