Full Pipeline Setup
Overview
This guide combines all four messaging capabilities — internal MediatR idempotency,
transactional outbox staging, background delivery, and RabbitMQ consumption — into a
single, coherent services.AddMessaging() call. Follow this guide for new services or
when you want the complete picture before choosing an incremental approach.
Prerequisites
Either:
- Complete the four individual setup guides (Internal Messaging, Outbox, Delivery, Consumption), OR
- Follow this guide from scratch
NuGet packages
Install all packages from the four setup guides:
| Package | Purpose |
|---|---|
| Juice.Messaging | MessagingBuilder, IOutboxService<T>, IMessage hierarchy |
| Juice.Messaging.Outbox.EF | Outbox tables, AddOutbox() sub-builder |
| Juice.Messaging.Outbox.Delivery | DeliveryHostedService, AddDelivery() sub-builder |
| Juice.MediatR.Behaviors | IdempotencyRequestBehavior<,>, TransactionBehavior<,,> |
| Juice.MediatR.Contracts | IIdempotentRequest interface |
| Juice.EventBus | IIntegrationEventHandler<T>, IntegrationEventDispatcher |
| Juice.EventBus.RabbitMQ | RabbitMQ transport (producer + consumer) |
| Juice.Messaging.Idempotency.Redis | Redis idempotency backend (recommended) |
| Juice.Messaging.Idempotency.Caching | In-memory / distributed cache backend |
| Juice.Messaging.Idempotency.EF | EF idempotency backend (auditable) |
Sub-Builder Registration Order
The order of sub-builder calls inside AddMessaging() is significant. Register them
in this exact sequence:
AddPublishingPolicies()— must come beforeAddOutbox()because the outbox reads routing policies at message-staging timeAddOutbox()— must come beforeAddDelivery()because the delivery builder reads outbox configuration during its constructionAddIdempotency*()— must come beforeAddDelivery()becauseIntegrationEventDispatcher(created inside delivery) resolvesIIdempotencyServiceat startupAddDelivery()— createsEventBusBuilderinternally via its constructor; thereforeservices.AddEventBus()must not be called separately- RabbitMQ — configured inside
delivery.EventBus.AddRabbitMQ()because theEventBusBuilderis owned byDeliveryBuilder
Complete DI Registration
services.AddMediatR(cfg =>
{
cfg.RegisterServicesFromAssembly(typeof(MyHandler).Assembly);
cfg.AddIdempotencyRequestBehavior();
cfg.AddOpenBehavior(typeof(AppTransactionBehavior<,>)); // subclassed TransactionBehavior
});
services.AddMessaging(builder =>
{
// 1. Publishing policies (needed by outbox at staging time)
builder.AddPublishingPolicies(configuration.GetSection("PublishingPolicies"));
// 2. Outbox (must precede delivery)
builder.AddOutbox(outbox =>
{
outbox.AddOutboxRepository();
outbox.AddDeliveryIntents();
});
// 3. Idempotency backend (before EventBus dispatcher is created)
builder.AddIdempotencyRedis(opts =>
{
opts.Configuration = configuration.GetConnectionString("Redis");
});
// 4. Delivery (creates EventBusBuilder internally — do not call AddEventBus() separately)
builder.AddDelivery(delivery =>
{
delivery.AddDeliveryProcessor<AppDbContext>("rabbitmq",
"send-pending", "retry-failed", "recover-timeout");
delivery.AddDeliveryPolicies(configuration.GetSection("DeliveryPolicies"));
});
// 5. EventBus with infrastructure
builder.AddEventBus()
.AddConsumerServices(consumers =>
{
consumers.Subscribe<OrderCreatedEvent, OrderCreatedEventHandler>();
})
.AddRabbitMQ(rabbitMQ =>
{
// 6. RabbitMQ inside EventBus
rabbitMQ.AddConnection("rabbitmq", configuration.GetSection("RabbitMQ"))
.AddProducer("rabbitmq", "rabbitmq")
.AddConsumer("orders", "orders-queue", "rabbitmq");
});
});
services.AddHealthChecks()
.AddRabbitMQHealthCheck("rabbitmq", tags: new[] { "ready" })
.AddOutboxDeliveryHealthCheck<AppDbContext>(opts =>
{
opts.StuckMessageThresholdMinutes = 15;
opts.MaxStuckMessages = 50;
}, tags: new[] { "live" });Migration from v8.5.0
v8.5.0 → v9 Concept Mapping
| v8.5.0 Concept | Package (v8.5.0) | v9 Replacement | Package (v9) |
|---|---|---|---|
IEventBus interface | Juice.EventBus | IOutboxService<T> for staging; IEventBus (CompositeEventPublisher) remains as a publish façade | Juice.Messaging / Juice.EventBus |
IEventBus.PublishAsync() direct call | Juice.EventBus | IOutboxService.AddEventAsync() + SaveEventsAsync() inside transaction | Juice.Messaging.Outbox.EF |
IntegrationEventLog table (single-table) | Juice.EventBus.IntegrationEventLog.EF | OutboxEvent + OutboxDelivery two-table model | Juice.Messaging.Outbox.EF |
IIntegrationEventLogService | Juice.EventBus.IntegrationEventLog.EF | IOutboxService<T> | Juice.Messaging |
IIntegrationEventLogService.SaveEventAsync() | — | IOutboxService.AddEventAsync() + SaveEventsAsync(transactionId) | — |
IIntegrationEventLogService.MarkEventAsInProgressAsync() | — | Handled automatically by DeliveryHostedService | — |
IIntegrationEventLogService.MarkEventAsPublishedAsync() | — | Handled automatically by DeliveryHostedService | — |
IIntegrationEventLogService.MarkEventAsFailedAsync() | — | Handled automatically by DeliveryHostedService | — |
IIntegrationEventService | Juice.Integrations | IOutboxService<T> | Juice.Messaging |
IIntegrationEventService.AddAndSaveEventAsync() | — | IOutboxService.AddEventAsync() | — |
IIntegrationEventService.PublishEventsThroughEventBusAsync() | — | Removed — DeliveryHostedService publishes automatically | — |
TransactionBehavior (v8, in Juice.Integrations.MediatR) | Juice.Integrations | TransactionBehavior<T,R,TContext> (abstract, in Juice.MediatR.Behaviors) | Juice.MediatR.Behaviors |
services.RegisterRabbitMQEventBus() | Juice.EventBus.RabbitMQ | builder.AddMessaging().AddDelivery(d => d.EventBus.AddRabbitMQ(...)) | Juice.Messaging + Juice.EventBus.RabbitMQ |
IRequestManager (command deduplication) | Juice.MediatR | IIdempotentRequest + IdempotencyBehavior | Juice.MediatR.Contracts + Juice.MediatR.Behaviors |
IIdentifiedCommand<T> wrapper | Juice.MediatR | Removed — mark command directly with IIdempotentRequest | Juice.MediatR.Contracts |
requestManager.ExistAsync(id) in handler | — | Removed — IdempotencyBehavior handles it automatically | — |
IntegrationEvent record (v8) | Juice.EventBus | IntegrationEvent record still exists; now extends MessageBase (carries MessageId, CreatedAt, TenantId) | Juice.EventBus.Contracts |
IIntegrationEventHandler<T>.HandleAsync() | Juice.EventBus | Unchanged | Juice.EventBus |
RabbitMQ SubscriptionClientName config | Juice.EventBus.RabbitMQ | AddConsumer(key, queueName, connectionName) | Juice.EventBus.RabbitMQ |
What Is Unchanged
These v8.5.0 concepts carry forward to v9 without replacement:
IntegrationEventabstract record (still the base for integration events)IIntegrationEventHandler<T>interface (handlers unchanged)IIntegrationEventinterface (still required on the consuming side)RabbitMQ.Clientas the broker transportFinbuckle.MultiTenantfor tenant resolution- MediatR
IRequest<T>,INotificationpatterns (MediatR itself unchanged) IAggregrateRoot<TNotification>domain patternIUnitOfWorkinterface
See also
- Internal Messaging Setup — MediatR +
IdempotencyBehaviorstandalone - Outbox Setup — transactional outbox standalone
- Delivery Setup — delivery worker standalone
- Consumption Setup — RabbitMQ consumer standalone
- Event bus v8.5.0 archive — original
IEventBusandIntegrationEventLogdocs - MediatR v8.5.0 archive — original
IRequestManagerdocs - Integration service v8.5.0 archive — original
IIntegrationEventServicedocs