Library for applications that process messages asynchronously. Provides interfaces and implementations for message queues, topics, pollers and processors.
The library is split into modules:
liflig-messaging-coreprovides theQueue,TopicandMessageProcessorinterfaces, as well as theMessagePollerclass for polling messages from a queue.liflig-messaging-awssdkimplements theQueueinterface for AWS SQS and theTopicinterface for AWS SNS, using the AWS SDK.liflig-messaging-sqs-lambdaprovides a function for processing messages in AWS Lambda functions that use SQS as the event source. It improves failure handling for individual messages in a batch, and allows you to use the sameMessageProcessorinterface as in long-running services.
Contents:
The main thing that your application has to concern itself with, is implementing the
MessageProcessor interface. This is where your application-specific message processing logic goes.
Example:
import no.liflig.messaging.Message
import no.liflig.messaging.MessageProcessor
import no.liflig.messaging.ProcessingResult
class ExampleEventProcessor : MessageProcessor {
override fun process(message: Message): ProcessingResult {
val event = ExampleEvent.fromJson(message.body)
handleEvent(event)
return ProcessingResult.Success
}
}How you use your MessageProcessor depends on if your application is a long-running service, or an
AWS Lambda function.
For a long-running service, you'll use MessagePoller. You pass your MessageProcessor and a
Queue implementation to its constructor, and call messagePoller.start() on application start-up.
This spawns a thread that runs side-by-side with your service, polling messages from the queue and
passing them to your processor.
If you use liflig-messaging-awssdk, you can use the SqsQueue implementation for AWS SQS (Simple
Queue Service).
import no.liflig.messaging.MessagePoller
import no.liflig.messaging.Queue
import no.liflig.messaging.awssdk.SqsQueue
import software.amazon.awssdk.services.sqs.SqsClient
class App(config: Config) {
val inputQueue: Queue = SqsQueue(SqsClient.create(), config.inputQueueUrl)
val messagePoller = MessagePoller(
queue = inputQueue,
messageProcessor = ExampleEventProcessor(),
)
fun start() {
messagePoller.start()
}
}
fun main() {
App(Config.load()).start()
}For an AWS Lambda function, you'll construct your MessageProcessor implementation in your
LambdaHandler, and then call handleLambdaSqsEvent (from liflig-messaging-sqs-lambda) in your
handler method:
import no.liflig.messaging.lambda.handleLambdaSqsEvent
class LambdaHandler(
private val eventProcessor: MessageProcessor = ExampleEventProcessor(),
) {
fun handle(sqsEvent: SQSEvent): SQSBatchResponse {
return handleLambdaSqsEvent(sqsEvent, messageProcessor)
}
}Important
In order for handleLambdaSqsEvent to work correctly, your handler method must return
SQSBatchResponse. And in order for that to work, you have to configure batch item failures on
your Lambda <-> SQS integration. In AWS CDK, you do this on the SqsEventSource:
myLambda.addEventSource(
new SqsEventSource(myQueue, { reportBatchItemFailures: true })
);For more on the reason behind this, see the docstring on handleLambdaSqsEvent.
Construct a Queue like you would for the MessagePoller example above. You can then call send
on it to send a message.
import no.liflig.messaging.Queue
import no.liflig.messaging.awssdk.SqsQueue
import software.amazon.awssdk.services.sqs.SqsClient
class ExampleEventSender(
private val outputQueue: Queue = SqsQueue(SqsClient.create(), queueUrl = "..."),
) {
fun sendEvent(event: ExampleEvent) {
outputQueue.send(event.toJson())
}
}The Topic interface from liflig-messaging-core represents a pub-sub message topic.
liflig-messaging-awssdk provides SnsTopic, an implementation of this interface for AWS SNS
(Simple Notification Service).
import no.liflig.messaging.Topic
import no.liflig.messaging.awssdk.SnsTopic
import software.amazon.awssdk.services.sns.SnsClient
class ExampleEventPublisher(
private val eventTopic: Topic = SnsTopic(SnsClient.create(), topicArn = "..."),
) {
fun publishEvent(event: ExampleEvent) {
eventTopic.publish(event.toJson())
}
}We use Maven as the example build system here.
First, add the core module:
<dependency>
<groupId>no.liflig</groupId>
<artifactId>liflig-messaging-core</artifactId>
<version>${liflig-messaging.version}</version>
</dependency>Then, add extra modules depending on your use-case:
- If your application is a long-running service, and you want to use the AWS SDK implementations:
<dependency> <groupId>no.liflig</groupId> <artifactId>liflig-messaging-awssdk</artifactId> <version>${liflig-messaging.version}</version> </dependency>
- If your application is an AWS Lambda function with an SQS event source:
<dependency> <groupId>no.liflig</groupId> <artifactId>liflig-messaging-sqs-lambda</artifactId> <version>${liflig-messaging.version}</version> </dependency>
mvn clean installmvn spotless:checkmvn spotless:apply