Azure Service Bus Queue with MassTransit


Reading time: 13 minutes

Description

In this post I will explain how to create a bus queue and consume it. The case study would be around the order system. Let’s imagine a system where you can create a new order for whatever reason in the industry you like. This could be an e-commerce website, an airplane or bus ticket ordering website.

What is required

  • Azure Subscription
  • Visual Studio
  • Net Core Web App

Let’s start by creating a Net Core App

Open Visual Studio and choose ASP.NET Core Web Application. Choose an API project template.

This project is going to be our Sender according to Microsoft resources or Producers according to Masstransit documentation.

In the same solution add New Project and select Class Library (.Net Standard) This project will contain our Contracts. You can read a definition for it here or here.

The reason why we add the class library of Net Standard is because it provides a uniformity in the .Net ecosystem. In other words if you got two projects of different .Net platform versions(.Net Framework 4.5 and .Net Core 3.1) thanks to .Net Standard these would be able to share a particular class library without any issues.

Now we need to add a Consumer or Receiver. In essence this is where our message would end up going. Or you can process it and send it further where you want to.

Add a new web app project to this solution and select Empty template.

Next thing is to add several Nuget packages

We need to install several MassTransit related packages in addition to Azure Service bus.

Required packages:

Right click on the project and select Manage Nuget Packages. In that tab search for each of these packages and install them to Sender and Consumer projects.

Installed Nugets should look like this.

Next thing we would be adding the code to send and consume message

In the Controller folder of the Sender project add a new controller. I will call it OrdersController. Next add the HttpPost method NewOrder.

Action method should look like this.

    [HttpPost()]
        public async Task<IActionResult> NewOrder()
        {
            var sendEndpoint =
             await _sendEndpointProvider.GetSendEndpoint(
                 new Uri("sb://servicebusqueuesnetcore.servicebus.windows.net/new-orders"));

            await sendEndpoint.Send(
                                        new Order
                                        {
                                            OrderId = Guid.NewGuid(),
                                            Timestamp = DateTime.UtcNow,
                                            PublicOrderId = _random.Next(1, 999).ToString()
                                        });

            return Ok();
        }

We also need to add a contract. In the Contracts project add a new class and call it Order. Add all properties that you require. It should look like the example below.

public class Order
    {
        public Guid OrderId { get; set; }
        public string PublicOrderId { get; set; }

        public DateTime Timestamp { get; set; }
    }

Let’s go back to the Sender project and open the Startup file. In it we will add a service bus, MassTransit, register and define endpoints.

In there we need to edit the ConfigureServices method in order to add required services to the DI container.

This is how it should look like.

 public void ConfigureServices(IServiceCollection services)
        {
            var connectionString =
        "Endpoint=sb://servicebusqueuesnetcore.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=blablasharedaccesskey";

            var newOrdersQueue = "new-orders";

            // create the bus using Azure Service bus
            var azureServiceBus = Bus.Factory.CreateUsingAzureServiceBus(busFactoryConfig =>
            {
                busFactoryConfig.Host(connectionString);

                // specify the message of Order object to be sent to a specific queue
                busFactoryConfig.Message<Order>(configTopology =>
                {
                    configTopology.SetEntityName(newOrdersQueue);
                });
            });

            services.AddMassTransit
                (
                    config =>
                    {
                        config.AddBus(provider => azureServiceBus);
                    }
                );

            services.AddSingleton<ISendEndpointProvider>(azureServiceBus);
            services.AddSingleton<IBus>(azureServiceBus);

            services.AddControllers();
        }

Next we need to perform a similar sort of changes in the Consumer project.

Add a new class and call it OrderConsumer. Inside let’s add some code.

  public class OrderConsumer 
        : IConsumer<Order>
    {
        public Task Consume(ConsumeContext<Order> context)
        {
            System.Threading.Thread.Sleep(60000);//Wait for one minute

            //by returning a completed task service bus removes message from the queue
            return Task.CompletedTask;
        }
    }

Next let’s add HostedService that would start and stop our service bus. It’s very important to start service bus before attempting anything.

Add a new class and call it BusHostedService.

public class BusHostedService
        : IHostedService
    {
        readonly IBusControl _busControl;

        public BusHostedService(
            IBusControl busControl)
        {
            _busControl = busControl;
        }

        public async Task StartAsync(CancellationToken cancellationToken)
        {
            await _busControl.StartAsync(cancellationToken);
        }

        public async Task StopAsync(CancellationToken cancellationToken)
        {
            await _busControl.StopAsync(cancellationToken);
        }
    }

Next it’s Startup files turn. Similar changes as for Sender Startup. We need to configure services that would be available through Dependency Injection.

Note: Queue name should match between Sender and Consumer.

 public void ConfigureServices(IServiceCollection services)
        {
            var connectionString = "endpoint of your service bus";
            var newOrdersQueue = "new-orders"; // need to make sure the queue name is written correctly

            services.AddMassTransit(serviceCollectionConfigurator =>
            {
                serviceCollectionConfigurator.AddConsumer<OrderConsumer>();

                //Consumers - Receivers
                //Message Creators - Senders
                //would normally be in different applications

                serviceCollectionConfigurator.AddBus
                    (registrationContext => Bus.Factory.CreateUsingAzureServiceBus
                                                    (configurator =>
                                                    {
                                                        configurator.Host(connectionString);

                                                        /*
                                                         For a consumer to receive messages, the consumer must be connected to a receive endpoint. 
                                                        This is done during bus configuration, particularly within the configuration of a receive endpoint.
                                                        https://masstransit-project.com/usage/consumers.html#consumer*/

                                                        configurator.ReceiveEndpoint(newOrdersQueue, endpointConfigurator =>
                                                        {
                                                            endpointConfigurator.ConfigureConsumer<OrderConsumer>(registrationContext);
                                                        });
                                                    }
                                                    )
                     );

            });

            //need to always start the bus, so it behaves correctly
            services.AddSingleton<IHostedService, BusHostedService>();
        }

Next thing we need to create Azure Service Bus

For this we need to go to Azure Portal home. Select Create a Resource.

On the next screen choose Integration then in the right pane menu Service Bus.

When you are creating Service Bus the things to consider are Resource group, Location and Pricing tier.

For Resource groups either create a new or select the existing one. If it’s part of existing then you can manage it with other resources in that group.

Normally Location should be local to you as different regions/locations have different regulations, latency etc. You can either use the same region or pair them. For example your web app uses UK South, so you would consider using UK South instead of East US for your service bus. If you are pairing then consider using UK South and UK West.

However there are no limits as to what region and location to use. As long as it answers your personal or business requirements. You can read more about regions here and here.

If you consider using different regions then check out this Microsoft article.

As for the tier let’s use the Standard pricing tier since it always works and it’s easier to set up.

Note: Azure charges money for this tier. So make sure you use it accordingly and delete the service bus once you stop playing with it.

Once it’s successfully deployed we need to go and retrieve the connection string for our application. Go to the resource and select the service bus namespace.

On your service bus find Settings and select Shared access policies. Then select RootManageSharedAccessKe and copy Primary Connection String.

We need to paste the whole string into the connectionString variable value in the Startup file of Sender and Consumer projects.

Next let’s test what we created

Since we have Sender and Consumer in the same solution we need to set this solution to run multiple projects. Right click on the solution, select radio button for Multiple startup projects. From the dropdown of Action columns select which projects to run.

To simplify things disable SSL for Sender and Consumer projects. On the Sender project right click and select Properties. In the newly opened tab select Debug and find Enable SSL checkbox. Untick it and save it.

Run the solution.

Because we need to hit HttpPost method we would require some sort of API client tool. For this example let’s try out Postman.

Let’s grab the URL of our Sender project and put it into the Postman tab with HttpPost method type selected. We also need to prefix it with controller route.

http://localhost:58852/api/orders

It should return Status 200 OK if everything went ok.

Next let’s check our Azure Service Bus on Azure portal

If you go back to azure portal and open our service bus namespace. Every time you hit the HttpPost method a message will be enqueued. The consumer will process it when it will become available. One of the message models of Azure Service Bus Queues is First in, First out(FIFO).

As you can see our Queue has been added. If you click on it you would be able to see some stats and message count.

Because we suspend current thread for a minute the Consumer will process each new message once this time has passed. So if you hit that HttpPost method several times all these messages will be added to our queue one by one. However the message will start to process after one minute from the moment it was added.

public Task Consume(ConsumeContext<Order> context)
        {
            System.Threading.Thread.Sleep(60000);//Wait for one minute

            //by returning a completed task service bus removes message from the queue
            return Task.CompletedTask;
        }

By returning a completed task this message will be removed from the queue.

Before we wrap up we need to tidy things up

This is especially the case because we are using the Standard pricing tier for this Service Bus namespace.

There are several options as to how to do it.

You can delete the Service Bus namespace on the page itself.

Another option is to delete a Resource group altogether.

Conclusion

Azure Service Bus is a cloud messaging service. What is appealing is the simplicity of use and set up. What makes it even more easier is the addition of MassTransit, a distributed application framework which is responsible for abstracting the transportation.

By using these tools correctly you are able to achieve service decoupling with no sweat. There are other reasons to use it. To name a few, the application scaling, asynchronous processing and monitoring.

Source code

2 thoughts on “Azure Service Bus Queue with MassTransit”

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

%d bloggers like this: