Implement an event bus on Kubernetes with RabbitMQ using MassTransit in Microservice Architecture Part - 2

In an event-driven microservice architecture, services communicate with each other using event messages. When business events occur, producers publish them as messages, and other services consume them through event listeners. The main benefits of event-driven systems are asynchronous behavior and loose coupling.

In this article, we walk through a simple publisher/subscriber scenario deployed on a Kubernetes cluster. We use MassTransit to implement the publisher and the subscriber.

In this tutorial, we use the Red Hat® OpenShift® 4.6 Kubernetes container platform to deploy the RabbitMQ cluster, the publisher application, and the consumer application.

Prerequisites

You should have a basic understanding of RabbitMQ, Docker, Kubernetes, OpenShift, ASP.NET Core Web API, and .NET Core. Read the following articles before you start.

Openshift Architecture
https://darshanadinushal.blogspot.com/2021/02/openshift-with-kubernetes-openshift-v4.html

RabbitMQ Cluster Deployment on Openshift
https://darshanadinushal.blogspot.com/2021/03/rabbitmq-cluster-on-kubernates.html

Source Code

You can download the source code from my GitHub profile.

Use Case

Assume that we have a health application that lets people create doctor appointments. When an appointment is created, the publisher application publishes a message to the queue. The consumer application then consumes the message and sends an email to the end user.

If a failure or exception occurs while the consumer is processing the message, the system should retry and, if the retries also fail, move the message to the error queue.

Microservices based on event-driven async communication.

MassTransit

MassTransit is a free, open-source, lightweight .NET-based Enterprise Service Bus (ESB). It helps .NET developers route messages over transports such as RabbitMQ, Azure Service Bus, ActiveMQ, and Amazon SQS.

What does MassTransit add on top of RabbitMQ?

Threaded Consumers
MassTransit handles multiple concurrent consumers for you, and its message pipeline is asynchronous.

Exception Management
MassTransit implements generic exception handling for consumers. Messages that fail are moved to an error queue, where we can inspect them later and re-queue them.

Retries & Poison Messages
If a consumer throws an exception, MassTransit uses a retry policy to redeliver the message to the consumer. If the message still fails after the maximum retry count is reached, MassTransit moves it to an error queue.

Transactions
MassTransit includes transaction middleware that shares a single transaction across the operations performed while consuming a message, including database work. It also provides Entity Framework integration.

Because the message pipeline in MassTransit is asynchronous and uses Task extensively to maximize thread utilization, receiving an individual message may involve several threads over the life cycle of the consumer.

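using System.Transactions;

// Commit the save as a single unit; disposing the scope without Complete() rolls it back.
using (var scope = new TransactionScope())
{
    SaveEntity(entity);
    scope.Complete();
}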

A single transaction is shared across multiple operations that are committed as a single unit. If the commit fails, everything is undone and the message is faulted (or retried, if the retry middleware is used).

Serialization
MassTransit provides a number of serializers, including BSON, JSON, XML and Binary.

Routing
MassTransit provides a heavily production-tested convention for using RabbitMQ exchanges to route published messages to the subscribed consumers.

Unit Testability
MassTransit provides the MassTransit.TestFramework NuGet package for unit testing.

Tracing and Monitoring
Using the tracing functionality, you can get very detailed timings of when and where things were consumed, how long the receive took, how long the consume took and what exceptions were thrown.

Sagas
A saga is initiated by an event, orchestrates subsequent events, and maintains the state of the overall transaction. Sagas are designed to manage the complexity of a distributed transaction without locking or immediate consistency. They manage state and track any compensations required if a partial failure occurs.

Publisher Application Implementation

1. Create an ASP.NET Core 3.1 Web API application using Visual Studio.

I am not going to describe in detail how to create a .NET Core application. You can download the source code from my GitHub repository, as mentioned above.

After creating the application, install the following NuGet packages.


2. Add the RabbitMQ connection string to appsettings.json.
    We can provide the connection string in two different ways, as follows.


3. Register the RabbitMQ and MassTransit dependencies.

Here I register all dependencies through extension methods called from the ConfigureServices method in Startup.cs. We can also specify the exchange type (direct, topic, fanout, or headers), which is the routing rule for messages. Messages are not published directly to a queue; instead, the producer sends them to an exchange, which routes them to the bound queues.
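The registration could look roughly like the following sketch. It assumes MassTransit 7.x with the MassTransit.RabbitMQ and MassTransit.AspNetCore packages. The AppointmentCreated message type, the extension method name, the host name, and the credentials are placeholders for illustration and are not taken from the article's repository.

```csharp
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical message contract, used only for this illustration.
public class AppointmentCreated
{
    public string Email { get; set; }
    public string DoctorName { get; set; }
}

public static class MassTransitPublisherExtensions
{
    // Intended to be called from ConfigureServices in Startup.cs.
    public static IServiceCollection AddRabbitMqPublisher(this IServiceCollection services)
    {
        services.AddMassTransit(x =>
        {
            x.UsingRabbitMq((context, cfg) =>
            {
                // Placeholder host and credentials; read them from appsettings.json in practice.
                cfg.Host("rabbitmq.example.local", "/", h =>
                {
                    h.Username("guest");
                    h.Password("guest");
                });

                // Exchange type for published AppointmentCreated messages.
                // "fanout" is the MassTransit default; "direct", "topic" and "headers"
                // are the other RabbitMQ exchange types.
                cfg.Publish<AppointmentCreated>(p =>
                {
                    p.ExchangeType = "fanout";
                });
            });
        });

        // Starts and stops the bus together with the ASP.NET Core host (MassTransit 7.x).
        services.AddMassTransitHostedService();
        return services;
    }
}
```

With this in place, ConfigureServices only needs a single `services.AddRabbitMqPublisher();` call.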


4. Create ProducerService class to publish message.

A publish endpoint lets the underlying transport determine the actual endpoint to which the message is sent, for example an exchange on RabbitMQ. See the MassTransit documentation for more information.
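As a minimal sketch of such a publisher, assuming MassTransit has already been registered in the service container: the AppointmentCreated contract and the PublishAsync method name are hypothetical, while IPublishEndpoint is the MassTransit interface that AddMassTransit makes available for injection.

```csharp
using System.Threading.Tasks;
using MassTransit;

// Hypothetical message contract; the real one lives in the article's repository.
public class AppointmentCreated
{
    public string Email { get; set; }
    public string DoctorName { get; set; }
}

public class ProducerService
{
    private readonly IPublishEndpoint _publishEndpoint;

    public ProducerService(IPublishEndpoint publishEndpoint)
    {
        _publishEndpoint = publishEndpoint;
    }

    // Publishes the event; the transport (a RabbitMQ exchange here) decides
    // which queues receive a copy of the message.
    public Task PublishAsync(AppointmentCreated message)
    {
        return _publishEndpoint.Publish(message);
    }
}
```

The publisher never names a queue. Any consumer whose receive endpoint is bound to the message exchange will get the message.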

    

5. Create a controller class with a POST method to expose the publisher service to external clients.

6. Add a Dockerfile using Visual Studio Docker Support, then build the image and publish it to Docker Hub or any other Docker image repository.

Build Docker Image 

docker build -t <dockerhubId>/rabbitmq-publisher:1.0.0 -f microservices/Sample.Application.Publisher/Dockerfile .

Docker push

docker push <dockerhubId>/rabbitmq-publisher:1.0.0

7. Deploy the publisher application to OpenShift.

If you are using a plain Kubernetes cluster, you can deploy this image with the kubectl command instead.


Consumer Application Implementation

1. Create an ASP.NET Core 3.1 Web API application using Visual Studio. Remove the UseRouting, UseAuthorization, and UseEndpoints calls from the Configure method in Startup.cs.

Startup.cs

After creating the application, add the following NuGet packages.

2. Add RabbitMQ connection string and Email credentials to appsettings.json


| Parameter name | Remarks |
|---|---|
| HostName | RabbitMQ host URL |
| UserName | RabbitMQ user name |
| Password | RabbitMQ password |
| Port | Port exposed by the RabbitMQ service |
| RetryCount | Maximum retry count. If the consumer fails or an exception occurs, the consumer retries until it reaches this limit. |
| Interval | Fixed delay between retries, up to the retry limit |

| Parameter name | Remarks |
|---|---|
| SmtpEmailProvider | Sample email service for sending email. Many companies provide email services; here I use Mailjet because it offers a free subscription. |
| SmtpClientHost | in-v3.mailjet.com |
| From | The sender address used for all outgoing emails |
| UserName/Password | User name and password for the email service |
| Port | Email (SMTP) port |
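One way to read the RabbitMQ parameters above in code is to bind them to a settings class. This is only a sketch: the property names mirror the parameter names in the table, but the "RabbitMq" section name, the class names, and the assumption that Interval is expressed in milliseconds are mine and may differ from the article's repository.

```csharp
using Microsoft.Extensions.Configuration;

// Property names mirror the parameter table; the section name "RabbitMq" is assumed.
public class RabbitMqSettings
{
    public string HostName { get; set; }
    public string UserName { get; set; }
    public string Password { get; set; }
    public int Port { get; set; }
    public int RetryCount { get; set; }
    public int Interval { get; set; } // assumed to be milliseconds
}

public static class SettingsLoader
{
    // Binds the "RabbitMq" section of appsettings.json to the settings class.
    // Returns null when the section does not exist.
    public static RabbitMqSettings Load(IConfiguration configuration)
    {
        return configuration.GetSection("RabbitMq").Get<RabbitMqSettings>();
    }
}
```

In Startup.cs this could be called as `SettingsLoader.Load(Configuration)` before configuring MassTransit.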


3. Register the RabbitMQ and MassTransit dependencies.

Startup.cs

| Tag | Remarks |
|---|---|
| 1 | Adds the MassTransit service to the ASP.NET Core service container. |
| 2 | Adds the consumer class “MessageRequestConsumerService” together with its ClaimSubmission definition. The retry process is configured in the ClaimSubmission class. |
| 3 | Adds the message filter class. Before the consumer consumes the message, the filter can validate it and throw an exception. |
| 4 | Creates a new service bus using RabbitMQ. Here we pass parameters such as the host URL, user name, and password. |
| 5 | Defines the receive endpoint. MassTransit creates a new queue “Appoinment-Create” in RabbitMQ, and ConfigureConsumer links that queue to the consumer class “MessageRequestConsumerService”. |
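The five tagged steps can be sketched as follows, assuming MassTransit 7.x (MassTransit.RabbitMQ and MassTransit.AspNetCore). The class and queue names come from the text above. The AppointmentCreated contract, the stub class bodies, the host values, and the exact placement of each call are assumptions made so that the sketch compiles on its own.

```csharp
using System.Threading.Tasks;
using GreenPipes;
using MassTransit;
using MassTransit.Definition;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical contract and minimal stubs so that the registration compiles.
public class AppointmentCreated { public string Email { get; set; } }

public class MessageRequestConsumerService : IConsumer<AppointmentCreated>
{
    public Task Consume(ConsumeContext<AppointmentCreated> context) => Task.CompletedTask;
}

public class MessageRequestClaimSubmission : ConsumerDefinition<MessageRequestConsumerService>
{
}

public class MessageValidateFilter<T> : IFilter<ConsumeContext<T>> where T : class
{
    public Task Send(ConsumeContext<T> context, IPipe<ConsumeContext<T>> next) => next.Send(context);
    public void Probe(ProbeContext context) => context.CreateFilterScope("messageValidate");
}

public static class MassTransitConsumerExtensions
{
    public static IServiceCollection AddRabbitMqConsumer(this IServiceCollection services)
    {
        // (1) Add MassTransit to the service container.
        services.AddMassTransit(x =>
        {
            // (2) Register the consumer together with its definition class.
            x.AddConsumer<MessageRequestConsumerService>(typeof(MessageRequestClaimSubmission));

            // (4) Create the bus on RabbitMQ; host and credentials are placeholders.
            x.UsingRabbitMq((context, cfg) =>
            {
                cfg.Host("rabbitmq.example.local", "/", h =>
                {
                    h.Username("guest");
                    h.Password("guest");
                });

                // (3) Run the validation filter before any consumer on this bus.
                cfg.UseConsumeFilter(typeof(MessageValidateFilter<>), context);

                // (5) Bind the "Appoinment-Create" queue to the consumer.
                cfg.ReceiveEndpoint("Appoinment-Create", e =>
                {
                    e.ConfigureConsumer<MessageRequestConsumerService>(context);
                });
            });
        });

        // Starts and stops the bus with the host (MassTransit 7.x).
        services.AddMassTransitHostedService();
        return services;
    }
}
```

The filter is registered before the receive endpoint so that it is part of the pipeline of every endpoint configured afterwards.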

MassTransit Middleware

MassTransit is built on top of GreenPipes, a library used to create a network of pipes and filters to dispatch messages. A pipe is composed of a series of filters. For a detailed view of MassTransit's receive pipeline, see the MassTransit documentation.


4. RabbitMQ Consumer Message Filter.

Sample.Application.Consumer.Infra.Filter.MessageValidateFilter.cs  

Scoped filters are typically used to transfer data between the consumer and the messages it sends or publishes. This data may be extracted from headers, or it could include context or authorization information that needs to be passed from a consumed message context to sent or published messages.

Here we validate the message object before it is consumed. The IsValidateEmail method of the “RegexUtilities” class validates the email address; if the email address is invalid, the filter throws an exception.
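A minimal sketch of such a filter, assuming MassTransit 7.x, where IFilter, IPipe and ProbeContext live in the GreenPipes namespace. The AppointmentCreated contract and the body of RegexUtilities.IsValidateEmail are simplified stand-ins and not the article's actual code.

```csharp
using System;
using System.Text.RegularExpressions;
using System.Threading.Tasks;
using GreenPipes;
using MassTransit;

// Hypothetical message contract.
public class AppointmentCreated { public string Email { get; set; } }

// Simplified stand-in for the article's RegexUtilities class.
public static class RegexUtilities
{
    public static bool IsValidateEmail(string email)
    {
        return !string.IsNullOrWhiteSpace(email)
            && Regex.IsMatch(email, @"^[^@\s]+@[^@\s]+\.[^@\s]+$");
    }
}

public class MessageValidateFilter<T> : IFilter<ConsumeContext<T>> where T : class
{
    public async Task Send(ConsumeContext<T> context, IPipe<ConsumeContext<T>> next)
    {
        // Only AppointmentCreated messages are validated; other types pass through.
        if (context.Message is AppointmentCreated appointment
            && !RegexUtilities.IsValidateEmail(appointment.Email))
        {
            throw new ArgumentException($"Invalid email address: {appointment.Email}");
        }

        // Hand the message to the next filter, and eventually to the consumer.
        await next.Send(context);
    }

    // Describes the filter for MassTransit's diagnostic probe output.
    public void Probe(ProbeContext context)
    {
        context.CreateFilterScope("messageValidate");
    }
}
```

An exception thrown here is handled like any other consumer fault, so the retry policy and the error queue also apply to validation failures unless the exception type is excluded from retry.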

MassTransit Retry

Some exceptions may be caused by a transient condition, such as a database deadlock, a busy web service, or some similar type of situation which usually clears up on a second attempt. With these exception types, it is often desirable to retry the message delivery to the consumer, allowing the consumer to try the operation again.

When configuring message retry, there are several retry policies available, including:

| Policy | Description |
|---|---|
| None | No retry |
| Immediate | Retry immediately, up to the retry limit |
| Interval | Retry after a fixed delay, up to the retry limit |
| Intervals | Retry after a delay, for each interval specified |
| Exponential | Retry after an exponentially increasing delay, up to the retry limit |
| Incremental | Retry after a steadily increasing delay, up to the retry limit |
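Each policy in the table corresponds to a method on the retry configurator. The sketch below, which assumes MassTransit 7.x with GreenPipes, selects one of them by name. The helper class, the policy strings, and all limits and delays are illustrative values and not recommendations.

```csharp
using System;
using GreenPipes;
using MassTransit;

public static class RetryPolicyExamples
{
    // Applies exactly one retry policy to the given receive endpoint.
    public static void UseRetry(IReceiveEndpointConfigurator endpoint, string policy)
    {
        endpoint.UseMessageRetry(r =>
        {
            switch (policy)
            {
                case "None":
                    r.None();
                    break;
                case "Immediate":
                    r.Immediate(3); // up to 3 retries, no delay
                    break;
                case "Interval":
                    r.Interval(3, TimeSpan.FromSeconds(5)); // 3 retries, 5 s apart
                    break;
                case "Intervals":
                    // One retry per listed delay: after 1 s, then 5 s, then 10 s.
                    r.Intervals(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(10));
                    break;
                case "Exponential":
                    // Retry limit, minimum interval, maximum interval, interval delta.
                    r.Exponential(3, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(2));
                    break;
                case "Incremental":
                    // Retry limit, initial interval, increment added per retry.
                    r.Incremental(3, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(2));
                    break;
                default:
                    throw new ArgumentOutOfRangeException(nameof(policy), policy, "Unknown retry policy");
            }
        });
    }
}
```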

Exception Filters

Sometimes you do not want to retry in every case, but only when a specific exception is thrown, and to fault immediately for all other exceptions. To implement this, you can use an exception filter. Specify exception types using either the Handle or the Ignore method. A filter can have either Handle or Ignore statements; combining them has unpredictable effects.
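For illustration, a retry configuration with an exception filter might look like this, assuming MassTransit 7.x with GreenPipes. The helper class name and the choice of TimeoutException as the transient exception are assumptions.

```csharp
using System;
using GreenPipes;
using MassTransit;

public static class RetryExceptionFilterExample
{
    public static void Configure(IReceiveEndpointConfigurator endpoint)
    {
        endpoint.UseMessageRetry(r =>
        {
            // Up to 3 retries, 5 seconds apart.
            r.Interval(3, TimeSpan.FromSeconds(5));

            // Retry only when a TimeoutException is thrown;
            // every other exception faults the message without retrying.
            r.Handle<TimeoutException>();
        });
    }
}
```

The inverse form is `r.Ignore<ArgumentException>()`, which retries everything except the listed exception type. As noted above, use one form or the other in a given filter.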

5. RabbitMQ Consumer ClaimSubmission.

Sample.Application.Consumer.Infra.Service.ConsumerDefinition.MessageRequestClaimSubmission.cs

Here the RetryCount (3) and Interval (5000) values configured in appsettings.json are passed to the Interval method, so the consumer retries up to three times, waiting 5 seconds between attempts. If all retries fail, the message is moved to the error queue.
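A definition class along these lines could be sketched as below, assuming MassTransit 7.x. The configuration keys "RabbitMq:RetryCount" and "RabbitMq:Interval", the AppointmentCreated contract, and the consumer stub are hypothetical. The sketch also assumes that the definition is resolved from the service container, so that IConfiguration can be injected through the constructor.

```csharp
using System;
using System.Threading.Tasks;
using GreenPipes;
using MassTransit;
using MassTransit.ConsumeConfigurators;
using MassTransit.Definition;
using Microsoft.Extensions.Configuration;

// Hypothetical contract and consumer stub so that the definition compiles.
public class AppointmentCreated { public string Email { get; set; } }

public class MessageRequestConsumerService : IConsumer<AppointmentCreated>
{
    public Task Consume(ConsumeContext<AppointmentCreated> context) => Task.CompletedTask;
}

public class MessageRequestClaimSubmission : ConsumerDefinition<MessageRequestConsumerService>
{
    private readonly int _retryCount;
    private readonly TimeSpan _interval;

    public MessageRequestClaimSubmission(IConfiguration configuration)
    {
        // Assumed keys; fall back to 3 retries and 5000 ms when they are missing.
        _retryCount = configuration.GetValue<int>("RabbitMq:RetryCount", 3);
        _interval = TimeSpan.FromMilliseconds(configuration.GetValue<int>("RabbitMq:Interval", 5000));
    }

    protected override void ConfigureConsumer(
        IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<MessageRequestConsumerService> consumerConfigurator)
    {
        // Fixed-delay retry; once the retries are exhausted the message faults
        // and MassTransit moves it to the endpoint's error queue.
        endpointConfigurator.UseMessageRetry(r => r.Interval(_retryCount, _interval));
    }
}
```

By MassTransit convention the error queue is named after the receive endpoint with an `_error` suffix, so for the queue used in this article it would be "Appoinment-Create_error".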

6. RabbitMQ Consumer class.

Sample.Application.Consumer.Infra.Service.ConsumerService.MessageRequestConsumerService.cs

This is the consumer class. When a message passes through the MassTransit middleware without an exception, the Consume method runs last. Here it calls the SendMail method, which sends the email to the end user.
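A minimal sketch of such a consumer, using System.Net.Mail for the SendMail step. The AppointmentCreated contract, the sender address, the port, the credentials, and the email text are placeholders. Only the class name, the SendMail method name, and the Mailjet host come from the text above.

```csharp
using System.Net;
using System.Net.Mail;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical message contract.
public class AppointmentCreated
{
    public string Email { get; set; }
    public string DoctorName { get; set; }
}

public class MessageRequestConsumerService : IConsumer<AppointmentCreated>
{
    // Called by MassTransit once the message has passed all filters.
    public async Task Consume(ConsumeContext<AppointmentCreated> context)
    {
        await SendMail(context.Message);
    }

    private static async Task SendMail(AppointmentCreated appointment)
    {
        // Placeholder port and credentials; load them from appsettings.json in practice.
        using var client = new SmtpClient("in-v3.mailjet.com", 587)
        {
            Credentials = new NetworkCredential("<username>", "<password>"),
            EnableSsl = true
        };

        using var mail = new MailMessage("noreply@example.com", appointment.Email)
        {
            Subject = "Appointment created",
            Body = $"Your appointment with {appointment.DoctorName} has been created."
        };

        // An exception thrown here propagates to MassTransit and triggers the retry policy.
        await client.SendMailAsync(mail);
    }
}
```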

** Download the source code from the GitHub repository mentioned above and customize it to fit your scope.

7. Add a Dockerfile using Visual Studio Docker Support, then build the image and publish it to Docker Hub or any other Docker image repository.

Build Docker Image 

docker build -t <dockerhubId>/rabbitmq-consumer:1.0.0 -f microservices/Sample.Application.Consumer/Dockerfile .

Docker push

docker push <dockerhubId>/rabbitmq-consumer:1.0.0


 

   








