In the IntegrationEventLog section, we talked about atomicity and resiliency when publishing to the event bus, followed by each step of the process. In this section, we will package the steps of publishing events and updating event states into IIntegrationEventService service, and then combine them with MediatR behavior for transaction processing.
public interface IIntegrationEventService
{
Task PublishEventsThroughEventBusAsync(Guid transactionId);
Task AddAndSaveEventAsync(IntegrationEvent evt);
}
public interface IIntegrationEventService<out TContext> : IIntegrationEventService
where TContext : DbContext, IUnitOfWork
{
TContext DomainContext { get; }
/// <summary>
/// Use specified transaction when working with transient DbContext
/// </summary>
/// <param name="evt"></param>
/// <param name="transaction"></param>
/// <returns></returns>
Task AddAndSaveEventAsync(IntegrationEvent evt, IDbContextTransaction transaction);
}
Now the process steps will look like this:
...
using Juice.Integrations.EventBus;
...
var context = scope.ServiceProvider.GetRequiredService<DomainDbContext>();
var integrationEventService = scope.ServiceProvider.GetRequiredService<IIntegrationEventService<DomainDbContext>>();
// do something with your DomainDbContext
context.Add(new Content());
// create an integration event
var evt = new ContentPublishedIntegrationEvent($"Content {content.Code} was published.");
// save DomainDbContext changes and add an integration event at the same time, in the same transaction
await ResilientTransaction.New(context).ExecuteAsync(async (transaction) =>
{
// Achieving atomicity between original catalog database operation and the IntegrationEventLog thanks to a local transaction
await context.SaveChangesAsync();
await integrationEventService.AddAndSaveEventAsync(evt);
// commit transaction
await context.CommitTransactionAsync(transaction);
// if everything is ok then you can publish created integration event throw service bus
await integrationEventService.PublishEventsThroughEventBusAsync(transaction.TransactionId);
});
NOTE: to use this pattern, your DomainDbContext must implement IUnitOfWork interface.
The library can be accessed via Nuget:
One more step to streamline the process in the section above when integrating with MediatR. We have implemented an abstraction class to handle the transaction step by step:
You need to implement TransactionBehavior<TRequest, TResponse, TContext> for your request and DbContext like this:
using Juice.Integrations.EventBus;
using Juice.Integrations.MediatR.Behaviors;
...
internal class TimerTransactionBehavior<T, R> : TransactionBehavior<T, R, TimerDbContext>
where T : IRequest<R>
{
public TimerTransactionBehavior(TimerDbContext dbContext, IIntegrationEventService<TimerDbContext> integrationEventService, ILogger<TimerTransactionBehavior<T, R>> logger) : base(dbContext, integrationEventService, logger)
{
}
}
Then register as a MediatR behavior
services.AddScoped(typeof(IPipelineBehavior<CreateTimerCommand, TimerRequest>),
typeof(TimerTransactionBehavior<CreateTimerCommand, TimerRequest>));
To understand this section, please read more about using UnitOfWork and MediatR.
The library can be accessed via Nuget: