Site icon Muzaffer AKYIL (Victorious)

Mediator.Net ile .NET’te CQRS ve Pipeline Tabanlı Mesajlaşma

Mediator.Net

Mediator.Net

Giriş

Controller’ların onlarca servisi enjekte ettiği, iş kurallarının doğrudan handler çağrılarıyla dağıldığı bir codebase’de değişiklik yapmak hem testi zorlaştırır hem de mimariyi kırar. Mediator pattern, isteğin tek bir giriş noktasından geçmesini ve ilgili handler’a taşınmasını sağlayarak bu bağımlılığı azaltır. .NET ekosisteminde bu ihtiyacı karşılayan kütüphanelerden biri de Mediator.Net: komut (command), sorgu (request/response) ve olay (event) yayımını ayrı pipeline’lar üzerinden yönetir ve CQRS/clean architecture hedefleriyle uyumludur.

Bu yazı Mediator.Net’in ne olduğunu, hangi problemi çözdüğünü, MediatR ile farklarını ve uygulama içinde nasıl kullanılacağını adım adım ele alıyor.

Mediator.Net Nedir?

Mediator.Net, .NET için yazılmış, mediator pattern’i uygulayan ve CQRS (Command Query Responsibility Segregation) ile uyumlu bir mesajlaşma kütüphanesidir. İstekleri doğrudan handler sınıflarına göndermek yerine, tek bir IMediator aracı üzerinden gönderir; kütüphane mesaj tipine göre ilgili handler’ı bulur ve pipeline (boru hattı) üzerinden çalıştırır. Böylece uygulama katmanları birbirini tanımak zorunda kalmaz; clean architecture ve hexagonal mimaride kullanılabilir.

Üç temel mesaj tipi vardır:

Ayrıca IAsyncEnumerable ile streaming yanıt desteği sunar.

Hangi Problemi Çözer?

Büyüyen uygulamalarda controller veya use-case sınıfları, veritabanı, e-posta, loglama, cache gibi birçok servise doğrudan bağımlı hale gelir. Bu da birim testleri zorlaştırır, değişiklik maliyetini artırır ve tek sorumluluk ilkesini ihlal eder. Mediator.Net, tüm istekleri tek bir IMediator üzerinden geçirerek çağrıyı yapan tarafın hangi handler’ın çalışacağını bilmesini engeller; handler’lar sadece kendi mesaj tiplerini işler. Böylece fat controller ve dağınık iş mantığı yerine, her biri tek bir mesaj tipinden sorumlu handler’lar kullanılır.

MediatR ve Alternatiflerden Farkı

.NET dünyasında en bilinen mediator kütüphanesi MediatR‘dir. Mediator.Net de aynı amaca hizmet eder ama bazı tasarım farkları taşır.

Her iki kütüphane de CQRS ve mediator pattern uygular; seçim pipeline granüllüğü, context ihtiyacı ve lisans kriterine göre yapılabilir.

Dikkat
MediatR ile Mediator.Net aynı projede birlikte kullanılmaz; birini seçin ve tüm kod tabanında tutarlı kalın.

Hangi Senaryolarda Kullanılmalıdır?

Mediator.Net aşağıdaki durumlarda anlamlıdır:

Hangi Durumlarda Kullanılmamalıdır?

Aşağıdaki koşullarda ek bir abstraction olarak mediator eklemek genelde gereksiz karmaşıklık getirir:

Ufak bir hatırlatma
Mediator eklemeden önce mevcut bağımlılık ağını inceleyin; bazen sadece servisleri bölmek ve interface’leri daraltmak yeterli olur.

Bu durumlarda doğrudan servis enjeksiyonu veya minimal API ile işlem yapmak daha sade kalır.

Basit Command Kullanımı

Komut, yanıt beklemeden çalıştırılan bir mesajdır. Önce ICommand ve ilgili ICommandHandler tanımlanır; ardından SendAsync ile gönderilir.

// Komut mesajı
public class CreateOrderCommand : ICommand
{
    public string CustomerId { get; set; }
    public decimal Amount { get; set; }
}

// Handler: IReceiveContext üzerinden mesaja erişilir
public class CreateOrderCommandHandler : ICommandHandler<CreateOrderCommand>
{
    public async Task Handle(IReceiveContext<CreateOrderCommand> context, CancellationToken cancellationToken)
    {
        var cmd = context.Message;
        var order = new Order(cmd.CustomerId, cmd.Amount);
        // Persistence, domain event yayımı vb.
        await context.Publish(new OrderCreatedEvent { OrderId = order.Id });
    }
}

// Kullanım
await _mediator.SendAsync(new CreateOrderCommand { CustomerId = "C001", Amount = 99.99m });

Query (Request/Response) Kullanımı

Sorgu tarafında IRequest ve IResponse kullanılır; handler yanıtı context üzerinden veya dönüş değeri ile verir. Çağrı RequestAsync ile yapılır.

public class GetOrderRequest : IRequest
{
    public Guid OrderId { get; set; }
}

public class OrderDto : IResponse
{
    public Guid Id { get; set; }
    public string CustomerId { get; set; }
    public decimal Amount { get; set; }
}

public class GetOrderRequestHandler : IRequestHandler<GetOrderRequest, OrderDto>
{
    public async Task Handle(IReceiveContext<GetOrderRequest> context, CancellationToken cancellationToken)
    {
        var request = context.Message;
        var order = await _orderRepo.GetByIdAsync(request.OrderId, cancellationToken);
        return new OrderDto { Id = order.Id, CustomerId = order.CustomerId, Amount = order.Amount };
    }
}

var order = await _mediator.RequestAsync<GetOrderRequest, OrderDto>(new GetOrderRequest { OrderId = id });

Async Handler ve CancellationToken

Tüm handler imzaları async Task ve CancellationToken alır; uzun süren işlemlerde iptal desteği için kullanılır.

public class ExportOrdersCommandHandler : ICommandHandler<ExportOrdersCommand>
{
    public async Task Handle(IReceiveContext<ExportOrdersCommand> context, CancellationToken cancellationToken)
    {
        var cmd = context.Message;
        await foreach (var order in _orderRepo.StreamAsync(cmd.From, cmd.To, cancellationToken))
        {
            cancellationToken.ThrowIfCancellationRequested();
            await _exporter.WriteAsync(order, cancellationToken);
        }
    }
}

Pipeline Behavior (Middleware) Yazımı

Cross-cutting mantığı pipeline’a taşımak için IPipeSpecification<TContext> uygulanır. BeforeExecute, Execute, AfterExecute ve OnException aşamasında çalışır.

public class LoggingPipeSpecification : IPipeSpecification<IReceiveContext<IMessage>>
{
    private readonly ILogger _logger;

    public LoggingPipeSpecification(ILogger logger) => _logger = logger;

    public bool ShouldExecute(IReceiveContext<IMessage> context, CancellationToken cancellationToken) => true;

    public Task BeforeExecute(IReceiveContext<IMessage> context, CancellationToken cancellationToken)
    {
        _logger.LogInformation("İşleniyor: {MessageType}", context.Message.GetType().Name);
        return Task.CompletedTask;
    }

    public Task Execute(IReceiveContext<IMessage> context, CancellationToken cancellationToken) => Task.CompletedTask;

    public Task AfterExecute(IReceiveContext<IMessage> context, CancellationToken cancellationToken)
    {
        _logger.LogInformation("Tamamlandı: {MessageType}", context.Message.GetType().Name);
        return Task.CompletedTask;
    }

    public Task OnException(Exception ex, IReceiveContext<IMessage> context)
    {
        _logger.LogError(ex, "Hata: {MessageType}", context.Message.GetType().Name);
        return Task.CompletedTask;
    }
}

Validation Entegrasyonu

Validasyonu pipeline’a ekleyerek tüm komut veya sorgularda merkezi kontrol sağlanır. Örnek: FluentValidation benzeri bir validator pipeline’da çağrılır.

public class ValidationPipeSpecification : IPipeSpecification<IReceiveContext<IMessage>>
{
    private readonly IValidatorFactory _validatorFactory;

    public ValidationPipeSpecification(IValidatorFactory validatorFactory) => _validatorFactory = validatorFactory;

    public bool ShouldExecute(IReceiveContext<IMessage> context, CancellationToken ct) => true;

    public async Task BeforeExecute(IReceiveContext<IMessage> context, CancellationToken cancellationToken)
    {
        var validator = _validatorFactory.GetValidator(context.Message.GetType());
        if (validator != null)
        {
            var result = await validator.ValidateAsync(context.Message, cancellationToken);
            if (!result.IsValid)
                throw new ValidationException(result.Errors);
        }
    }

    public Task Execute(IReceiveContext<IMessage> context, CancellationToken ct) => Task.CompletedTask;
    public Task AfterExecute(IReceiveContext<IMessage> context, CancellationToken ct) => Task.CompletedTask;
    public Task OnException(Exception ex, IReceiveContext<IMessage> context) => Task.CompletedTask;
}

Logging Pipeline ve ConfigureGlobalReceivePipe

Tüm mesajlar için geçerli loglama, global receive pipeline’a eklenir. Resmi Mediator.Net.Middlewares.Serilog paketi de kullanılabilir.

var mediator = new MediatorBuilder()
    .RegisterHandlers(typeof(Program).Assembly)
    .ConfigureGlobalReceivePipe(cfg => cfg.UseSerilog(LogEventLevel.Information))
    .Build();

// Özel logging middleware extension örneği
public static void UseRequestLogging(this IPipeConfigurator<IReceiveContext<IMessage>> configurator, ILogger logger)
{
    configurator.AddPipeSpecification(new LoggingPipeSpecification(logger));
}

Dependency Injection Entegrasyonu

Microsoft.Extensions.DependencyInjection ile AddMediator ve handler kaydı aşağıdaki gibi yapılır.

// Paket: Mediator.Net.MicrosoftDependencyInjection
services.AddMediator(builder =>
{
    builder.RegisterHandlers(typeof(Program).Assembly);
    builder.ConfigureGlobalReceivePipe(cfg => cfg.UseRequestLogging(logger));
});

Handler’lar DI container’da otomatik çözülür; pipeline specification’lar da gerekirse scope’tan alınır.

ASP.NET Core ile Kullanım

Controller’da IMediator enjekte edilir; komut ve sorgu çağrıları SendAsync / RequestAsync ile yapılır.

[ApiController]
[Route("api/[controller]")]
public class OrdersController : ControllerBase
{
    private readonly IMediator _mediator;

    public OrdersController(IMediator mediator) => _mediator = mediator;

    [HttpPost]
    public async Task<IActionResult> Create([FromBody] CreateOrderRequest request)
    {
        await _mediator.SendAsync(new CreateOrderCommand { CustomerId = request.CustomerId, Amount = request.Amount });
        return Accepted();
    }

    [HttpGet("{id}")]
    public async Task<ActionResult<OrderDto>> Get(Guid id)
    {
        var result = await _mediator.RequestAsync<GetOrderRequest, OrderDto>(new GetOrderRequest { OrderId = id });
        return Ok(result);
    }
}

Background Processing Senaryosu

Hosted Service veya Worker Service içinde IMediator kullanarak arka planda komut/event işleyebilirsiniz.

public class OrderSyncBackgroundService : BackgroundService
{
    private readonly IMediator _mediator;
    private readonly ILogger<OrderSyncBackgroundService> _logger;

    public OrderSyncBackgroundService(IMediator mediator, ILogger<OrderSyncBackgroundService> logger)
    {
        _mediator = mediator;
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            await _mediator.SendAsync(new SyncPendingOrdersCommand(), stoppingToken);
            await Task.Delay(TimeSpan.FromMinutes(5), stoppingToken);
        }
    }
}

Custom Middleware ve Pipeline’a Ekleme

Özel bir middleware yazıp ilgili pipeline’a (örneğin sadece komutlar için) eklemek için extension metodu ve specification kullanılır.

public static class AuditMiddlewareExtensions
{
    public static void UseAudit(this ICommandReceivePipeConfigurator configurator, IAuditStore store = null)
    {
        store ??= configurator.DependencyScope?.Resolve<IAuditStore>();
        configurator.AddPipeSpecification(new AuditPipeSpecification(store));
    }
}

// Konfigurasyon
var mediator = new MediatorBuilder()
    .RegisterHandlers(typeof(Program).Assembly)
    .ConfigureCommandReceivePipe(cfg => cfg.UseAudit())
    .ConfigureGlobalReceivePipe(cfg => cfg.UseRequestLogging(logger))
    .Build();

İç Çalışma Mantığı

Her SendAsync, RequestAsync veya PublishAsync çağrısında önce mesaj tipine göre bir ReceiveContext oluşturulur. Mediator, bu context’e kendisini ve gerekli pipe örneklerini kaydeder; böylece handler içinden context.Publish ile yeni event yayılabilir. Akış her zaman GlobalReceivePipeline ile başlar; bu pipeline tüm mesajlar için geçerlidir ve burada loglama, metrik gibi genel middleware’ler çalışır. Ardından mesaj tipi belirlenir:

Handler binding, MediatorBuilder ile assembly taraması (RegisterHandlers) veya explicit MessageBinding listesi ile yapılır. Event yayımında handler içinden context.Publish(evt) çağrılırsa PublishPipeline devreye girer. Bir komut veya sorgu için birden fazla handler kaydedilmemelidir; aksi halde kütüphane MoreThanOneHandlerException fırlatır. Handler bulunamazsa NoHandlerFoundException alınır.

Performans ve Tasarım Tercihleri

Assembly scanning çok sayıda handler için pratiktir; explicit registration ise startup süresini ve yanlış eşleşmeyi azaltır. Büyük projelerde handler’ların bulunduğu assembly’leri sınırlı tutarak tarama maliyetini düşürebilirsiniz. Pipeline sırası önemlidir: önce validasyon, sonra loglama, en sonda handler çalışması mantıklı bir sıralama olur. Streaming (CreateStream) çok sayıda kayıt dönülecek senaryolarda bellek dostu bir seçenek sunar. DI scope’u pipeline ve handler boyunca paylaşılabilir; IDependencyScope üzerinden çözümleme yapılır. Resmi Serilog, Unit of Work ve EventStore middleware paketleri ek kurulum ile kullanılabilir.

Sonuç ve İleri Okuma

Mediator.Net, CQRS ve mediator pattern’i .NET’te pipeline odaklı ve context destekli bir yapıyla uygular. Komut, sorgu ve olay ayırımı, beş farklı pipeline tipi ve resmi Serilog/Unit of Work/EventStore middleware paketleri ile orta ve büyük ölçekli uygulamalarda temiz bir mimari sunar. MediatR’dan farklı olarak pipeline granüllüğü ve Apache 2.0 lisansı ile kurumsal projelerde alternatif olarak değerlendirilebilir.

Kaynak ve İleri Okuma

Exit mobile version