Full Pipeline Setup

Overview

This guide combines all four messaging capabilities — internal MediatR idempotency, transactional outbox staging, background delivery, and RabbitMQ consumption — into a single, coherent services.AddMessaging() call. Follow this guide for new services or when you want the complete picture before choosing an incremental approach.


Prerequisites

Either:


NuGet packages

Install all packages from the four setup guides:

Package | Purpose
Juice.Messaging | MessagingBuilder, IOutboxService<T>, IMessage hierarchy
Juice.Messaging.Outbox.EF | Outbox tables, AddOutbox() sub-builder
Juice.Messaging.Outbox.Delivery | DeliveryHostedService, AddDelivery() sub-builder
Juice.MediatR.Behaviors | IdempotencyRequestBehavior<,>, TransactionBehavior<,,>
Juice.MediatR.Contracts | IIdempotentRequest interface
Juice.EventBus | IIntegrationEventHandler<T>, IntegrationEventDispatcher
Juice.EventBus.RabbitMQ | RabbitMQ transport (producer + consumer)
Juice.Messaging.Idempotency.Redis | Redis idempotency backend (recommended)
Juice.Messaging.Idempotency.Caching | In-memory / distributed cache backend
Juice.Messaging.Idempotency.EF | EF idempotency backend (auditable)

Sub-Builder Registration Order

The order of sub-builder calls inside AddMessaging() is significant. Register them in this exact sequence:

  1. AddPublishingPolicies() — must come before AddOutbox() because the outbox reads routing policies at message-staging time
  2. AddOutbox() — must come before AddDelivery() because the delivery builder reads outbox configuration during its construction
  3. AddIdempotency*() — must come before AddDelivery() because IntegrationEventDispatcher (created inside delivery) resolves IIdempotencyService at startup
  4. AddDelivery() — creates EventBusBuilder internally via its constructor; therefore services.AddEventBus() must not be called separately
  5. RabbitMQ — configured inside delivery.EventBus.AddRabbitMQ() because the EventBusBuilder is owned by DeliveryBuilder

Complete DI Registration

services.AddMediatR(cfg =>
{
    cfg.RegisterServicesFromAssembly(typeof(MyHandler).Assembly);
    cfg.AddIdempotencyRequestBehavior();
    cfg.AddOpenBehavior(typeof(AppTransactionBehavior<,>));  // subclassed TransactionBehavior
});

services.AddMessaging(builder =>
{
    // 1. Publishing policies (needed by outbox at staging time)
    builder.AddPublishingPolicies(configuration.GetSection("PublishingPolicies"));

    // 2. Outbox (must precede delivery)
    builder.AddOutbox(outbox =>
    {
        outbox.AddOutboxRepository();
        outbox.AddDeliveryIntents();
    });

    // 3. Idempotency backend (before EventBus dispatcher is created)
    builder.AddIdempotencyRedis(opts =>
    {
        opts.Configuration = configuration.GetConnectionString("Redis");
    });

    // 4. Delivery (creates EventBusBuilder internally; do not call AddEventBus() separately)
    builder.AddDelivery(delivery =>
    {
        delivery.AddDeliveryProcessor<AppDbContext>("rabbitmq",
            "send-pending", "retry-failed", "recover-timeout");
        delivery.AddDeliveryPolicies(configuration.GetSection("DeliveryPolicies"));

        // 5. Consumers and RabbitMQ, configured on the EventBusBuilder that delivery owns
        delivery.EventBus
            .AddConsumerServices(consumers =>
            {
                consumers.Subscribe<OrderCreatedEvent, OrderCreatedEventHandler>();
            })
            .AddRabbitMQ(rabbitMQ =>
            {
                // Connection, producer, and consumer all refer to the "rabbitmq" connection
                rabbitMQ.AddConnection("rabbitmq", configuration.GetSection("RabbitMQ"))
                        .AddProducer("rabbitmq", "rabbitmq")
                        .AddConsumer("orders", "orders-queue", "rabbitmq");
            });
    });
});

services.AddHealthChecks()
    .AddRabbitMQHealthCheck("rabbitmq", tags: new[] { "ready" })
    .AddOutboxDeliveryHealthCheck<AppDbContext>(opts =>
    {
        opts.StuckMessageThresholdMinutes = 15;
        opts.MaxStuckMessages = 50;
    }, tags: new[] { "live" });
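
The "ready" and "live" tags only matter once endpoints filter on them. The following is a minimal sketch of that wiring, assuming a standard ASP.NET Core host. The /health/ready and /health/live paths are hypothetical, and the two AddCheck stand-ins replace the Juice health checks so the example runs without a broker or database.

```csharp
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Diagnostics.HealthChecks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Diagnostics.HealthChecks;

var builder = WebApplication.CreateBuilder(args);

// Stand-in checks (assumption): in the real service these would be the
// AddRabbitMQHealthCheck(...) and AddOutboxDeliveryHealthCheck<AppDbContext>(...) calls.
builder.Services.AddHealthChecks()
    .AddCheck("broker-stand-in", () => HealthCheckResult.Healthy(), tags: new[] { "ready" })
    .AddCheck("outbox-stand-in", () => HealthCheckResult.Healthy(), tags: new[] { "live" });

var app = builder.Build();

// Hypothetical paths: each endpoint runs only the checks carrying its tag.
app.MapHealthChecks("/health/ready", new HealthCheckOptions
{
    Predicate = check => check.Tags.Contains("ready")
});
app.MapHealthChecks("/health/live", new HealthCheckOptions
{
    Predicate = check => check.Tags.Contains("live")
});

app.Run();
```

With this split, an orchestrator could point its readiness probe at the first path and its liveness probe at the second, so a broker outage would not by itself trigger a restart.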

Migration from v8.5.0

v8.5.0 → v9 Concept Mapping

v8.5.0 Concept | Package (v8.5.0) | v9 Replacement | Package (v9)
IEventBus interface | Juice.EventBus | IOutboxService<T> for staging; IEventBus (CompositeEventPublisher) remains as a publish façade | Juice.Messaging / Juice.EventBus
IEventBus.PublishAsync() direct call | Juice.EventBus | IOutboxService.AddEventAsync() + SaveEventsAsync() inside transaction | Juice.Messaging.Outbox.EF
IntegrationEventLog table (single-table) | Juice.EventBus.IntegrationEventLog.EF | OutboxEvent + OutboxDelivery two-table model | Juice.Messaging.Outbox.EF
IIntegrationEventLogService | Juice.EventBus.IntegrationEventLog.EF | IOutboxService<T> | Juice.Messaging
IIntegrationEventLogService.SaveEventAsync() | - | IOutboxService.AddEventAsync() + SaveEventsAsync(transactionId) | -
IIntegrationEventLogService.MarkEventAsInProgressAsync() | - | Handled automatically by DeliveryHostedService | -
IIntegrationEventLogService.MarkEventAsPublishedAsync() | - | Handled automatically by DeliveryHostedService | -
IIntegrationEventLogService.MarkEventAsFailedAsync() | - | Handled automatically by DeliveryHostedService | -
IIntegrationEventService | Juice.Integrations | IOutboxService<T> | Juice.Messaging
IIntegrationEventService.AddAndSaveEventAsync() | - | IOutboxService.AddEventAsync() | -
IIntegrationEventService.PublishEventsThroughEventBusAsync() | - | Removed; DeliveryHostedService publishes automatically | -
TransactionBehavior (v8, in Juice.Integrations.MediatR) | Juice.Integrations | TransactionBehavior<T,R,TContext> (abstract, in Juice.MediatR.Behaviors) | Juice.MediatR.Behaviors
services.RegisterRabbitMQEventBus() | Juice.EventBus.RabbitMQ | builder.AddMessaging().AddDelivery(d => d.EventBus.AddRabbitMQ(...)) | Juice.Messaging + Juice.EventBus.RabbitMQ
IRequestManager (command deduplication) | Juice.MediatR | IIdempotentRequest + IdempotencyBehavior | Juice.MediatR.Contracts + Juice.MediatR.Behaviors
IIdentifiedCommand<T> wrapper | Juice.MediatR | Removed; mark command directly with IIdempotentRequest | Juice.MediatR.Contracts
requestManager.ExistAsync(id) in handler | - | Removed; IdempotencyBehavior handles it automatically | -
IntegrationEvent record (v8) | Juice.EventBus | IntegrationEvent record still exists; now extends MessageBase (carries MessageId, CreatedAt, TenantId) | Juice.EventBus.Contracts
IIntegrationEventHandler<T>.HandleAsync() | Juice.EventBus | Unchanged | Juice.EventBus
RabbitMQ SubscriptionClientName config | Juice.EventBus.RabbitMQ | AddConsumer(key, queueName, connectionName) | Juice.EventBus.RabbitMQ
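
To make the shift from direct publishing to outbox staging concrete, here is a rough, self-contained sketch of the flow the mapping describes: stage the event, persist it under the transaction's id, and let a separate delivery pass publish it. SketchOutbox, StagedMessage, and DeliveryState are hypothetical stand-ins written for this example, not Juice types. Only the method names AddEventAsync and SaveEventsAsync come from the table above, and the real IOutboxService<T> signatures may differ.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

var outbox = new SketchOutbox();
var transactionId = Guid.NewGuid();

// v8.5.0 style would publish to the broker at this point (IEventBus.PublishAsync()).
// v9 style: stage the event, then persist it under the business transaction's id.
await outbox.AddEventAsync(new OrderCreatedEvent(Guid.NewGuid()));
await outbox.SaveEventsAsync(transactionId);

var pendingAfterSave = outbox.Table.Count(m => m.State == DeliveryState.Pending);
Console.WriteLine($"Pending after save: {pendingAfterSave}");

// Stand-in for one pass of the background delivery service.
foreach (var message in outbox.Table.Where(m => m.State == DeliveryState.Pending))
{
    // A real delivery pass would publish to the broker before marking the row.
    message.State = DeliveryState.Published;
}

var pendingAfterDelivery = outbox.Table.Count(m => m.State == DeliveryState.Pending);
Console.WriteLine($"Pending after delivery: {pendingAfterDelivery}");

// Hypothetical stand-in types below; none of these are Juice types.
public record OrderCreatedEvent(Guid OrderId);

public enum DeliveryState { Pending, Published }

public class StagedMessage
{
    public StagedMessage(object payload, Guid transactionId)
    {
        Payload = payload;
        TransactionId = transactionId;
    }

    public object Payload { get; }
    public Guid TransactionId { get; }
    public DeliveryState State { get; set; } = DeliveryState.Pending;
}

public class SketchOutbox
{
    private readonly List<object> _staged = new();      // staged in memory, not yet saved
    public List<StagedMessage> Table { get; } = new();  // stands in for the outbox tables

    // Buffers the event; nothing is persisted or published yet.
    public Task AddEventAsync(object evt)
    {
        _staged.Add(evt);
        return Task.CompletedTask;
    }

    // Moves every buffered event into the table, stamped with the transaction id.
    public Task SaveEventsAsync(Guid transactionId)
    {
        Table.AddRange(_staged.Select(e => new StagedMessage(e, transactionId)));
        _staged.Clear();
        return Task.CompletedTask;
    }
}
```

The point of the split is that the handler never talks to the broker: it only writes rows in the same transaction as the business data, which is why the v8.5.0 "mark as in progress / published / failed" calls have no handler-side equivalent in v9.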

What Is Unchanged

These v8.5.0 concepts carry forward to v9 without replacement:

  • IntegrationEvent abstract record (still the base for integration events)
  • IIntegrationEventHandler<T> interface (handlers unchanged)
  • IIntegrationEvent interface (still required on the consuming side)
  • RabbitMQ.Client as the broker transport
  • Finbuckle.MultiTenant for tenant resolution
  • MediatR IRequest<T>, INotification patterns (MediatR itself unchanged)
  • IAggregrateRoot<TNotification> domain pattern
  • IUnitOfWork interface

See also