Azure Service Bus Topic with MassTransit


Reading time: 13 minutes

Description

In this post I will explain how to create a service bus topic, subscribe to it and consume it. This is quite similar to a post about Azure Service Bus Queue with MassTransit.

The case study would be around the POS(point of sale) system. Here is a Microsoft example that I have based this tutorial on.

The main difference is that the message in the Queue is consumed by one or more competing consumers. It is always one message to one consumer though – one to one relationships. Whereas with Topics the message is available to one or many subscriptions. Each subscriber can receive a copy of the same message. In a real world example this could be different users, different systems and so on.

As with the Point of sale system example. You can have an Inventory management system that tracks when stock needs to be replenished and Management a Dashboard to view details of their sales.

What is required

  • Azure Subscription
  • Visual Studio
  • Net Core Web App

Let’s start by creating a Net Core App

Open Visual Studio and choose ASP.NET Core Web Application. Choose an API project template.

This project is going to be our Sender according to Microsoft resources or Producers according to Masstransit documentation.

In the same solution add New Project and select Class Library (.Net Standard) This project will contain our Contracts. You can read a definition for it here or here.

The reason why we add the class library of Net Standard is because it provides a uniformity in the .Net ecosystem. In other words if you got two projects of different .Net platform versions(.Net Framework 4.5 and .Net Core 3.1) thanks to .Net Standard these would be able to share a particular class library without any issues.

Now we need to add multiple Consumers or Receivers. In essence this is where our message would end up going. Or you can process it and send it further where you want to.

Add multiple new web app projects to this solution and select Empty template. For me it’s easier to track Consumers with name to contain a number e.g. AsbMassNetCoreTopic.Consumer, AsbMassNetCoreTopic.Consumer1 etc

Next thing is to add several Nuget packages

We need to install several MassTransit related packages in addition to Azure Service bus.

Required packages:

Right click on the project and select Manage Nuget Packages. In that tab search for each of these packages and install them to Sender and Consumer projects.

Installed Nugets should look like this.

Next thing we would be adding the code to publish and consume message

In the Controller folder of the Sender project add a new controller. I will call it PurchasesController. Next add the HttpPost method NewPurchase.

Action method should look like this.

 [HttpPost("new")]
        public async Task<IActionResult> NewPurchase()
        {
            var purchaseItems = new List<PurchaseItem>
            {
                new PurchaseItem
                {
                    PurchaseItemId = Guid.NewGuid(),
                    Timestamp = DateTime.UtcNow,
                    Name = "Bus Mass Transformer High Spec",
                    Amount = 1,
                    Price = 100.00m
                }
            };

            await _publishEndpoint
                        .Publish(
                                    new Purchase
                                    {
                                        PurchaseId = Guid.NewGuid(),
                                        PublicPurchaseId = $"Id_{_random.Next(1, 999)}",
                                        Timestamp = DateTime.UtcNow,
                                        PurchaseItems = purchaseItems
                                    }
                                    );

            return Ok();
        }

We need to create a contact for Purchase. In the Contracts project add a new class and call it Purchase. Add all properties that you require. I have also included a PurchaseItem contract. It’s up to you how detailed you want to make it.

The result should look like so:

  public class Purchase
    {
        public Guid PurchaseId { get; set; }
        public string PublicPurchaseId { get; set; }

        public DateTime Timestamp { get; set; }

        public IEnumerable<PurchaseItem> PurchaseItems { get; set; }
    }

And for the PurchaseItem

  public class PurchaseItem
    {
        public Guid PurchaseItemId { get; set; }
        public DateTime Timestamp { get; set; }

        public string Name { get; set; }
        public int Amount { get; set; }
        public decimal Price { get; set; }
    }

Now it ‘s time to modify the Startup file of the Sender project. This is the place where we will add a service bus, MassTransit, register and define endpoints. An option to consider if you are doing it for production is a separate Startup file specific for Service bus and MassTransit setup. Anyway let’s keep things simple for now.

This is how it should look like:

public void ConfigureServices(IServiceCollection services)
        {
            var connectionString =
        "Endpoint=sb://servicebustestnetcore.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=blablasharedaccesskey";

            var newPurchaseTopic = "new-purchase-topic";

            // create the bus using Azure Service bus
            var azureServiceBus = Bus.Factory.CreateUsingAzureServiceBus(busFactoryConfig =>
            {
                busFactoryConfig.Host(connectionString);

                // specify the message Purchase to be sent to a specific topic
                busFactoryConfig.Message<Purchase>(configTopology =>
                {
                    configTopology.SetEntityName(newPurchaseTopic);
                });

            });

            services.AddMassTransit
                (
                    config =>
                    {
                        config.AddBus(provider => azureServiceBus);
                    }
                );

            services.AddSingleton<IPublishEndpoint>(azureServiceBus);
            services.AddSingleton<IBus>(azureServiceBus);

            services.AddControllers();
        }

To note, because we are using topics we need to use IPublishEndpoint rather than ISendPointProvider like you would do for Queues.

We’re done with Sender for now, so let’s move to one of the Consumer projects.

Add a new class and call it PurchaseConsumer. Inside add this code.

public class PurchaseConsumer
    : IConsumer<Purchase>
    {
        public Task Consume(ConsumeContext<Purchase> context)
        {
            System.Threading.Thread.Sleep(60000);//60000 one minnute

            return Task.CompletedTask;
        }
    }

Next important part is to add a HostedService that would start and stop our service bus.

Add a new class and call it BusHostedService.

public class BusHostedService
       : IHostedService
    {
        readonly IBusControl _busControl;

        public BusHostedService(
            IBusControl busControl)
        {
            _busControl = busControl;
        }

        public async Task StartAsync(CancellationToken cancellationToken)
        {
            await _busControl.StartAsync(cancellationToken);
        }

        public async Task StopAsync(CancellationToken cancellationToken)
        {
            await _busControl.StopAsync(cancellationToken);
        }
    }

As with Sender project Consumer project Startup also requires some code for Service Bus and MassTransit to work.

Note: Topic name should match between Sender and Consumer.

public void ConfigureServices(IServiceCollection services)
        {
            var connectionString = "Endpoint=sb://servicebustestnetcore.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=blablasharedaccesskey";
            var newPurchaseTopic = "new-purchase-topic"; // need to make sure the topic name is written correctly
            var subscriptionName = "new-purchase-topic-subscriber";

            services.AddMassTransit(serviceCollectionConfigurator =>
            {
                serviceCollectionConfigurator.AddConsumer<PurchaseConsumer>();

                //Consumers - Receivers
                //Message Creators - Senders
                //would normally be in different applications

                serviceCollectionConfigurator.AddBus
                    (registrationContext => Bus.Factory.CreateUsingAzureServiceBus
                                                    (configurator =>
                                                    {
                                                        var host = configurator.Host(connectionString);

                                                        configurator.Message<Purchase>(m => { m.SetEntityName(newPurchaseTopic); });

                                                        /*
                                                         For a consumer to receive messages, the consumer must be connected to a receive endpoint. 
                                                        This is done during bus configuration, particularly within the configuration of a receive endpoint.
                                                        https://masstransit-project.com/usage/consumers.html#consumer*/

                                                        configurator.SubscriptionEndpoint<Purchase>(subscriptionName, endpointConfigurator =>
                                                        {
                                                            endpointConfigurator.ConfigureConsumer<PurchaseConsumer>(registrationContext);
                                                        });

                                                    }
                                                        ));

            });

            //need to always start the bus, so it behaves correctly
            services.AddSingleton<IHostedService, BusHostedService>();
        }

Next we need to follow the same process of creating classes, modifying Startup files for the rest of the Consumer projects.

In my case I’ve got another two to do.

The only difference would be in the value of subscription in the Startup file of the Consumer. I have just added a number in respect to the Consumer project number.

var subscriptionName = "new-purchase-topic-subscriber_1";

Next thing we need to create Azure Service Bus

For this we need to go to Azure Portal home. Select Create a Resource.

On the next screen choose Integration then in the right pane menu Service Bus.

When you are creating Service Bus the things to consider are Resource group, Location and Pricing tier.

For Resource groups either create a new or select the existing one. If it’s part of existing then you can manage it with other resources in that group.

Normally Location should be local to you as different regions/locations have different regulations, latency etc. You can either use the same region or pair them. For example your web app uses UK South, so you would consider using UK South instead of East US for your service bus. If you are pairing then consider using UK South and UK West.

However there are no limits as to what region and location to use. As long as it answers your personal or business requirements. You can read more about regions here and here.

If you consider using different regions then check out this Microsoft article.

As for the tier let’s use the Standard pricing tier since it always works and it’s easier to set up.

Note: Azure charges money for this tier. So make sure you use it accordingly and delete the service bus once you stop playing with it.

Once it’s successfully deployed we need to go and retrieve the connection string for our application. Go to the resource and select the service bus namespace.

On your service bus find Settings and select Shared access policies. Then select RootManageSharedAccessKe and copy Primary Connection String.

We need to paste the whole string into the connectionString variable value in the Startup file of Sender and Consumer projects.

Next let’s test what we created

Since we have Sender and Consumer in the same solution we need to set this solution to run multiple projects. Right click on the solution, select radio button for Multiple startup projects. From the dropdown of Action columns select which projects to run.

To simplify things disable SSL for Sender and Consumer projects. On the Sender project right click and select Properties. In the newly opened tab select Debug and find Enable SSL checkbox. Untick it and save it.

Run the solution.

Because we need to hit HttpPost method we would require some sort of API client tool. For this example let’s try out Postman.

Let’s grab the URL of our Sender project and put it into the Postman tab with HttpPost method type selected. We also need to prefix it with controller route.

http://localhost:58852/api/purchases

It should return Status 200 OK if everything went ok.

Next let’s check our Azure Service Bus on Azure portal

When you hit the Http Post endpoint a topic with provided name will be published. Service bus will create a required amount of subscriptions. Because we configured three consumers the report should show three subscriptions.

Each subscription will be assigned the message that Sender sends.

Because we suspend current thread for a minute the subscription Consumer will process each new message once this time has passed. So if you hit that HttpPost method several times all these messages will be added to a particular topic subscription one by one. However the message will start to process after one minute from the moment it was added.

public Task Consume(ConsumeContext<Order> context)
        {
            System.Threading.Thread.Sleep(60000);//Wait for one minute

            //by returning a completed task service bus removes message from the topic 
            return Task.CompletedTask;
        }

By returning a completed task this message will be removed from the topic subscription.

Before we wrap up we need to tidy things up

This is especially the case because we are using the Standard pricing tier for this Service Bus namespace.

There are several options as to how to do it.

You can delete the Service Bus namespace on the page itself.

Another option is to delete a Resource group altogether.

Conclusion

In this tutorial we covered a basic structure of Azure Service Bus topics. Topics follow a publisher-subscriber pattern you can find more details here.

Main idea to take from here is that you can send the same message to multiple subscribers. You can chain a subscriber with a queue, so that it processes the way you want and it has all the characteristics and tools like a queue.

Source code

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

%d bloggers like this: