MassTransit

MassTransit

Here are some personal notes taken when I first integrated MassTransit in a project.

Configuration

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        // Adds Bus interfaces and outgoing Endpoint interfaces (IPublishedEndpoint, ISendEndpointProvider)
        services.AddMassTransit(x =>
        {
            // In this example, MassTransit gets configured to use RMQ, no receiver is configured
            x.UsingRabbitMq();
            ------
            // In this example, a received endpoint is configured as well
            x.AddConsumer<EventConsumer>();
            x.UsingRabbitMq((context, cfg) =>
            {
                // Here we add a consumer and attach a receive endpoint name to it. There is a simpler solution, check further below
                cfg.ReceiveEndpoint("event-listener", e =>
                {
                    e.ConfigureConsumer<EventConsumer>(context);
                });
            });
            -----
            // In this example, we also declare a receiver but with auto-naming (kebab-case :-) )
            // Using the ConfigureEndpoints method will automatically create a receive endpoint for every added consumer, saga, and activity. 
            x.AddConsumer<ValueEnteredEventConsumer>();
            x.SetKebabCaseEndpointNameFormatter();
            x.UsingRabbitMq((context, cfg) =>
            {
                cfg.ConfigureEndpoints(context);
            });
        });
        // Declares MassTransit as an IHostedService to automatically start/stop. 
        // Besides, this service aéso procude health check signals that can be monitored if mapped to some end-points
        // in the middleware configuration section. See below
        services.AddMassTransitHostedService();
        -----
    }

    // Middleware configuration
    public void Configure(IApplicationBuilder app)
        {
            app.UseEndpoints(endpoints =>
            {
                // Map MassTransit health check signals to endpoints
                endpoints.MapHealthChecks("/health/ready", new HealthCheckOptions()
                {
                    Predicate = (check) => check.Tags.Contains("ready"),
                });
                endpoints.MapHealthChecks("/health/live", new HealthCheckOptions());
            });
        }
}

and can be used as shown below to publish messages

readonly IPublishEndpoint _publishEndpoint;

// Constructor gets injected with a publisher
public MyModule(IPublishEndpoint publishEndpoint)
{
    _publishEndpoint = publishEndpoint;
}

public async Task DoSomethingAsync(string value)
{
    // Publishing
    await _publishEndpoint.Publish<ValueEntered>(new
    {
        Value = value
    });
}

or like this to receive messages

// A basic receiver is called a Consumer with MassTransit and must inherit IConsumer<T>
class EventConsumer : IConsumer<ValueEntered>
{
    ILogger<EventConsumer> _logger;

    public EventConsumer(ILogger<EventConsumer> logger)
    {
        _logger = logger;
    }

    // Handler enforced by IConsumer<T>
    public async Task Consume(ConsumeContext<ValueEntered> context)
    {
        _logger.LogInformation("Value: {Value}", context.Message.Value);
    }
}

Advantages of using MassTransit

The benefits of using MassTransit instead of the raw transport APIs and building everything from scratch, are shown below.

Benefit Description
Concurrency MassTransit is completely asynchronous and leverages the .NET Task Parallel Library (TPL) to consume messages concurrently to achieve maximum throughput and high server utilization
Connection management The network is unreliable. If the application is disconnected from the message broker, MassTransit takes care of reconnecting and making sure all of the exchanges, queues, and bindings are restored
Exception, retries, and poison messages Your message consumers don't need to know about broker acknowledgement protocols. If your message consumer runs to completion, the message is acknowledged and removed from the queue. If you throw an exception, MassTransit uses a retry policy to redeliver the message to the consumer. If the retries are exhausted due to continued failures or other reasons, MassTransit moves the message to an error queue. If the message did not reach a consumer due to being misrouted to the queue, the message is moved to a skipped queue.
Serialization Provides many serializers out of the box, including JSON, BSON, and XML as well as the .NET binary formatter as a last resort. You can even protect your messages using AES-256 encryption.
Message header and correlation The envelope includes headers for tracking messages, including conversations, correlations, and requests.
Consumer lifecycle management MassTransit handles consumer creation and disposal as part of the message consumption pipeline
Routing CPU and memory friendly message routing
Easy Unit Testing High-performance in-memory transport for testing every consumer. MassTransit.TestFramework NuGet package includes test harnesses that handle the setup and teardown of the bus so you can easily test your message consumers and sagas.
Sagas Sagas are a powerful abstraction that supports message orchestration with durable state. You can build highly available distributed workflow and coordination services easily
Scheduling MassTransit makes it easy to schedule messages for future deliver
Monitoring MassTransit updates a range of performance counters as messages are processed so operations can keep an eye on message flow

Transports

  • RabbitMQ : most popular open source message broker. Lightweight & easy to deploy
  • Azure Service bus
  • ActiveMQ : open source, multi-protocol, Java-based message broker
  • gRPC :  peer-to-peer distributed non-durable message transport. Very fast
  • InMemory : greate for testing, no message broker needed, very fast

Messages

MassTransit advises to use interfaces to code messages, with readonly members:

// The entire namesapce is checked when dispatching
namespace Company.Application.Contracts
{
    using System;

    public interface UpdateCustomerAddress
    {
        Guid CommandId { get; }
        DateTime Timestamp { get; }
        string CustomerId { get; }
        string HouseNumber { get; }
        string Street { get; }
        string City { get; }
        string State { get; }
        string PostalCode { get; }
    }
}

MassTransit will create dynamic interface implementations for the messages, ensuring a clean separation of the message contract from the consumer.
Additionnally, MassTransit warns against message base classes. Below is a typical example of how a message would be sent with MassTransit:

public interface SubmitOrder
{
    string OrderId { get; }
    DateTime OrderDate { get; }
    decimal OrderAmount { get; }
}

await endpoint.Send<SubmitOrder>(
    // Anonymous object
    new
    {
        OrderId = "27",
        OrderDate = DateTime.UtcNow,
        OrderAmount = 123.45m
    });

The anonymous object is loosely typed, the properties are matched by name, and there is an extensive set of type conversions that may occur to match the types defined by the interface. Most numeric, string, and date/time conversions are supported, as well as several advanced conversions (including variables, and asynchronous `Task results).

Collections, including arrays, lists, and dictionaries, are broadly supported, including the conversion of list elements, as well as dictionary keys and values. For instance, a dictionary of (int,decimal) could be converted on the fly to (long, string) using the default format conversions.

Nested objects are also supported, for instance, if a property was of type Address and another anonymous object was created (or any type whose property names match the names of the properties on the message contract), those properties would be set on the message contract.

Message headers

All messages are encapsulated into an envelopped containing already many fields (initiator id, correlation id, ...). In some rare cases, these fields could be overwritten if necessary. Besides, a special syntax allows more fields to be added to the header from the anonymous object declaration.

Message types

Type Description
Command Tells a service to do something. Commands are sent with Send. Naming convention : verb-noun sequence (ex: UpdateCustomerAddress)
Event Signifies that something has happened. Events are sent with Publish and not directed to an endpoint in particular. Namong convention : noun-verb, past tense, sequence (ex: CustomerAddressUpdated)

Consumers

Several types of consumers:

  • Consumer
  • Sagas
  • Saga state machines
  • Routing slip activities
  • Handlers
  • Job consumers

Common Consumer use case

The simplest and most common consumer inherits IConsumer<T>. When a sender posts a message to the broker, a consumer is instanciated on receiver side and its Consume method implementation will be called. A ConsumeContext object is passed which contains the message but also the enveloppe and an access to the bus (to publish an event for instance). Consume returns a Task awaited by MassTransit:

  • If successful : the message is acknowledged and removed from the queue
  • If faulty : consumer is released and exception is propagated back up the pipeline. If the exception does not trigger a retry, the message is moved to an error queue

Consumer factories

cfg.ReceiveEndpoint("order-service", e =>
{
    // delegate consumer factory
    e.Consumer(() => new SubmitOrderConsumer());

    // another delegate consumer factory, with dependency
    e.Consumer(() => new LogOrderSubmittedConsumer(Console.Out));

    // a type-based factory that returns an object (specialized uses)
    var consumerType = typeof(SubmitOrderConsumer);
    e.Consumer(consumerType, type => Activator.CreateInstance(consumerType));
});

Transient vs Instance

Creating a new consumer instance for each message is highly suggested. However, it is possible to configure an existing consumer instance to which every received message will be delivered (thread safety must be taken in account here for the consumer since it will get called for all messages adressed to order-service).

var submitOrderConsumer = new SubmitOrderConsumer();

var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
    cfg.ReceiveEndpoint("order-service", e =>
    {
        e.Instance(submitOrderConsumer);
    });
});

Consumer vs Handler

Prefer creating Consumers to consume messages. If not possible, handlers are supported too:

cfg.ReceiveEndpoint("order-service", e =>
{
    // Lambda handler
    e.Handler<SubmitOrder>(async context =>
    {
        await Console.Out.WriteLineAsync($"Submit Order Received: {context.Message.OrderId}");
    });
});

Producer

A message can be sent or a message can be published.
When a message is sent, it is delivered to a specific endpoint using a DestinationAddress.
Below are the interfaces to be used to produce a message.

Interface When/Where
ConsumeContext From a consumer
ISendEndpointProvider To send a message from any class. Typically passed to constructors. Do not store the ISendEndpoint object
IBus Only for messages that are sent by an initiator

Send

private readonly string _serviceAddress = "rabbitmq://localhost/input-queue";
public async Task SendOrder(ISendEndpointProvider sendEndpointProvider)
{
    var endpoint = await sendEndpointProvider.GetSendEndpoint(_serviceAddress);

    await endpoint.Send(new SubmitOrder { OrderId = "123" });
    
    // do not save endpoint
}

Adresses

An endpoint address can be a fully qualified URI which may include transport-specific details but since MassTransit v6, short addresses are supported as well :

// Get the endpoint for the queue input-queue, whatever is the transport layer 
GetSendEndpoint(new Uri("queue:input-queue"))

Note: Unfortunately, not all transport layer support the same set of short addresses. Be sure to check official documentation first.

But if each endpoint deals with a subset of messages, and that no message is destined to more than one endpoint, then addresses can be mapped in configuration so that they are never required by peers at runtime :

EndpointConvention.Map<SubmitOrder>(new Uri("rabbitmq://mq.acme.com/order/order_processing"));

and then in peers :

// a typical producer
await _bus.Send(new SubmitOrder { OrderId = "123" });
// a typical consumer re-publishing a message
await context.Send(new StartDelivery(context.Message.OrderId, DateTime.UtcNow));

Note: if the same message is mapped twice with EndpointConvention.Map an exception will be thrown.

Publish

Messages are published similarly to how messages are sent, but in this case, a single IPublishEndpoint is used. The same rules for endpoints apply.

public async Task NotifyOrderSubmitted(IPublishEndpoint publishEndpoint)
{
    await publishEndpoint.Publish<OrderSubmitted>(new
    {
        OrderId = "27",
        OrderDate = DateTime.UtcNow,
    });
}

Exceptions

public Task Consume(ConsumeContext<SubmitOrder> context)
{
    throw new Exception("Very bad things happened");
}

With this code, the exception is caught by middleware in the transport (ErrorTransportFilter) and the message is moved to an _error queue (prefixed with the received endpoint queue name). Exception details are stored in headers with the message for analysis.

Retries

Retries can be configured for each endpoint:

cfg.ReceiveEndpoint("submit-order", e =>
{
    e.UseMessageRetry(r => r.Immediate(5));
    // ...
});

This line adds a RetryFilterto the middleware. All retry policies are available here.

Conditional retries can be configured as well with Handleand Ignore:

e.UseMessageRetry(r => 
{
    // Only retry with ArgumentNullException or other exception types not listed here
    r.Handle<ArgumentNullException>();
    // Ignore these
    r.Ignore(typeof(InvalidOperationException), typeof(InvalidCastException));
    r.Ignore<ArgumentException>(t => t.ParamName == "orderTotal");
});

A conditional retry can be configured for a particular endpoint:

e.Consumer<MyConsumer>(c => c.UseMessageRetry(r => 
{
    r.Interval(10, TimeSpan.FromMilliseconds(200));
    r.Ignore<ArgumentNullException>();
    r.Ignore<DataException>(x => x.Message.Contains("SQL"));
});
);

Note: With a scheduler configured, MassTransit can also handle deliveries at a much later time.

Requests

As opposed to Commands, Requests are sychronously executed. Each Request expects a Response back from the Consumer.

// From caller
public async Task<OrderStatusResult> Get(IRequestClient<CheckOrderStatus> client)
{
    return await client.GetResponse<OrderStatusResult>(new {OrderId = id});
}

// From consumer
public async Task Consume(ConsumeContext<CheckOrderStatus> context)
{
    var order = await _orderRepository.Get(context.Message.OrderId);
    if (order == null)
        throw new InvalidOperationException("Order not found");
    
    // Send a response back
    await context.RespondAsync<OrderStatusResult>(new 
    {
        OrderId = order.Id,
        order.Timestamp,
        order.StatusCode,
        order.StatusText
    });
}

Note 1: In case of an exception thrown by the Consumer, MassTransit will return a Fault<CheckOrderStatus>.
Note 2: A request can be interrupted before completion with the cancellation token.

Configuration

One of the differences with regular publishers is that a requester needs to be registered at configuration time.

services.AddMassTransit(x =>
    {
        x.AddConsumer<CheckOrderStatusConsumer>();

        // ...

        x.AddRequestClient<CheckOrderStatus>();
    })

This registration will allow injection of IRequestClient<CheckOrderStatus> in the application.
Optionnally, multiple clients can be added as once:

services.AddMassTransit(x =>
    {
        // ...

        x.AddGenericRequestClient();
    });

Headers

Just like the other types of payloads, the header of a Request can be edited and more fields can be created.

Standalone request

A reauest client can be created on the fly if necessary (but do not do this from a Consumer or an API controller):

var client = bus.CreateRequestClient<CheckOrderStatus>(serviceAddress);

Sagas

A saga is a long-lived transaction managed by a coordinator. Sagas are initiated by an event, sagas orchestrate events, and sagas maintain the state of the overall transaction. Sagas are designed to manage the complexity of a distributed transaction without locking and immediate consistency. They manage state and track any compensations required if a partial failure occurs.

Sagas are built upon an open source machine state library called Automatonymous. MassTransit extends the state machine capabilities with instance storage, event correlation, message binding, req/response and scheduling.

Workflow

The workflow is quite complex to understand, mostly because of the vocabulary. A saga is composed of:

  • A state machine : this is where the transitions are defined (Behaviors) and where the states can be defined
  • A state context : In MassTransit examles, this is called a state but is actually a state context (or model). It contains transactional data and stores the actual state of the transaction
  • States : a states is nothing more than an identifier. Could be int, string...
  • Events : an event is a message to which the state machine has subscribed. The event triggers the behaviors of the state machine
  • A correlation id : could be of any type. This is what ties all the elements listed above together. It's like a pass used to validate the correlation of an event with the state machine

The state machine comes with 3 default states out-of-the box:

  • None
  • Initial
  • Final (reached when saga is complete)

additional states can be defined as shown further below.

Storage

The sagas needs to be persisted in a repository (could be InMemory or in ay type of database).

Configuration

// Adds Bus interfaces and outgoing Endpoint interfaces (IPublishedEndpoint, ISendEndpointProvider)
services.AddMassTransit(x =>
{
    // ...
    x.ReceiveEndpoint("order", e =>
    {
        // Example with the InMemory storage
        e.StateMachineSaga(new OrderStateMachine(), new InMemorySagaRepository<OrderState>());
    });
    // ...
}

Concrete example

// This is the state model. It is what the state machine will revolve around
public class OrderStateContext : SagaStateMachineInstance
{
    // Here the correlation id is a guid (most common case). It could be anything
    public Guid CorrelationId {get; set;}
    // We are storing the current machine state in the context. It is a string here but could be int as well
    // typically to use less storage memory
    public string CurrentState { get; set; }
    
    // and we can store any complementary data here ...
    public DateTime Updated { get; set; }
    public DateTime SubmitDate { get; set; }
}

// Here are some examples of events: They are regular MassTransit payloads
public interface OrderSubmitted
{
    Guid OrderId { get; }
    DateTime Timestamp { get; }
}
public interface CheckOrder
{
    Guid OrderId { get; }
}
public interface OrderStatus
{
    Guid OrderId { get; }
    string State { get; }
}
public interface OrderCompleted
{
    Guid OrderId { get; }
}
// Finally the state machine
public class OrderStateMachine : MassTransitStateMachine<OrderState>
{
    // Additional states can be define just by listing them as State properties. (there is an auto conversion to string)
    // Note : If CurrentState was an int, the InstanceState declaration in the constructor would be slightly different. Check docs
    public State Submitted { get; set; }

    // Subscribed events need to be listed here too, in the form of public Event properties
    public Event<OrderSubmitted> OrderSubmitted { get; private set; }
    public Event<CheckOrder> OrderStatusRequested { get; private set; }
    public Event<OrderCompleted> OrderCompleted { get; private set; }
    
    public OrderStateMachine()
    {
        // This is where the behaviors are defined. But first, we need to tie the others saga components to this state machine
        // Events correlation
        Event(() => OrderSubmitted, x => x.CorrelateById(m => m.Message.OrderId));
        Event(() => OrderStatusRequested, x => x.CorrelateById(m => m.Message.OrderId));
        Event(() => OrderCompleted, x => x.CorrelateById(m => m.Message.OrderId));

        // State instance location in the context
        InstanceState(x => x.CurrentState);

        // Behaviors
        // From init state (could also be written During(Initial, ...))
        Initially(
            When(OrderSubmitted)
            .Then(context => 
            {
                context.Instance.SubmitDate = context.Data.Timestamp;
                context.Instance.Updated = DateTime.UtcNow;
            })
            .TransitionTo(Submitted));

        // We can also disable a behavior once it has been treated to avoid workflow exceptions
        During(Submitted, Ignore(OrderSubmitted));
        
        // In this example, we respond back to a Request and there is no state transition
        DuringAny(
            When(OrderStatusRequested)
            .ResponseAsync(x => x.Init<OrderStatus>(new
            {
                OrderId = x.Instance.CorrelationId,
                State = x.Instance.CurrentState
            })));

        // Finaly, because the saga is not removed from repository as long as it has not been completed, we need to define this condition here.
        // Below is a simple approach to do so, but this could be also done with SetCompleted(x =>...) statement. Check docs
        // Here we want to consider the saga complete as soon as it enters the Final state
        DuringAny(
            When(OrderCompleted)
                .Finalize());
        // This statement is necessary to bind Final state with completion routine which will remove the saga instance from repository
        SetCompletedWhenFinalized();
    }
}

Now with this state machine, here is what's going to happen:

client                               state machine
    |                                 |
    X ------- OrderSubmitted -------> X ---
    |                                 |   | Transition to Submitted
    |                                 | <--
    |                                 | 
    X --------- CheckOrder ---------> X ---
    |                                 |   | Submitted
    X <-------- OrderStatus --------- | <--
    |                                 | 
    X ------- OrderCompleted -------> X ---
    |                                 |   | Transition to Final
    |                                 | <--
    |                                 X ---
    |                                 |   | Completion
    |                                 | <--
    |                                ---