Full Pipeline Setup
Overview
This guide combines all four messaging capabilities — internal MediatR idempotency,
transactional outbox staging, background delivery, and RabbitMQ consumption — into a
single, coherent services.AddMessaging() call. Follow this guide for new services or
when you want the complete picture before choosing an incremental approach.
Prerequisites
Either:
- Complete the four individual setup guides (Internal Messaging, Outbox, Delivery, Consumption), OR
- Follow this guide from scratch
NuGet packages
Install all packages from the setup guides:
| Package | Purpose |
|---|---|
| Juice.Messaging | MessagingBuilder, IOutboxService<T>, IMessage hierarchy |
| Juice.Messaging.Local | IMessageService, LocalTransportPublisher, in-process channel dispatch |
| Juice.Messaging.Outbox.EF | Outbox tables, AddOutbox() sub-builder |
| Juice.Messaging.Outbox.Delivery | DeliveryHostedService, AddDelivery() sub-builder |
| Juice.MediatR.Behaviors | IdempotencyRequestBehavior<,>, TransactionBehavior<,,> |
| Juice.MediatR.Contracts | IIdempotentRequest interface |
| Juice.EventBus | IIntegrationEventHandler<T>, IntegrationEventDispatcher |
| Juice.EventBus.RabbitMQ | RabbitMQ transport (producer + consumer) |
| Juice.Messaging.Idempotency.Redis | Redis idempotency backend (recommended) |
| Juice.Messaging.Idempotency.Caching | In-memory / distributed cache backend |
| Juice.Messaging.Idempotency.EF | EF idempotency backend (auditable) |
Sub-Builder Registration Order
The order of sub-builder calls inside AddMessaging() is significant. Register them
in this exact sequence:
AddPublishingPolicies()— must come beforeAddOutbox()because the outbox reads routing policies at message-staging timeAddOutbox()— must come beforeAddDelivery()because the delivery builder reads outbox configuration during its constructionAddIdempotency*()— must come beforeAddDelivery()becauseIntegrationEventDispatcher(created inside delivery) resolvesIIdempotencyServiceat startupAddDelivery()— createsEventBusBuilderinternally via its constructor; thereforeservices.AddEventBus()must not be called separately- RabbitMQ — configured inside
delivery.EventBus.AddRabbitMQ()because theEventBusBuilderis owned byDeliveryBuilder
Complete DI Registration
services.AddMediatR(cfg =>
{
cfg.RegisterServicesFromAssembly(typeof(MyHandler).Assembly);
cfg.AddIdempotencyRequestBehavior();
cfg.AddOpenBehavior(typeof(AppTransactionBehavior<,>)); // subclassed TransactionBehavior
});
services.AddMessaging(builder =>
{
// 1. Publishing policies (needed by outbox at staging time)
builder.AddPublishingPolicies(configuration.GetSection("PublishingPolicies"));
// 2. Outbox (must precede delivery)
builder.AddOutbox(outbox =>
{
outbox.AddOutboxRepository();
outbox.AddDeliveryIntents();
});
// 3. Idempotency backend (before EventBus dispatcher is created)
builder.AddIdempotencyRedis(opts =>
{
opts.Configuration = configuration.GetConnectionString("Redis");
});
// 4. Local transport — IMessageService<TContext> + in-process publishers
builder.AddMessageService<AppDbContext>(); // registers IMessageService<AppDbContext> + local channel
builder.AddLocalPublisher<AppDbContext>(); // registers LocalTransportPublisher keyed "local"
// 5. Delivery (creates EventBusBuilder internally — do not call AddEventBus() separately)
builder.AddDelivery(delivery =>
{
// "local" publisher key for durable in-process dispatch
delivery.AddDeliveryProcessor<AppDbContext>("local",
"send-pending", "retry-failed", "recover-timeout");
// "rabbitmq" publisher key for broker dispatch
delivery.AddDeliveryProcessor<AppDbContext>("rabbitmq",
"send-pending", "retry-failed", "recover-timeout");
delivery.AddDeliveryPolicies(configuration.GetSection("DeliveryPolicies"));
});
// 5. EventBus with infrastructure
builder.AddEventBus()
.AddConsumerServices(consumers =>
{
consumers.Subscribe<OrderCreatedEvent, OrderCreatedEventHandler>();
})
.AddRabbitMQ(rabbitMQ =>
{
// 6. RabbitMQ inside EventBus
rabbitMQ.AddConnection("rabbitmq", configuration.GetSection("RabbitMQ"))
.AddProducer("rabbitmq", "rabbitmq")
.AddConsumer("orders", "orders-queue", "rabbitmq");
});
});
services.AddHealthChecks()
.AddRabbitMQHealthCheck("rabbitmq", tags: new[] { "ready" })
.AddOutboxDeliveryHealthCheck<AppDbContext>(opts =>
{
opts.StuckMessageThresholdMinutes = 15;
opts.MaxStuckMessages = 50;
}, tags: new[] { "live" });Migration from v8.5.0
v8.5.0 → v9 Concept Mapping
| v8.5.0 Concept | Package (v8.5.0) | v9 Replacement | Package (v9) |
|---|---|---|---|
IEventBus interface |
Juice.EventBus |
IOutboxService<T> for staging; IEventBus (CompositeEventPublisher) remains as a publish façade |
Juice.Messaging / Juice.EventBus |
IEventBus.PublishAsync() direct call |
Juice.EventBus |
IOutboxService.AddEventAsync() + SaveEventsAsync() inside transaction |
Juice.Messaging.Outbox.EF |
IntegrationEventLog table (single-table) |
Juice.EventBus.IntegrationEventLog.EF |
OutboxEvent + OutboxDelivery two-table model |
Juice.Messaging.Outbox.EF |
IIntegrationEventLogService |
Juice.EventBus.IntegrationEventLog.EF |
IOutboxService<T> |
Juice.Messaging |
IIntegrationEventLogService.SaveEventAsync() |
— | IOutboxService.AddEventAsync() + SaveEventsAsync(transactionId) |
— |
IIntegrationEventLogService.MarkEventAsInProgressAsync() |
— | Handled automatically by DeliveryHostedService |
— |
IIntegrationEventLogService.MarkEventAsPublishedAsync() |
— | Handled automatically by DeliveryHostedService |
— |
IIntegrationEventLogService.MarkEventAsFailedAsync() |
— | Handled automatically by DeliveryHostedService |
— |
IIntegrationEventService |
Juice.Integrations |
IOutboxService<T> |
Juice.Messaging |
IIntegrationEventService.AddAndSaveEventAsync() |
— | IOutboxService.AddEventAsync() |
— |
IIntegrationEventService.PublishEventsThroughEventBusAsync() |
— | Removed — DeliveryHostedService publishes automatically |
— |
TransactionBehavior (v8, in Juice.Integrations.MediatR) |
Juice.Integrations |
TransactionBehavior<T,R,TContext> (abstract, in Juice.MediatR.Behaviors) |
Juice.MediatR.Behaviors |
services.RegisterRabbitMQEventBus() |
Juice.EventBus.RabbitMQ |
builder.AddMessaging().AddDelivery(d => d.EventBus.AddRabbitMQ(...)) |
Juice.Messaging + Juice.EventBus.RabbitMQ |
IRequestManager (command deduplication) |
Juice.MediatR |
IIdempotentRequest + IdempotencyBehavior |
Juice.MediatR.Contracts + Juice.MediatR.Behaviors |
IIdentifiedCommand<T> wrapper |
Juice.MediatR |
Removed — mark command directly with IIdempotentRequest |
Juice.MediatR.Contracts |
requestManager.ExistAsync(id) in handler |
— | Removed — IdempotencyBehavior handles it automatically |
— |
IntegrationEvent record (v8) |
Juice.EventBus |
IntegrationEvent record still exists; now extends MessageBase (carries MessageId, CreatedAt, TenantId) |
Juice.EventBus.Contracts |
IIntegrationEventHandler<T>.HandleAsync() |
Juice.EventBus |
Unchanged | Juice.EventBus |
RabbitMQ SubscriptionClientName config |
Juice.EventBus.RabbitMQ |
AddConsumer(key, queueName, connectionName) |
Juice.EventBus.RabbitMQ |
| N/A (new in v9) | — | IMessageService / IMessageService<TContext> — unified publishing interface with config-driven routing to "local-channel", "local", or broker |
Juice.Messaging.Local |
| N/A (new in v9) | — | LocalTransportPublisher — outbox-backed in-process delivery with immediate dispatch + idempotency deduplication |
Juice.Messaging.Local |
What Is Unchanged
These v8.5.0 concepts carry forward to v9 without replacement:
IntegrationEventabstract record (still the base for integration events)IIntegrationEventHandler<T>interface (handlers unchanged)IIntegrationEventinterface (still required on the consuming side)RabbitMQ.Clientas the broker transportFinbuckle.MultiTenantfor tenant resolution- MediatR
IRequest<T>,INotificationpatterns (MediatR itself unchanged) IAggregrateRoot<TNotification>domain patternIUnitOfWorkinterface
See also
- Internal Messaging Setup — MediatR +
IdempotencyBehaviorstandalone - Outbox Setup — transactional outbox standalone
- Local Transport — in-process dispatch with
IMessageService - Delivery Setup — delivery worker standalone
- Consumption Setup — RabbitMQ consumer standalone
- Event bus v8.5.0 archive — original
IEventBusandIntegrationEventLogdocs - MediatR v8.5.0 archive — original
IRequestManagerdocs - Integration service v8.5.0 archive — original
IIntegrationEventServicedocs