MassTransit : caveats
In this article I describe some of the (silly) mistakes that I made while using it and how I fixed them.
Configuration
The following code add consumers to MassTransit and generates 1 endpoint per consumer.
services.AddMassTransit(x =>
{
x.AddConsumer<CheckCentralPresenceConsumer>();
x.AddConsumer<SayHiCentralConsumer>();
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host("localhost", "/", h =>
{
h.Username("guest");
h.Password("guest");
});
cfg.ConfigureEndpoints(context);
});
});
Note: Each endpoint has its own settings (filters, concurrency, ...)
The following code add consumers to 1 endpoint:
services.AddMassTransit(x =>
{
// This is still needed to register the type in DI
x.AddConsumer<CheckCentralPresenceConsumer>();
x.AddConsumer<SayHiCentralConsumer>();
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host("localhost", "/", h =>
{
h.Username("guest");
h.Password("guest");
});
cfg.ReceiveEndpoint(endpoint =>
{
// This filter applies to all consumers
endpoint.UseConsumeFilter(typeof(ThrowFilter<>), context);
// This is needed to bind a consumer to an endpoint
endpoint.ConfigureConsumer<CheckCentralPresenceConsumer>(context);
endpoint.ConfigureConsumer<SayHiCentralConsumer>(context);
});
});
});
Important: either use ReceiveEndpoint
or ConfigureEndpoints
but not both of them. The following code will configure 2 consumers for the same message:
services.AddMassTransit(x =>
{
x.AddConsumer<CheckCentralPresenceConsumer>();
x.AddConsumer<SayHiCentralConsumer>();
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host("localhost", "/", h =>
{
h.Username("guest");
h.Password("guest");
});
// This call configures the endpoints once
cfg.ConfigureEndpoints(context);
// The following code adds one more endpoint for the same messages
cfg.ReceiveEndpoint(endpoint =>
{
endpoint.UseConsumeFilter(typeof(ThrowFilter<>), context);
endpoint.ConfigureConsumer<CheckCentralPresenceConsumer>(context);
endpoint.ConfigureConsumer<SayHiCentralConsumer>(context);
});
});
});
Filters
Before trying to add filters in a pipeline, one must understand how a filter applies.
As stated above, filters are configured per endpoint.
Take the following example:
public class ThrowFilter<TMessage> : IFilter<ConsumeContext<TMessage>> where TMessage : class
{
public async Task Send(ConsumeContext<TMessage> context, IPipe<ConsumeContext<TMessage>> next)
=> throw new InvalidOperationException("Filter stopped this message");
public void Probe(ProbeContext context) => _ = context.CreateFilterScope("ThrowFilterForTests");
}
This dummy filter throws an exception and does not call next
, hence interrupting the chain.
In the following scenario:
A -request-> B
If the filter is defined in B
endpoint, the consumer of the request would not be reached. In fact, with the following code :
// Client
while(true)
{
await Task.Delay(1000);
try
{
var response = await client.GetResponse<CheckCentralPresenceResponse>(new CheckCentralPresence(counter),
tokenSource.Token,
RequestTimeout.After(s: 10));
Console.WriteLine("OK");
}
catch(RequestFaultException e)
{
Console.WriteLine("Faulted");
}
catch(RequestTimeoutException e)
{
Console.WriteLine("Timeout");
}
}
// Server
public class CheckCentralPresenceConsumer :
IConsumer<CheckCentralPresence>
{
public Task Consume(ConsumeContext<CheckCentralPresence> context)
=> context.RespondAsync(new CheckCentralPresenceResponse());
}
the client would print Timeout
every second in its console, while the server's console would print Faulted
messages because of ThrowFilter
.
Why isn't this code displaying Faulted
instead ?
Not everything is explained in MassTransit documentation !
If an exception occurs in a filter, the first filter of the pipeline (RescueFilter
) will catch it and check if the context is faulted (IsFaulted
).
IsFaulted
can only be set by the ConsumerContext<T>
and since the filter prevented the message to reach the consumer, IsFaulted
is false
. In this situation, RescueFilter
will generate a default Fault
event which does not transport the ConsumerContext<T>
.
Now breathe.
When a request is initiated, MassTransit attaches temporary handlers to the bus, among which a FaultHandler
. The purpose here is to detect a Fault
when it is published and return immediately with a Fault<TMessage>
response. However, FaultHandler
's signature is Task MessageHandler(ConsumeContext<Fault<TRequest>> context)
, meaning that the handler will only catch faults transporting the ConsumeContext
. Since the published fault is generic, this handler is never called and as a fallback, the timeout exception finalizes the exchange.
So if an exception occurs in a Consumer
, I should get a Fault<TMessage>
message immediately ?
Yes. If we take the code above, remove the ThrowFilter
from bus configuration, and edit the consumer like this:
// Server
public class CheckCentralPresenceConsumer :
IConsumer<CheckCentralPresence>
{
public Task Consume(ConsumeContext<CheckCentralPresence> context)
=> throw new InvalidOperationException("No timeout this time !");
}
the context will detect the exception and set IsFaulted
to true
(it actually calls NotifyFaulted<T>
(). See ConsumerMessageFilter.Send
). This time, RescueFilter
will publish a Fault<T>
and the request will return with a RequestFaultException
.
So how can I get the same behavior for an exception occuring in a filter ?
Although discouraged by MassTransit' author, calling NotifyFaulted<T>
from a filter would work:
public class ThrowFilter<TMessage> : IFilter<ConsumeContext<TMessage>> where TMessage : class
{
public async Task Send(ConsumeContext<TMessage> context, IPipe<ConsumeContext<TMessage>> next)
{
// the exception would generally be caught and passed in NotifyFaulted but this is for demo only
var exception = new InvalidOperationException("Filter stopped this message");
await context.NotifyFaulted(context, TimeSpan.Zero, typeof(ThrowFilter<>).Name, exception);
throw exception;
}
public void Probe(ProbeContext context) => _ = context.CreateFilterScope("ThrowFilterForTests");
}