Implement an event bus on Kubernetes with RabbitMQ using MassTransit in Microservice Architecture Part - 2
In an event-driven microservice architecture, services communicate with each other using event messages. When business events occur, producers publish them as messages, and other services consume them through event listeners. The main benefits of event-driven systems are asynchronous behavior and loosely coupled structures.
In this article, we are going to talk about a simple publisher/subscriber scenario deployed on a Kubernetes cluster. To implement the publisher and the subscriber we use MassTransit.
In this tutorial, we will use the Red Hat® OpenShift® version 4.6 Kubernetes container platform to deploy the RabbitMQ cluster, the publisher, and the consumer application.
Prerequisites
You should have a basic understanding of RabbitMQ, Docker, Kubernetes, OpenShift, ASP.NET Core Web API, and .NET Core. Read the following articles before you start.
OpenShift Architecture
https://darshanadinushal.blogspot.com/2021/02/openshift-with-kubernetes-openshift-v4.html
Apache Kafka and RabbitMQ
https://darshanadinushal.blogspot.com/2021/03/kafka-vs-rabbitmq-architecture.html
RabbitMQ Cluster Deployment on Openshift
Source Code
You can download the source code from my GitHub profile.
Use Case
In this article, we are going to talk about a simple publisher/subscriber scenario. Assume that we have a health application that lets people create doctor appointments. When an appointment is created, the publisher publishes a message to the queue; the consumer application then consumes the message and sends an email to the end user.
If a failure or exception occurs while the consumer is processing the message, the system should retry and, once the retries are exhausted, move the message to the error queue.
Figure: Microservices based on event-driven async communication.
MassTransit
MassTransit is a free, open-source, lightweight, .NET-based Enterprise Service Bus (ESB). It helps .NET developers route messages over transports such as RabbitMQ and ActiveMQ; MSMQ was supported natively in early versions but has since been dropped.
What does MassTransit add on top of RabbitMQ?
Threaded Consumers
MassTransit manages multiple concurrent consumers for you, and messages are handled asynchronously (the message pipeline in MassTransit is asynchronous).
Exception Management
MassTransit implements generic exception handling for consumers: a message whose consumer fails is moved to an error queue, where we can inspect it later and re-queue it.
Retries & Poison Messages
The message pipeline in MassTransit is asynchronous and uses Task extensively to maximize thread utilization. This means receiving an individual message may involve several threads over the life cycle of the consumer.
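using System.Transactions;

// SaveEntity and entity come from the surrounding consumer code.
using (var scope = new TransactionScope())
{
    SaveEntity(entity);
    scope.Complete(); // without this call, disposing the scope rolls back
}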
A single transaction is shared across multiple operations that are committed as a single unit. If the commit fails, everything is undone and the message is faulted (or retried, if the retry middleware is used).
Publisher Application Implementation
1. Create an ASP.NET Core 3.1 Web API application using Visual Studio.
I am not going to describe in detail how to create a .NET Core application here. You can download the source code from my GitHub repository, as mentioned above.
After creating the application, install the following NuGet packages.
A publish endpoint lets the underlying transport determine the actual endpoint to which the message is sent, for example an exchange on RabbitMQ.
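To make the publish endpoint concrete, here is a minimal sketch of a Web API controller that publishes an appointment message. The `AppointmentCreated` contract, its properties, and the `AppointmentController` name are assumptions for illustration only; the real types are in the GitHub repository. `IPublishEndpoint` is the MassTransit interface that `AddMassTransit` registers in the container.

```csharp
using System.Threading.Tasks;
using MassTransit;
using Microsoft.AspNetCore.Mvc;

// Hypothetical message contract; not taken from the article's source code.
public class AppointmentCreated
{
    public string PatientEmail { get; set; }
    public string DoctorName { get; set; }
}

[ApiController]
[Route("api/[controller]")]
public class AppointmentController : ControllerBase
{
    private readonly IPublishEndpoint _publishEndpoint;

    public AppointmentController(IPublishEndpoint publishEndpoint)
    {
        _publishEndpoint = publishEndpoint;
    }

    [HttpPost]
    public async Task<IActionResult> Create([FromBody] AppointmentCreated appointment)
    {
        // The transport decides where the message goes; with RabbitMQ,
        // MassTransit publishes to an exchange derived from the message type.
        await _publishEndpoint.Publish(appointment);

        // 202 Accepted: the email is sent later by the consumer.
        return Accepted();
    }
}
```

The controller never names a queue. Any service that subscribes to `AppointmentCreated` receives a copy, which is what keeps the publisher and the consumer loosely coupled.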
6. Add a Dockerfile using Visual Studio Docker Support (see the image), then build and publish the image to Docker Hub or any other Docker image repository.
docker build -t <dockerhubId>/rabbitmq-publisher:1.0.0 -f microservices/Sample.Application.Publisher/Dockerfile .
Docker push
docker push <dockerhubId>/rabbitmq-publisher:1.0.0
7. Deploy the publisher application to OpenShift
If you are using a plain Kubernetes cluster, you can deploy this image with the kubectl command as well.
Consumer Application Implementation
Parameter name | Remarks
HostName | RabbitMQ host URL
UserName | RabbitMQ user name
Password | RabbitMQ password
Port | Port exposed by the RabbitMQ service
RetryCount | The maximum retry count. If the consumer fails or an exception occurs, the consumer retries until it reaches this limit.
Interval | Retry after a fixed delay, up to the retry limit

SmtpEmailProvider | A sample email service for sending email. Many companies provide email services; here I use Mailjet because it has a free subscription that we can use.
SmtpClientHost | in-v3.mailjet.com
From | The sender address used for all outgoing email
UserName/Password | Email service user name and password
Port | Email port
Tag Name | Remarks
1 | Adds the MassTransit service to the ASP.NET Core service container.
2 | Adds a new consumer class “MessageRequestConsumerService” with its ClaimSubmission. In the ClaimSubmission we can configure the retry process.
3 | Adds the message filter class. Before the consumer consumes the message, we can validate the message and throw an exception.
4 | Creates a new service bus using RabbitMQ. Here we pass parameters such as the host URL, user name, and password.
5 | Defines the receive endpoint. In RabbitMQ it creates a new queue “Appoinment-Create”, and ConfigureConsumer links the “Appoinment-Create” queue to the consumer class “MessageRequestConsumerService”.
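The tagged steps can be sketched roughly as follows. This is a minimal sketch assuming MassTransit 7.x with the RabbitMQ and ASP.NET Core integration packages. The `AddMessaging` extension method, the `AppointmentCreated` contract, and the `RabbitMq:*` configuration keys are assumptions, not names from the original project. Tag 3 (the message filter) and the retry definition from tag 2 are left out to keep the sketch short.

```csharp
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical message contract.
public class AppointmentCreated
{
    public string PatientEmail { get; set; }
}

// Minimal stand-in for the article's consumer class.
public class MessageRequestConsumerService : IConsumer<AppointmentCreated>
{
    public Task Consume(ConsumeContext<AppointmentCreated> context) => Task.CompletedTask;
}

public static class MessagingRegistration
{
    public static IServiceCollection AddMessaging(
        this IServiceCollection services, IConfiguration configuration)
    {
        // Tag 1: add MassTransit to the service container.
        services.AddMassTransit(x =>
        {
            // Tag 2: register the consumer (retry definition omitted here).
            x.AddConsumer<MessageRequestConsumerService>();

            // Tag 4: use RabbitMQ as the transport.
            x.UsingRabbitMq((context, cfg) =>
            {
                // The "RabbitMq:*" keys are assumed names.
                cfg.Host(
                    configuration["RabbitMq:HostName"],
                    ushort.Parse(configuration["RabbitMq:Port"]),
                    "/",
                    h =>
                    {
                        h.Username(configuration["RabbitMq:UserName"]);
                        h.Password(configuration["RabbitMq:Password"]);
                    });

                // Tag 5: create the queue and bind it to the consumer.
                cfg.ReceiveEndpoint("Appoinment-Create", e =>
                {
                    e.ConfigureConsumer<MessageRequestConsumerService>(context);
                });
            });
        });

        // MassTransit 7.x: starts and stops the bus together with the host.
        services.AddMassTransitHostedService();
        return services;
    }
}
```

In `Startup.ConfigureServices` this would be called as `services.AddMessaging(Configuration);`.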
MassTransit Middleware
MassTransit is built on top of Green Pipes, which is used to create a network of pipes and filters to dispatch messages. A pipe is composed of a series of filters. For a detailed view of MassTransit's receive pipeline, see the MassTransit documentation.
4. RabbitMQ Consumer Message Filter.
Sample.Application.Consumer.Infra.Filter.MessageValidateFilter.cs
Scoped filters are typically used to carry data alongside the consumer. This data may be extracted from headers, or could include context or authorization information that needs to be passed from a consumed message context to sent or published messages.
Here we validate the message object before it is consumed. The IsValidateEmail method of the “RegexUtilities” class validates the email address; if the email is invalid, it throws an exception.
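A validating filter along these lines could look like the sketch below. It assumes MassTransit 7.x, where `IFilter`, `IPipe`, and `ProbeContext` live in the `GreenPipes` namespace. The `IHasEmail` interface and the `EmailCheck` helper are hypothetical stand-ins, and the pattern is a deliberately simple one rather than the rule used by the article's “RegexUtilities” class.

```csharp
using System;
using System.Text.RegularExpressions;
using System.Threading.Tasks;
using GreenPipes;
using MassTransit;

// Hypothetical interface for messages that carry a recipient address.
public interface IHasEmail
{
    string Email { get; }
}

public static class EmailCheck
{
    // Simple shape check: one '@', no whitespace, a dot in the domain part.
    private static readonly Regex Pattern =
        new Regex(@"^[^@\s]+@[^@\s]+\.[^@\s]+$", RegexOptions.Compiled);

    public static bool IsValidEmail(string email) =>
        !string.IsNullOrWhiteSpace(email) && Pattern.IsMatch(email);
}

public class MessageValidateFilter<T> : IFilter<ConsumeContext<T>>
    where T : class
{
    public async Task Send(ConsumeContext<T> context, IPipe<ConsumeContext<T>> next)
    {
        // Throw before the consumer runs if the address is malformed.
        if (context.Message is IHasEmail message && !EmailCheck.IsValidEmail(message.Email))
        {
            throw new ArgumentException($"Invalid email address: '{message.Email}'");
        }

        // Otherwise hand the message to the next filter (eventually the consumer).
        await next.Send(context);
    }

    public void Probe(ProbeContext context)
    {
        context.CreateFilterScope("messageValidate");
    }
}
```

In MassTransit 7.x a filter like this is registered inside `UsingRabbitMq` with `cfg.UseConsumeFilter(typeof(MessageValidateFilter<>), context);`. Because the exception is thrown inside the pipeline, it is handled like any other consumer failure.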
MassTransit Retry
Some exceptions may be caused by a transient condition, such as a database deadlock, a busy web service, or some similar type of situation which usually clears up on a second attempt. With these exception types, it is often desirable to retry the message delivery to the consumer, allowing the consumer to try the operation again.
When configuring message retry, there are several retry policies available, including:
Policy | Description
None | No retry
Immediate | Retry immediately, up to the retry limit
Interval | Retry after a fixed delay, up to the retry limit
Intervals | Retry after a delay, for each interval specified
Exponential | Retry after an exponentially increasing delay, up to the retry limit
Incremental | Retry after a steadily increasing delay, up to the retry limit
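As a rough illustration, the policies in the table map onto retry configurator calls like the ones below. This sketch assumes MassTransit 7.x, where the policy extension methods come from the `GreenPipes` namespace; the `RetryPolicyExamples` class and all the numbers are made up for the example.

```csharp
using System;
using GreenPipes;
using MassTransit;

public static class RetryPolicyExamples
{
    // Immediate: up to 5 retries with no delay.
    public static void UseImmediate(IReceiveEndpointConfigurator endpoint) =>
        endpoint.UseMessageRetry(r => r.Immediate(5));

    // Interval: 3 retries, 5 seconds apart.
    public static void UseInterval(IReceiveEndpointConfigurator endpoint) =>
        endpoint.UseMessageRetry(r => r.Interval(3, TimeSpan.FromSeconds(5)));

    // Intervals: one retry per listed delay (1 s, then 5 s, then 30 s).
    public static void UseIntervals(IReceiveEndpointConfigurator endpoint) =>
        endpoint.UseMessageRetry(r => r.Intervals(
            TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(30)));

    // Exponential: 5 retries, delay grows from 1 s up to a 1 minute cap.
    public static void UseExponential(IReceiveEndpointConfigurator endpoint) =>
        endpoint.UseMessageRetry(r => r.Exponential(
            5, TimeSpan.FromSeconds(1), TimeSpan.FromMinutes(1), TimeSpan.FromSeconds(2)));

    // Incremental: 5 retries, starting at 1 s and adding 2 s each time.
    public static void UseIncremental(IReceiveEndpointConfigurator endpoint) =>
        endpoint.UseMessageRetry(r => r.Incremental(
            5, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(2)));
}
```

Only one policy should be applied per endpoint; the methods are separated here just to show each call on its own.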
Exception Filters
Sometimes you do not want to retry every failure, but only when a specific exception is thrown, and fault immediately for all other exceptions. To implement this, you can use an exception filter. Specify exception types using either the Handle or the Ignore method. A filter should have either Handle or Ignore statements, not both; combining them has unpredictable effects.
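A small sketch of an exception filter, assuming MassTransit 7.x. Choosing `TimeoutException` as the "transient" exception is an assumption for the example, and `ExceptionFilterExample` is a hypothetical class name.

```csharp
using System;
using GreenPipes;
using MassTransit;

public static class ExceptionFilterExample
{
    public static void Configure(IReceiveEndpointConfigurator endpoint)
    {
        endpoint.UseMessageRetry(r =>
        {
            // Retry only when a TimeoutException is thrown;
            // every other exception faults the message immediately.
            r.Handle<TimeoutException>();

            r.Interval(3, TimeSpan.FromSeconds(5));
        });
    }
}
```

The inverse form would use `r.Ignore<SomeException>()` on its own to retry everything except the listed types.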
5. RabbitMQ Consumer ClaimSubmission.
Sample.Application.Consumer.Infra.Service.ConsumerDefinition.MessageRequestClaimSubmission.cs
Here we configure RetryCount (3) and Interval (5000) in the appsettings.json file and pass those values to the Interval method. The consumer retries three times, 5 seconds apart. After the last retry fails, the message is moved to the error queue.
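A consumer definition that applies this retry policy might look like the following sketch. It assumes the MassTransit 7.x signature of `ConfigureConsumer` (version 8 adds a registration context parameter) and hard-codes the two values instead of reading them from appsettings.json. The `AppointmentCreated` contract and the stub consumer are assumptions for illustration.

```csharp
using System;
using System.Threading.Tasks;
using GreenPipes;
using MassTransit;
using MassTransit.ConsumeConfigurators;
using MassTransit.Definition;

// Hypothetical message contract and stub consumer so the sketch is complete.
public class AppointmentCreated
{
    public string PatientEmail { get; set; }
}

public class MessageRequestConsumerService : IConsumer<AppointmentCreated>
{
    public Task Consume(ConsumeContext<AppointmentCreated> context) => Task.CompletedTask;
}

public class MessageRequestClaimSubmission : ConsumerDefinition<MessageRequestConsumerService>
{
    // In the article these come from appsettings.json (RetryCount and Interval).
    private const int RetryCount = 3;
    private static readonly TimeSpan RetryDelay = TimeSpan.FromMilliseconds(5000);

    protected override void ConfigureConsumer(
        IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<MessageRequestConsumerService> consumerConfigurator)
    {
        // Three retries, 5 seconds apart; after that the message goes to the error queue.
        endpointConfigurator.UseMessageRetry(r => r.Interval(RetryCount, RetryDelay));
    }
}
```

The definition is attached at registration time, for example with `x.AddConsumer<MessageRequestConsumerService>(typeof(MessageRequestClaimSubmission));`.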
6. RabbitMQ Consumer class.
Sample.Application.Consumer.Infra.Service.ConsumerService.MessageRequestConsumerService.cs
This is the consumer class. When everything in the MassTransit middleware executes successfully and no exception is thrown along the way, the Consume method is finally executed. Here it triggers the SendMail method and sends the email to the end user.
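In outline, such a consumer could be written as below. The `IEmailSender` interface, its `SendMail` signature, and the `AppointmentCreated` properties are assumptions for illustration; the real implementation is in the GitHub repository.

```csharp
using System.Threading.Tasks;
using MassTransit;

// Hypothetical message contract.
public class AppointmentCreated
{
    public string PatientEmail { get; set; }
    public string DoctorName { get; set; }
}

// Hypothetical abstraction over the SMTP provider.
public interface IEmailSender
{
    Task SendMail(string to, string subject, string body);
}

public class MessageRequestConsumerService : IConsumer<AppointmentCreated>
{
    private readonly IEmailSender _emailSender;

    public MessageRequestConsumerService(IEmailSender emailSender)
    {
        _emailSender = emailSender;
    }

    public async Task Consume(ConsumeContext<AppointmentCreated> context)
    {
        var appointment = context.Message;

        // If SendMail throws, the exception flows back into the pipeline,
        // where the retry policy decides whether to try again.
        await _emailSender.SendMail(
            appointment.PatientEmail,
            "Appointment created",
            $"Your appointment with {appointment.DoctorName} has been created.");
    }
}
```

Keeping the email sending behind an interface also makes it easy to swap Mailjet for another provider without touching the consumer.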
** Download the source code and customize it to fit your scope. The GitHub repository is mentioned above.
7. Add a Dockerfile using Visual Studio Docker Support, then build and publish the image to Docker Hub or any other Docker image repository.
Build Docker Image
docker build -t <dockerhubId>/rabbitmq-consumer:1.0.0 -f microservices/Sample.Application.Consumer/Dockerfile .
Docker push
docker push <dockerhubId>/rabbitmq-consumer:1.0.0