Messaging Yapılarında MassTransit ile Error ve Redeliver Handling

Merhaba arkadaşlar.

Bu makale konumda sizlere messaging yapılarına geçiş süreçlerimizde consumer’lar üzerinde karşılaştığımız Error ve Redeliver handling gibi concern’lerden bahsedip, nasıl bir yaklaşım uyguladığımız konusunda ve service bus olarak kullandığımız MassTransit kütüphanesinin nasıl çözümler sunduğu konusunda bilgiler paylaşmaya çalışacağım.

Consumer ile Error Handling

Bir e-commerce sistemi üzerinde çalıştığımızı düşünelim. Sistem üzerinde bir sipariş başarıyla gerçekleştiği an, sistem tarafından “IPaymentApproved” tipinde bir event fırlatıldığını varsayalım. “IPaymentApproved” event’ını dinleyen farklı consumer’lar olabileceği gibi, bunlardan birisinin örneğimiz gereği sipariş detaylarını müşteriye e-mail gönderen bir servis olduğunu düşünerek ele alalım.

“IPaymentApproved” event’ı, aşağıdaki gibi tanımlı olsun.

namespace ErrorAndRedeliverHandlingSample.Contracts
{
    public interface IPaymentApproved
    {
        string OrderNumber { get; set; }
    }
}

“IPaymentApproved” event’ını dinleyen consumer ise, şimdilik aşağıdaki gibi implemente edilmiş olsun.

namespace ErrorAndRedeliverHandlingSample
{
    public class PaymentApprovedConsumer : IConsumer
    {
        public async Task Consume(ConsumeContext context)
        {
            //send an e-mail logic...
        }
    }
}

Peki consume işlemi sırasında, herhangi bir exceptional durum ile karşılaşırsak ne olacak? Bu exception hatalı kodlamadan kaynaklı olabileceği gibi, eksik/tutarsız produce edilen bir message’dan kaynaklı da olabilir. Bu tarz durumlarda kullanılmak üzere MassTransit içerisinde implemente edilmiş bazı çözümler mevcuttur.

Default bus ile exception MassTransit middleware’i tarafından yakalanır ve ilgili message, “{queue_name}_error” isimli bir queue’ya otomatik olarak taşınır. Exception’ın detaylarına ise taşınan message’ın header’ı üzerinden erişebilmek mümkündür. Bu tarz problemlerden haberdar olabilmek için ise oluşan “{queue_name}_error” gibi önemli queue’ları, özellikle production ortamları için network monitoring tool’ları ile sensor ekleyerek takip etmek, doğru bir hareket olacaktır.

1. Retrying Messages

Bazı durumlarda bir web service’in o an cevap verememesi, database tarafında deadlock oluşabilmesi gibi transient exceptionlar gerçekleşebilmektedir. Bu tarz durumlarda ilgili message’ı o an kaybetmemek için kullanabileceğimiz bazı retrying function’ları bulunmaktadır.

Örneğin:

namespace ErrorAndRedeliverHandlingSample
{
    public class PaymentApprovedConsumer : IConsumer
    {
        public async Task Consume(ConsumeContext context)
        {
            //send an e-mail logic...
            throw new Exception("Something's happened during processing...");
        }
    }
}

Yukarıdaki kod bloğuna bakalım ve e-mail gönderim işlemi sırasında kullanıyor olduğumuz e-mail provider’ına ait web service’in, cevap vermediğini ve bu yüzden transient bir exception oluştuğunu düşünelim. Bu tarz durumlar genelde ikinci defa denemeler sonucunda başarıyla gerçekleşebilmektedir.

Retry işlemini devreye sokabilmek için aşağıdaki gibi consumer’ı initialize ederken, “UseRetry” method’unu kullanmak yeterli olacaktır.

IBusControl busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
    cfg.ReceiveEndpoint(host, "queue_name", e =>
    {
        e.UseRetry(r => r.Immediate(5));
        e.Consumer(() => new PaymentApprovedConsumer());
    });
}

e.UseRetry(r => r.Immediate(5));” kısmına dikkat edersek, “Immediate” policy’si ile herhangi bir exception anında ilgili message’ı error queue‘ya taşımadan önce, “5” kere daha denemesi gerektiğini bildirdik. “Immediate” gibi birden fazla retry policy’leri bulunmaktadır. Bu policy’ler arasında benim en yaygın olarak kullandığım ise, “Incremental” retry policy olanıdır. Bu policy ile ne kadar retry işleminin tekrar yapılacağını, her denemenin ardından ne kadar süre bekleneceği ve her bekleme süresinin de incremental olarak ne kadar artması gerektiği gibi bilgileri de set edebilmekteyiz.

Bununla ilgili örneği ise, MassTransit’in sık kullanıyor olduğum function’larını wraplediğim MetroBus library’si üzerinden göstermek istiyorum.

IBusControl busControl = MetroBusInitializer.Instance.UseRabbitMq("rabbitMqUri", "rabbitMqUserName", "rabbitMqPassword")
    .UseIncrementalRetryPolicy(retryLimit: 5, initialIntervalFromMinute: 10, intervalIncrementFromMinute: 10)
    .InitializeConsumer("queueName").Build();

Bunlara ek olarak MassTransit üzerinde bulunan diğer retry policy’ler ise:

  • None
  • Immediate
  • Intervals
  • Exponential
  • Incremental

şeklindedir.

Bir diğer yandan retrying işlemleri sırasında meydana gelen exception’ları, retry filter‘larını kullanarak handle ve ignore edebilmek de mümkündür.

Örneğin:

cfg.UseRetry(r => 
{
    c.Ignore(typeof(InvalidOperationException), typeof(InvalidCastException));
});

Daha detaylı bilgilere ise buradan ulaşabilirsiniz. Dilerseniz retry işlemi için bir örnek gerçekleştirelim.

NOT: Makale boyunca consumer’a yönelik örnekleri gerçekleştirebilmek için, queue üzerine “IPaymentApproved” interface’ini kullanarak bir kaç message produce edeceğim.

Bunun için öncelikle “PaymentApprovedConsumer” class’ını, aşağıdaki gibi initialize edelim.

namespace ErrorAndRedeliverHandlingSample
{
    public class PaymentApprovedConsumerService
    {
        private readonly IBusControl _consumerBusControl;
        private readonly string _rabbitMqUri;
        private readonly string _rabbitMqUserName;
        private readonly string _rabbitMqPassword;
        private readonly string _queueName;

        public PaymentApprovedConsumerService()
        {
            _rabbitMqUri = ConfigurationManager.AppSettings["RabbitMqUri"];
            _rabbitMqUserName = ConfigurationManager.AppSettings["RabbitMqUserName"];
            _rabbitMqPassword = ConfigurationManager.AppSettings["RabbitMqPassword"];
            _queueName = ConfigurationManager.AppSettings["FooQueue"];

            _consumerBusControl =
                MetroBusInitializer.Instance.UseRabbitMq(_rabbitMqUri, _rabbitMqUserName, _rabbitMqPassword)
                .UseIncrementalRetryPolicy(3, 1, 1)
                .InitializeConsumer(_queueName).Build();
        }

        public void Start()
        {
            _consumerBusControl.Start();
        }

        public void Stop()
        {
            _consumerBusControl.Stop();
        }
    }
}

ve ardından “Program.cs” içerisini aşağıdaki gibi kodlayıp, start’a basalım.

namespace ErrorAndRedeliverHandlingSample
{
    class Program
    {
        static void Main(string[] args)
        {
            var paymentApprovedConsumerService = new PaymentApprovedConsumerService();

            paymentApprovedConsumerService.Start();
        }
    }
}

Console çıktısına baktığımızda, consume ederken bir exception throw oldu ve bu işlem sonucunda retry mekanizması devreye girdi.

Queue üzerine baktığımızda ise:

message’ın “unacked” olarak beklediğini görebiliriz.

Incremental retry policy ile “1” er dakika arayla toplam “3” kere retry işlemi gerçekleştirilecektir ve her bir denemenin ardından interval time’a “1” er dakika daha eklenerek, retry işlemi retry limit’e kadar devam edecektir. Retry limit’e ulaşıldığında hala failure durumu mevcut ise aşağıdaki gibi message, error queue’ya taşınacaktır.

2. Circuit Breaker Kullanımı

MassTransit içerisindeki exception handling unsurlarından önemli birisi de, Circuit Breaker pattern’ıdır. Kullanım amacı ise belirli bir zaman diliminde sistem hata vermeye başladığında ve failure threshold değerine ulaşıldığında, resource’ları bu failure state durumlarındaki overloaded’dan koruyabilmek içindir diyebilirim.

Bu pattern’ın genel akışı ise aşağıdaki gibidir:

Örneğin “IPaymentApproved” event’ını consume ederken, e-mail gönderim işlemi sırasında kullandığımız  e-mail provider’ına ait web service’in bir süre cevap vermediğini ve request’lerin 30 sn gibi bir süre sonunda timed out’a uğradığını düşünelim. Peki ya t anında bir çok event oluşursa ne olacak? Tüm request’ler 30 sn timeout süreleri sonunda failure state’ine uğrayacaklar ve resource’u gereksiz bir yere meşgul etmiş olacağız ve aynı zamanda cascading failure‘lara da sebebiyet vermiş olabileceğiz.

Peki ya circuit breaker’ı açarsak bu durumda ne olacak?

Bu durumda circuit breaker bizim için failure durumlarını monitor edecektir. Failures durumları belirli bir threshold seviyesine ulaştığında, circuit breaker devreye girecek ve repeated failures’lara sebebiyet veren durumları geçici olarak engelleyecektir. Çalışma mantığı da ise reset interval time expire olduğunda, consume işlemi yavaş bir şekilde çalışmaya devam edecektir. Eğer bu durum karşısında failure tekrar devam ederse, timeout interval reset’lenir ve circuit breaker tekrar açık moda getirilir. Eğer herhangi bir failure ile karşılaşılmazsa, tüm consume flow’u normal hale getirilir.

MassTransit üzerinde circuit breaker’ı, aşağıdaki gibi etkinleştirebilmek mümkündür.

IBusControl busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
    cfg.ReceiveEndpoint(host, "queue_name", e =>
    {
        e.UseCircuitBreaker(cb =>
        {
            cb.TripThreshold = 15;
            cb.ActiveThreshold = 10;
            cb.ResetInterval = TimeSpan.FromMinutes(5);
        });
        e.Consumer(() => new PaymentApprovedConsumer());
    });
}

ayrıca MetroBus tarafında ise, aşağıdaki gibi etkinleştirilebilinir:

IBusControl busControl = MetroBusInitializer.Instance.UseRabbitMq("rabbitMqUri", "rabbitMqUserName", "rabbitMqPassword")
    .UseCircuitBreaker(tripThreshold: 15, activeThreshold: 10, resetInterval: 5)
    .InitializeConsumer("queueName").Build();

Circuit breaker hakkında daha detaylı bilgiye, buradan ulaşabilirsiniz.

3. Rate Limiter Kullanımı

Consumer initialize ederken dikkat edilmesi gereken bir diğer unsur ise, rate limiter kullanımıdır. Kullanımındaki asıl amaç ise: consumer içerisinde e-mail gönderim işlemi için kullandığımız provider, saniye başına belirli bir sayıda call limiti kabul ediyor olabilir. Bu gibi durumlarda rate limiter kullanılabilmektedir.

Aşağıdaki gibi etkinleştirebilmek mümkündür.

IBusControl busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
    cfg.ReceiveEndpoint(host, "queue_name", e =>
    {
        e.UseRateLimit(20, TimeSpan.FromSeconds(5));
        e.Consumer(() => new PaymentApprovedConsumer());
    });
}

MetroBus üzerinde ise:

IBusControl busControl = MetroBusInitializer.Instance.UseRabbitMq("rabbitMqUri", "rabbitMqUserName", "rabbitMqPassword")
    .UseRateLimiter(rateLimit: 20, interval: 5000)
    .InitializeConsumer("queueName").Build();

“UseRateLimiter()” ı kullanarak fluently bir şekilde etkinleştirebilmek mümkündür.

Redeliver Messages

Son zamanlarda bazı business case’ler doğrultusunda karşılaştığım belkide en önemli concern’lerden birisi de, henüz işlemememiz gereken message’ları nasıl handle edeceğiz konusu olmuştur.

Build-in olarak RabbitMQ içerisinde AMQP standartlarında deferred işlemler bulunmamaktadır. Fakat MassTransit içerisinde bunu gerçekleştirebilmenin bir kaç yolu vardır.

  1. Quartz.Net’i kullanarak, scheduling message’lar ile gerçekleştirilebilinir. Bu konudaki bilgiye, buradan erişebilirsiniz.
  2. RabbitMQ’nun community plugin’leri arasında bulunan, “RabbitMQ Delayed Message Plugin” ile gerçekleştirebilmek de mümkündür.

Bu konudaki örneği benimde kullandığım “RabbitMQ Delayed Message Plugin” ile gerçekleştireceğim. Bunun için öncelikle buradan “rabbitmq_delayed_message_exchange” plugin’ini indirelim ve kurulumunu gerçekleştirelim.

NOT: İndirilen ilgili plugin’i RabbitMQ içerisindeki “plugins” klasörüne atarak, “rabbitmq-plugins enable rabbitmq_delayed_message_exchange” komutu ile CLI üzerinden etkinleştirebilmek mümkündür.

Plugin’i etkinleştirmemizin ardından, MassTransit içerisinden aşağıdaki gibi initialize edebiliriz.

IBusControl busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
    cfg.UseDelayedExchangeMessageScheduler();
}

MetroBus içerisinde ise:

IBusControl busControl = MetroBusInitializer.Instance.UseRabbitMq("rabbitMqUri", "rabbitMqUserName", "rabbitMqPassword")
    .UseDelayedExchangeMessageScheduler()
    .InitializeConsumer("queueName").Build();

yukarıdaki gibi initialize etmek, yeterli olacaktır.

“PaymentApprovedConsumer” class’ını ise, aşağıdaki gibi refactor edelim.

namespace ErrorAndRedeliverHandlingSample
{
    public class PaymentApprovedConsumer : IConsumer
    {
        public async Task Consume(ConsumeContext context)
        {
            int? maxAttempts = context.Headers.Get("MT-Redelivery-Count", default(int?));

            if (maxAttempts > 3)
            {
                throw new Exception("Something's happened during processing...");
            }

            Console.WriteLine($"Attempts: {maxAttempts} Order number: {context.Message.OrderNumber}");

            await context.Defer(TimeSpan.FromMinutes(1));
        }
    }
}

Deferred işlemlerde dikkat etmemiz gereken bir diğer nokta ise, “MT-Redelivery-Count” key’i ile message’ın header’ı üzerinden aldığımız redelivery count bilgisidir. Bu bilgi sayesinde message’ı kaç kere retry ettiğimizi öğrenebilir, infinite retry işlemlerinden de kaçınabilmiş oluruz.

Consumer’ı start edelim ve sonucuna bir bakalım.

Console çıktısına baktığımızda, “context.Defer(TimeSpan.FromMinutes(1))” kod satırı ile set etmiş olduğumuz “1” er dakikalık defer işlemi, başarıyla gerçekleşmiş ve sonunda işlemi tamamlayamadığı için ilgili message’ı error queue üzerine yönlendirmiştir.

Bu işlem sırasında RabbitMQ’nun management ekranına baktığımızda ise:

MassTransit burada bizim için “_delay” postfix’i ile bir exchange oluşturmuştur ve defer işlemi sırasında ilgili message’ın bind işlemini, bu exchange üzerinden gerçekleştirmiştir.

Umarım faydalı bir yazı olmuştur. Makale sırasında kullandığım ilgili örneğe, GitHub hesabım üzerinden erişebilirsiniz. Eğer konu ile ilgili herhangi bir sorunuz olursa, yorum olarak yazabilirsiniz.

https://github.com/GokGokalp/messaging-error-and-redeliver-handling-sample

Gökhan Gökalp

View Comments

  • Hocam selam, önceki tema çok daha sade okunaklı ve güzeldi. Bu hiç olmamış, hiç elit değil. Bildiğin "PHP'ci" teması olmuş bu, lütfen eski haline al :D

    Saygılarımla, çok seven takipçin.

  • Gökhan Hocam teşekkür ederiz yazılar için. birçok teknolojiyi bir arada derli toplu sunmanız very nice olmuş :)

  • Thank you for such interesting article, however it would be great if you can extend it with some ideas of custom exception handling outside masstransit, for instance if we have two storage services and one of them succeeds while the other fails to store data, then I might need to delete the record stored by the first service.

    What do you think of as a good pattern to solve this issue? I am thinking to create a new different service to sort this kind of exceptions, so I encapsulated the exceptions inside "ValidatedMessage" containing an IsValid bool and list of exceptions because if I throw an exception then Masstransit would try to resend the message always when it should not for some cases.
    I am having some troubles on how to forward the ValidatedMessage to my exception handling service and stop consuming it in the current service if IsValid is false.

    • Thanks for your comment. As you said, we have two storage and we are trying to add something. First, how do we try to add something to these services? I mean, is this add operation happening in a single point or separate services? Anyway, this situation is really ineluctable when we are working with distributed systems. :) So, I don't really know what the best performant way is. I think we can apply many ways to provide this situation in "eventually consistency" such as "sagas, state machines", "compensate transaction" etc... Well, the masstransit is also great framework when we are working with the distributed systems. It provides great functionalities like "SagaStateMachine". I want to say again is "I don't know, what the best performant way is". These patterns really have some complexities. If you want, you can also check this link: https://gokhan-gokalp.azurewebsites.net/en/messaging-architecture-da-saga-patterni-ile-failure-management/ Btw, maybe we can avoid this situation in a quick way, with developing some end-of-day or operation checker such as watchdog.

Recent Posts

Containerized Uygulamaların Supply Chain’ini Güvence Altına Alarak Güvenlik Risklerini Azaltma (Güvenlik Taraması, SBOM’lar, Artifact’lerin İmzalanması ve Doğrulanması) – Bölüm 1

{:tr}Bildiğimiz gibi modern yazılım geliştirme ortamında containerization'ın benimsenmesi, uygulamaların oluşturulma ve dağıtılma şekillerini oldukça değiştirdi.…

8 ay ago

Identity & Access Management İşlemlerini Azure AD B2C ile .NET Ortamında Gerçekleştirmek

{:tr}Bildiğimiz gibi bir ürün geliştirirken olabildiğince farklı cloud çözümlerinden faydalanmak, harcanacak zaman ve karmaşıklığın yanı…

1 yıl ago

Azure Service Bus Kullanarak Microservice’lerde Event’ler Nasıl Sıralanır (FIFO Consumers)

{:tr}Bazen bazı senaryolar vardır karmaşıklığını veya eksi yanlarını bildiğimiz halde implemente etmekten kaçamadığımız veya implemente…

2 yıl ago

.NET Microservice’lerinde Outbox Pattern’ı ile Eventual Consistency için Atomicity Sağlama

{:tr}Bildiğimiz gibi microservice architecture'ına adapte olmanın bir çok artı noktası olduğu gibi, maalesef getirdiği bazı…

2 yıl ago

Dapr ve .NET Kullanarak Minimum Efor ile Microservice’ler Geliştirmek – 02 (Azure Container Apps)

{:tr}Bir önceki makale serisinde Dapr projesinden ve faydalarından bahsedip, local ortamda self-hosted mode olarak .NET…

2 yıl ago