Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
"@semantic-release/git": "^10.0.1",
"@semantic-release/github": "^11.0.1",
"@semantic-release/release-notes-generator": "^11.0.7",
"@types/amqplib": "^0.10.7",
"@types/jest": "^30.0.0",
"@types/node": "^22.0.0",
"eslint": "^9.29.0",
Expand All @@ -69,7 +68,7 @@
},
"dependencies": {
"ajv": "^8.17.1",
"amqplib": "^0.10.8"
"rabbitmq-client": "^5.0.0"
},
"release": {
"branches": [
Expand Down
25 changes: 12 additions & 13 deletions src/core/RunMQ.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import {RunMQProcessorConfiguration, RunMQConnectionConfig, RunMQPublisher, RunMQMessageContent} from "@src/types";
import {RunMQProcessorConfiguration, RunMQConnectionConfig, RunMQPublisher, RunMQMessageContent, AMQPChannel} from "@src/types";
import {RunMQException} from "@src/core/exceptions/RunMQException";
import {AmqplibClient} from "@src/core/clients/AmqplibClient";
import {RabbitMQClientAdapter} from "@src/core/clients/RabbitMQClientAdapter";
import {Exceptions} from "@src/core/exceptions/Exceptions";
import {RunMQUtils} from "@src/core/utils/RunMQUtils";
import {Constants, DEFAULTS} from "@src/core/constants";
import {Channel} from "amqplib";
import {RunMQConsumerCreator} from "@src/core/consumer/RunMQConsumerCreator";
import {ConsumerConfiguration} from "@src/core/consumer/ConsumerConfiguration";
import {RunMQLogger} from "@src/core/logging/RunMQLogger";
Expand All @@ -14,12 +13,12 @@ import {RabbitMQMessage} from "@src/core/message/RabbitMQMessage";
import {RabbitMQMessageProperties} from "@src/core/message/RabbitMQMessageProperties";

export class RunMQ {
private readonly amqplibClient: AmqplibClient;
private readonly client: RabbitMQClientAdapter;
private readonly config: RunMQConnectionConfig;
private publisher: RunMQPublisher | undefined
private readonly logger: RunMQLogger
private retryAttempts: number = 0;
private defaultChannel: Channel | undefined;
private defaultChannel: AMQPChannel | undefined;

private constructor(config: RunMQConnectionConfig, logger: RunMQLogger) {
this.logger = logger;
Expand All @@ -28,7 +27,7 @@ export class RunMQ {
reconnectDelay: config.reconnectDelay ?? DEFAULTS.RECONNECT_DELAY,
maxReconnectAttempts: config.maxReconnectAttempts ?? DEFAULTS.MAX_RECONNECT_ATTEMPTS,
};
this.amqplibClient = new AmqplibClient(this.config);
this.client = new RabbitMQClientAdapter(this.config);
}

/**
Expand All @@ -51,7 +50,7 @@ export class RunMQ {
* @param processor The function that will process the incoming messages
*/
public async process<T = Record<string, never>>(topic: string, config: RunMQProcessorConfiguration, processor: (message: RunMQMessageContent<T>) => Promise<void>) {
const consumer = new RunMQConsumerCreator(this.defaultChannel!, this.amqplibClient, this.logger, this.config.management);
const consumer = new RunMQConsumerCreator(this.client, this.logger, this.config.management);
await consumer.createConsumer<T>(new ConsumerConfiguration(topic, config, processor))
}

Expand All @@ -62,14 +61,14 @@ export class RunMQ {
* @param correlationId (Optional) A unique identifier for correlating messages; if not provided, a new UUID will be generated
*/
public publish(topic: string, message: Record<string, any>, correlationId: string = RunMQUtils.generateUUID()): void {
if (!this.publisher) {
if (!this.publisher || !this.defaultChannel) {
throw new RunMQException(Exceptions.NOT_INITIALIZED, {});
}
RunMQUtils.assertRecord(message);
this.publisher.publish(topic,
RabbitMQMessage.from(
message,
this.defaultChannel!,
this.defaultChannel,
new RabbitMQMessageProperties(RunMQUtils.generateUUID(), correlationId)
)
);
Expand All @@ -85,7 +84,7 @@ export class RunMQ {
*/
public async disconnect(): Promise<void> {
try {
await this.amqplibClient.disconnect();
await this.client.disconnect();
} catch (error) {
throw new RunMQException(
Exceptions.CONNECTION_NOT_ESTABLISHED,
Expand All @@ -100,7 +99,7 @@ export class RunMQ {
* Checks if the connection is currently active.
*/
public isActive(): boolean {
return this.amqplibClient.isActive();
return this.client.isActive();
}

private async connectWithRetry(): Promise<void> {
Expand All @@ -109,7 +108,7 @@ export class RunMQ {

while (this.retryAttempts < maxAttempts) {
try {
await this.amqplibClient.connect();
await this.client.connect();
this.logger.log('Successfully connected to RabbitMQ');
this.retryAttempts = 0;
return;
Expand All @@ -134,7 +133,7 @@ export class RunMQ {
}

private async initialize(): Promise<void> {
this.defaultChannel = await this.amqplibClient.getChannel();
this.defaultChannel = await this.client.getDefaultChannel();
await this.defaultChannel.assertExchange(Constants.ROUTER_EXCHANGE_NAME, 'direct', {durable: true});
await this.defaultChannel.assertExchange(Constants.DEAD_LETTER_ROUTER_EXCHANGE_NAME, 'direct', {durable: true});
this.publisher = new RunMQPublisherCreator(this.logger).createPublisher();
Expand Down
67 changes: 0 additions & 67 deletions src/core/clients/AmqplibClient.ts

This file was deleted.

137 changes: 137 additions & 0 deletions src/core/clients/RabbitMQClientAdapter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
import {Connection, Channel} from "rabbitmq-client";
import {RunMQException} from "@src/core/exceptions/RunMQException";
import {Exceptions} from "@src/core/exceptions/Exceptions";
import {AMQPChannel, AMQPClient, RunMQConnectionConfig} from "@src/types";
import {RabbitMQClientChannel} from "@src/core/clients/RabbitMQClientChannel";

/**
* AMQPClient implementation using rabbitmq-client library.
* Leverages built-in reconnection, retry, and robustness features.
*/
export class RabbitMQClientAdapter implements AMQPClient {
private connection: Connection | undefined;
private defaultChannel: AMQPChannel | undefined;
private isConnected: boolean = false;
private acquiredChannels: Channel[] = [];

constructor(private config: RunMQConnectionConfig) {}

public async connect(): Promise<Connection> {
try {
if (this.connection && this.isConnected) {
return this.connection;
}

// Close any existing connection that might be in a bad state
if (this.connection) {
try {
await this.connection.close();
} catch {
// Ignore close errors
}
this.connection = undefined;
}

this.connection = new Connection({
url: this.config.url,
// Disable automatic retries - we handle retries at RunMQ level
retryLow: 100,
retryHigh: 200,
connectionTimeout: 5000,
});

// Set up event handlers before waiting for connection
this.connection.on('error', (err) => {
console.error('RabbitMQ connection error:', err);
this.isConnected = false;
});

this.connection.on('connection', () => {
this.isConnected = true;
});

this.connection.on('connection.blocked', (reason) => {
console.warn('RabbitMQ connection blocked:', reason);
});

this.connection.on('connection.unblocked', () => {
console.info('RabbitMQ connection unblocked');
});

// Wait for connection with timeout
// The second parameter (true) disables auto-close on timeout
await this.connection.onConnect(5000, true);
this.isConnected = true;

return this.connection;
} catch (error) {
this.isConnected = false;
// Clean up the connection on failure
if (this.connection) {
try {
this.connection.close();
} catch {
// Ignore
}
this.connection = undefined;
}
throw new RunMQException(
Exceptions.CONNECTION_NOT_ESTABLISHED,
{
error: error instanceof Error ? error.message : JSON.stringify(error)
}
);
}
}

public async getChannel(): Promise<AMQPChannel> {
const connection = await this.connect();
const rawChannel = await connection.acquire();
// Track the channel so we can close it on disconnect
this.acquiredChannels.push(rawChannel);
return new RabbitMQClientChannel(rawChannel);
}

public async getDefaultChannel(): Promise<AMQPChannel> {
if (!this.defaultChannel) {
this.defaultChannel = await this.getChannel();
}
return this.defaultChannel;
}

public async disconnect(): Promise<void> {
// Reset state first
const conn = this.connection;
const channels = this.acquiredChannels;

this.connection = undefined;
this.defaultChannel = undefined;
this.isConnected = false;
this.acquiredChannels = [];

// Close all acquired channels first
for (const channel of channels) {
try {
if (channel.active) {
await channel.close();
}
} catch {
// Ignore errors - channel might already be closed
}
}

// Now close the connection
if (conn) {
try {
await conn.close();
} catch {
// Ignore errors - connection might already be closed
}
}
}

public isActive(): boolean {
return this.connection !== undefined && this.isConnected;
}
}

Loading
Loading