MassTransit: caveats

In this article, I describe some of the (silly) mistakes I made while using MassTransit and how I fixed them.

Configuration

The following code adds consumers to MassTransit and generates one endpoint per consumer.

services.AddMassTransit(x =>
    {
        x.AddConsumer<CheckCentralPresenceConsumer>();
        x.AddConsumer<SayHiCentralConsumer>();
        x.UsingRabbitMq((context, cfg) =>
        {
            cfg.Host("localhost", "/", h =>
            {
                h.Username("guest");
                h.Password("guest");
            });
            cfg.ConfigureEndpoints(context);
        });
    });

Note: Each endpoint has its own settings (filters, concurrency, ...)
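One way to give a generated endpoint its own settings is a consumer definition. The sketch below assumes MassTransit v8 (where these types live in the `MassTransit` namespace); the class name `SayHiCentralConsumerDefinition` and the limit and retry values are hypothetical, chosen only for illustration. Recent versions also offer an overload of `ConfigureConsumer` with an extra `IRegistrationContext` parameter, which may be preferred there.

```csharp
using MassTransit;

// Hypothetical definition class: the name and the values below are illustrative only.
public class SayHiCentralConsumerDefinition : ConsumerDefinition<SayHiCentralConsumer>
{
    public SayHiCentralConsumerDefinition()
    {
        // Limit how many messages this consumer processes concurrently.
        ConcurrentMessageLimit = 4;
    }

    protected override void ConfigureConsumer(
        IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<SayHiCentralConsumer> consumerConfigurator)
    {
        // This retry policy applies only to the endpoint generated for SayHiCentralConsumer.
        endpointConfigurator.UseMessageRetry(r => r.Intervals(100, 500, 1000));
    }
}

// Registration, inside services.AddMassTransit(x => { ... }):
// x.AddConsumer<SayHiCentralConsumer>(typeof(SayHiCentralConsumerDefinition));
```

With this in place, ConfigureEndpoints should pick up the definition when it creates the endpoint, so the other consumer's endpoint keeps its defaults.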

The following code adds both consumers to a single endpoint:

services.AddMassTransit(x =>
    {
        // This is still needed to register the type in DI
        x.AddConsumer<CheckCentralPresenceConsumer>();
        x.AddConsumer<SayHiCentralConsumer>();
        x.UsingRabbitMq((context, cfg) =>
        {
            cfg.Host("localhost", "/", h =>
            {
                h.Username("guest");
                h.Password("guest");
            });
            cfg.ReceiveEndpoint(endpoint =>
            {
                // This filter applies to all consumers
                endpoint.UseConsumeFilter(typeof(ThrowFilter<>), context);
                // This is needed to bind a consumer to an endpoint
                endpoint.ConfigureConsumer<CheckCentralPresenceConsumer>(context);
                endpoint.ConfigureConsumer<SayHiCentralConsumer>(context);
            });
        });
    });

Important: use either ReceiveEndpoint or ConfigureEndpoints, but not both. The following code configures each consumer twice, so the same message ends up with two consumers:

services.AddMassTransit(x =>
    {
        x.AddConsumer<CheckCentralPresenceConsumer>();
        x.AddConsumer<SayHiCentralConsumer>();
        x.UsingRabbitMq((context, cfg) =>
        {
            cfg.Host("localhost", "/", h =>
            {
                h.Username("guest");
                h.Password("guest");
            });
            // This call configures the endpoints once
            cfg.ConfigureEndpoints(context);
            // The following code adds one more endpoint for the same messages
            cfg.ReceiveEndpoint(endpoint =>
            {
                endpoint.UseConsumeFilter(typeof(ThrowFilter<>), context);
                endpoint.ConfigureConsumer<CheckCentralPresenceConsumer>(context);
                endpoint.ConfigureConsumer<SayHiCentralConsumer>(context);
            });
        });
    });
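If both calls are really needed, the order seems to matter. The sketch below rests on an assumption worth verifying against your MassTransit version: ConfigureEndpoints skips consumers that were already bound to a manually configured endpoint, so the manual endpoint has to come first. The queue name "check-central-presence" and the `BusConfiguration` class are hypothetical.

```csharp
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

public static class BusConfiguration
{
    public static IServiceCollection AddBus(this IServiceCollection services) =>
        services.AddMassTransit(x =>
        {
            x.AddConsumer<CheckCentralPresenceConsumer>();
            x.AddConsumer<SayHiCentralConsumer>();
            x.UsingRabbitMq((context, cfg) =>
            {
                cfg.Host("localhost", "/", h =>
                {
                    h.Username("guest");
                    h.Password("guest");
                });

                // Manual endpoint first ("check-central-presence" is a hypothetical queue name).
                cfg.ReceiveEndpoint("check-central-presence", endpoint =>
                {
                    endpoint.UseConsumeFilter(typeof(ThrowFilter<>), context);
                    endpoint.ConfigureConsumer<CheckCentralPresenceConsumer>(context);
                });

                // Assumption: only consumers not configured above get a generated endpoint here.
                cfg.ConfigureEndpoints(context);
            });
        });
}
```

Under that assumption, SayHiCentralConsumer gets a generated endpoint and CheckCentralPresenceConsumer stays on the manual one only.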

Filters

Before trying to add filters in a pipeline, one must understand how a filter applies.

As stated above, filters are configured per endpoint.

Take the following example:

public class ThrowFilter<TMessage> : IFilter<ConsumeContext<TMessage>> where TMessage : class
{
    public async Task Send(ConsumeContext<TMessage> context, IPipe<ConsumeContext<TMessage>> next) 
        => throw new InvalidOperationException("Filter stopped this message");

    public void Probe(ProbeContext context) => _ = context.CreateFilterScope("ThrowFilterForTests");
}

This dummy filter throws an exception and does not call next, hence interrupting the chain.
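For contrast, here is a minimal sketch of a filter that does call next, so the message continues down the pipeline. `LoggingFilter` is a hypothetical name, and the `using MassTransit;` directive assumes MassTransit v8 (in v7 the filter interfaces come from GreenPipes).

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical filter, for illustration: logs the message type, then continues the pipeline.
public class LoggingFilter<TMessage> : IFilter<ConsumeContext<TMessage>> where TMessage : class
{
    public async Task Send(ConsumeContext<TMessage> context, IPipe<ConsumeContext<TMessage>> next)
    {
        Console.WriteLine($"Received {typeof(TMessage).Name}");

        // Calling next hands the message to the following filter and, eventually, to the consumer.
        await next.Send(context);
    }

    public void Probe(ProbeContext context) => _ = context.CreateFilterScope("LoggingFilter");
}
```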

In the following scenario:

A -request-> B

If the filter is defined on B's endpoint, the consumer of the request is never reached. In fact, with the following code:

// Client
while (true)
{
    await Task.Delay(1000);
    try
    {
        var response = await client.GetResponse<CheckCentralPresenceResponse>(
            new CheckCentralPresence(counter),
            tokenSource.Token,
            RequestTimeout.After(s: 10));
        Console.WriteLine("OK");
    }
    catch (RequestFaultException)
    {
        Console.WriteLine("Faulted");
    }
    catch (RequestTimeoutException)
    {
        Console.WriteLine("Timeout");
    }
}

// Server
public class CheckCentralPresenceConsumer :
        IConsumer<CheckCentralPresence>
{
    public Task Consume(ConsumeContext<CheckCentralPresence> context)
        => context.RespondAsync(new CheckCentralPresenceResponse());
}

the client would print Timeout for each request, once its 10-second timeout expires, while the server's console would log Faulted messages because of ThrowFilter.

Why isn't this code displaying Faulted instead?

Not everything is explained in the MassTransit documentation!

If an exception occurs in a filter, the first filter of the pipeline (RescueFilter) will catch it and check if the context is faulted (IsFaulted).

IsFaulted can only be set by the ConsumeContext<T>, and since the filter prevented the message from reaching the consumer, IsFaulted is false. In this situation, RescueFilter generates a default Fault event which does not transport the ConsumeContext<T>.

Now breathe.

When a request is initiated, MassTransit attaches temporary handlers to the bus, including a FaultHandler. Its purpose is to detect a Fault when it is published and return immediately with a Fault<TMessage> response. However, FaultHandler's signature is Task MessageHandler(ConsumeContext<Fault<TRequest>> context), meaning that the handler only catches faults typed with the request message. Since the published fault is the default, untyped one, this handler is never called and, as a fallback, the timeout exception ends the exchange.

So if an exception occurs in a consumer, I should get a Fault<TMessage> message immediately?

Yes. If we take the code above, remove the ThrowFilter from bus configuration, and edit the consumer like this:

// Server
public class CheckCentralPresenceConsumer :
        IConsumer<CheckCentralPresence>
{
    public Task Consume(ConsumeContext<CheckCentralPresence> context)
        => throw new InvalidOperationException("No timeout this time !");
}

the context will detect the exception and set IsFaulted to true (it actually calls NotifyFaulted<T>(); see ConsumerMessageFilter.Send). This time, RescueFilter will publish a Fault<T> and the request will return with a RequestFaultException.
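On the client side, the RequestFaultException gives access to the fault that came back. The following is a minimal sketch, assuming MassTransit v8; `PresenceClient` and `TryCheckAsync` are hypothetical names, and the message types are the ones used in this article.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using MassTransit;

public static class PresenceClient
{
    // Hypothetical helper: returns true on a response, false when the consumer faulted.
    public static async Task<bool> TryCheckAsync(
        IRequestClient<CheckCentralPresence> client,
        CheckCentralPresence request,
        CancellationToken cancellationToken)
    {
        try
        {
            await client.GetResponse<CheckCentralPresenceResponse>(
                request,
                cancellationToken,
                RequestTimeout.After(s: 10));
            return true;
        }
        catch (RequestFaultException e)
        {
            // Fault.Exceptions carries the exception details captured on the consumer side.
            foreach (var info in e.Fault?.Exceptions ?? Array.Empty<ExceptionInfo>())
                Console.WriteLine($"Faulted: {info.ExceptionType}: {info.Message}");
            return false;
        }
    }
}
```

A RequestTimeoutException is deliberately left unhandled here, so a timeout still surfaces to the caller.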

So how can I get the same behavior for an exception occurring in a filter?

Although discouraged by MassTransit's author, calling NotifyFaulted<T> from a filter would work:

public class ThrowFilter<TMessage> : IFilter<ConsumeContext<TMessage>> where TMessage : class
{
    public async Task Send(ConsumeContext<TMessage> context, IPipe<ConsumeContext<TMessage>> next) 
    {
        // the exception would generally be caught and passed in NotifyFaulted but this is for demo only
        var exception = new InvalidOperationException("Filter stopped this message");
        await context.NotifyFaulted(context, TimeSpan.Zero, typeof(ThrowFilter<>).Name, exception);
        throw exception;
    }

    public void Probe(ProbeContext context) => _ = context.CreateFilterScope("ThrowFilterForTests");
}
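In a real filter, the exception would be caught rather than created on the spot. The sketch below shows one possible shape, assuming MassTransit v8; `GuardFilter` and its `EnsureAllowed` check are hypothetical stand-ins for the filter's actual work. Only the filter's own logic sits inside the try block, because exceptions thrown by the consumer are already notified by ConsumerMessageFilter.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical filter: notifies the context about its own failures, then rethrows.
public class GuardFilter<TMessage> : IFilter<ConsumeContext<TMessage>> where TMessage : class
{
    public async Task Send(ConsumeContext<TMessage> context, IPipe<ConsumeContext<TMessage>> next)
    {
        var timer = Stopwatch.StartNew();
        try
        {
            // Hypothetical check standing in for the filter's real work.
            EnsureAllowed(context);
        }
        catch (Exception exception)
        {
            // Same NotifyFaulted call as in the demo filter, with the caught exception.
            await context.NotifyFaulted(context, timer.Elapsed, GetType().Name, exception);
            throw;
        }

        // Outside the try block, so consumer exceptions are not notified a second time.
        await next.Send(context);
    }

    // Hypothetical rule: reject messages that carry no MessageId.
    static void EnsureAllowed(ConsumeContext<TMessage> context)
    {
        if (context.MessageId is null)
            throw new InvalidOperationException("Filter stopped this message");
    }

    public void Probe(ProbeContext context) => _ = context.CreateFilterScope("GuardFilter");
}
```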