Bazen bazı senaryolar vardır karmaşıklığını veya eksi yanlarını bildiğimiz halde implemente etmekten kaçamadığımız veya implemente etme zorunda kaldığımız. Distributed ortamlarda birbirlerine bağlı bazı event’leri/message’ları belirli bir sıra (publish edildikleri) içerisinde işleyebilmek de bence bunlardan birisi. Örneğin batch processing yapıyor ve her bir item’ı belirli bir sıra içerisinde işlememiz gerekiyor olabilir veya bir e-ticaret firmasının fulfillment domain’inde çalışıyor olabilir ve siparişlerin statüslerini doğru güncelleyebilmek için ilgili event’leri gerçekleşen sıralarıyla işlememiz gerekiyor olabilir.
Bir çok message broker’ın FIFO prensibi ile çalıştığını ve normal şartlarda FIFO ordering’i sağladıklarını düşünürsek, bir publisher ve bir subscriber’a sahip olarak birbirine bağlı bazı event’leri publish edildikleri sıra içerisinde kolaylıkla consume edebiliriz. Tabi event işleme sırasında oluşabilecek network veya uygulama kaynaklı hataları saymazsak. Örneğin geçici bir hata yüzünden ilgili event kolaylıkla re-queue olabilir ve ilgili sırayı bozabilir veya message broker işlenmiş bir event için yine geçici bir hatadan dolayı ack bilgisini alamaz ve ilgili event’i tekrar işlenmeye uygun hale getirebilir ve duplication’lar oluşturabilir. Alternatif çözüm olarak event’ler içerisine bir timestamp dahil etmeyi de düşünebilir ve gerekli kontrolleri gerçekleştirdikten sonra işleme işlemini gerçekleştirebiliriz. Maalesef bu sefer de race-condition gibi farklı problemler ile başa çıkmamız gerekecektir. Teknik olarak elbette oluşabilecek bu tarz hataları da ele alabilir ve event işleme sırasını koruyabilmek adına farklı logic’ler kodlayabiliriz. Fakat gördüğümüz gibi event ordering standart FIFO queue’lar ile kolay erişilebilir gibi görünmesine rağmen, subscriber tarafında herhangi bir garanti olmadığı için işler karmaşıklaşmaya başlıyor.
Daha da ötesi tek bir subscriber gerekliliği yüzünden scalability ve decoupling gibi önemli prensipleri de kaybediyor ve genel throughput’dan fedakarlık ediyor olacağız.
Dolayısıyla sistemimizi, architecture’ımızı veya event payload’larımıza design ederken bu tarz operasyonlara gerek olmayacak şekilde design etmemiz veya en azından minimuma indirgememiz, ileriye yönelik sistemimizin sağlığı açısından oldukça önem taşımaktadır.
Peki, event ordering’e ihtiyacımız olduğunda ve implemente etme durumunda kaldığımızda ne yapacağız? Bu makale kapsamında ise en verimli bi şekilde bu işlemi Azure Service Bus kullanarak nasıl gerçekleştirebileceğimize değineceğim. Dolaysıyla message broker olarak Azure Service Bus kullandığımızı varsayacağım ve bu konuda bizlere Azure Service Bus‘ın sunmuş olduğu “sessions” özelliğinin nasıl yardımcı olabileceğine bir bakacağız.
Azure Service Bus – Sessions
Normal şartlarda FIFO event delivery garantisi sağlayan bir message broker kullanıyorsak, bir publisher ve bir subscriber’a sahip olarak birbirine bağlı bazı event’leri publish edildikleri sıra içerisinde consume edebileceğimizi söyledik. Fakat subscriber tarafında herhangi bir garanti olmadığı için bu yaklaşımın ne kadar kırılgan ve karmaşık olabileceğinden de bahsettik. Peki bu FIFO event delivery garantisini en kolay yoldan subscriber tarafı için de nasıl sağlayabiliriz?
İşte tam da bu noktada Azure Service Bus‘ın sessions özelliği devreye girmekte. Sessions’ları kullanarak istediğimiz event’leri belirleyeceğimiz bir “SessionId” ile etiketleyerek, ilgili event’lerin bir grup halinde ve publish edildikleri sıra içerisinde işlenebilmelerini sağlayabiliriz. Ayrıca bu işlemi scalability prensibinden de kısmen fazla ödün vermeden gerçekleştirebilmekteyiz.
Peki nasıl?
Öncelikle bu özelliği kullanabilmek için, Azure Service Bus‘ın “Standard” veya “Premium” seçeneklerinden birisini kullanıyor olmamız gerekmektedir. Azure Service Bus üzerinde bir “queue” veya “subscription” oluştururken, message sessions özelliğini de aktifleştirmemiz gerekmektedir. Aktifleştirme işlemi için buraya göz atabilirsiniz.
NOT: Sessions’ın etkinleştirildiği queue’larda tüm event’ler bir session id’si içermesi gerekmektedir.
Sessions’ın çalışma mantığını kısaca özetlemek gerekirse, event’ler publish edilirken bir session id’si ile publish edilmektedir. Örneğin bir sipariş id’si. Daha sonra herhangi bir subscriber ilgili queue’dan henüz üzerinde lock bulunmayan bir event’i alır ve işleyebilmek için öncelikle o event’in session id’si için bir lock işlemi gerçekleştirir. Böylelikle queue’daki aynı session id’sine sahip tüm event’ler, lock’ı tutan subscriber tarafından tek tek publish edildikleri sıra içerisinde işlenebilmektedir. Ayrıca bu noktada tek bir subscriber’a sahip olma zorunluluğumuz da bulunmamaktadır. Elbette bir session altındaki event’leri işlerken paralellikten fedakarlık etmiş olmaktayız, fakat farklı session’lar altındaki aynı event’leri paralel olarak işleyebilmekteyiz.
Bir Örnek Gerçekleştirelim
Konuyu daha iyi anlayabilmek adına basit bir örnek gerçekleştirelim. Bir e-ticaret firmasında çalıştığımızı varsayalım ve bir siparişin müşteriye teslim edilene kadar olan statüs değişikliklerini müşteriye göstermek istiyoruz.
Öncelikle Contracts adında bir .NET 7 class library projesi oluşturalım ve içerisine aşağıdaki gibi “OrderStatusChangedEvent” adında bir event tanımlayalım.
namespace Contracts; public class OrderStatusChangedEvent { public OrderStatusChangedEvent(string orderId, string status, DateTime changeDate) { OrderId = orderId; Status = status; ChangeDate = changeDate; } public string OrderId { get; set; } public string Status { get; set; } public DateTime ChangeDate { get; set; } }
Bu event ile ilgili sipariş özelindeki statüs değişikliklerini publish ettiğimizi ve aşağıdaki gibi de statüs’lere sahip olduğumuzu varsayalım.
- InPreparation
- Shipped
- DeliveryAttemptFailed
- DeliveredToPickupPoint
- Completed
Event’leri Publish Edelim
Şimdi ilgili event’leri publish edeceğimiz basit bir publisher oluşturalım. Bunun için Publisher adında bir .NET 7 console application’ı oluşturalım ve içerisine NuGet üzerinden “Azure.Messaging.ServiceBus” paketini dahil edelim. Ardından Contracts projesini de referans olarak ekleyelim.
Şimdi “Program.cs” class’ını aşağıdaki gibi düzenleyelim.
using Azure.Messaging.ServiceBus; using Contracts; var connectionString = "Endpoint=sb://YOUR_SERVICE_BUS.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=YOUR_SHAREDACCESSKEY"; var queueName = "order.status.queue"; await using var serviceBusClient = new ServiceBusClient(connectionString); ServiceBusSender serviceBusSender = serviceBusClient.CreateSender(queueName); var orderId = Guid.NewGuid().ToString(); // ------------------- Order status: In Preparation ------------------- var orderStatusChangedToInPreparationEvent = new OrderStatusChangedEvent( orderId: orderId, status: "InPreparation", changeDate: DateTime.UtcNow); var orderStatusChangedToInPreparationMessage = new ServiceBusMessage(BinaryData.FromObjectAsJson(orderStatusChangedToInPreparationEvent)) { SessionId = orderId }; await serviceBusSender.SendMessageAsync(orderStatusChangedToInPreparationMessage); // ------------------- Order status: Shipped ------------------- var orderStatusChangedToShippedEvent = new OrderStatusChangedEvent( orderId: orderId, status: "Shipped", changeDate: DateTime.UtcNow); var orderStatusChangedToShippedMessage = new ServiceBusMessage(BinaryData.FromObjectAsJson(orderStatusChangedToShippedEvent)) { SessionId = orderId }; await serviceBusSender.SendMessageAsync(orderStatusChangedToShippedMessage); // ------------------- Order status: Delivery Attempt Failed ------------------- var orderStatusChangedToDeliveryAttemptFailedEvent = new OrderStatusChangedEvent( orderId: orderId, status: "DeliveryAttemptFailed", changeDate: DateTime.UtcNow); var orderStatusChangedToDeliveryAttemptFailedMessage = new ServiceBusMessage(BinaryData.FromObjectAsJson(orderStatusChangedToDeliveryAttemptFailedEvent)) { SessionId = orderId }; await serviceBusSender.SendMessageAsync(orderStatusChangedToDeliveryAttemptFailedMessage); // ------------------- Order status: Delivered to Pickup Point ------------------- var orderStatusChangedToDeliveredToPickupPointEvent = new OrderStatusChangedEvent( orderId: orderId, status: "DeliveredToPickupPoint", changeDate: DateTime.UtcNow); var orderStatusChangedToDeliveredToPickupPointMessage = new ServiceBusMessage(BinaryData.FromObjectAsJson(orderStatusChangedToDeliveredToPickupPointEvent)) { SessionId = orderId }; await serviceBusSender.SendMessageAsync(orderStatusChangedToDeliveredToPickupPointMessage); // ------------------- Order status: Completed ------------------- var orderStatusChangedToCompletedEvent = new OrderStatusChangedEvent( orderId: orderId, status: "Completed", changeDate: DateTime.UtcNow); var orderStatusChangedToCompletedMessage = new ServiceBusMessage(BinaryData.FromObjectAsJson(orderStatusChangedToCompletedEvent)) { SessionId = orderId, }; orderStatusChangedToCompletedMessage.ApplicationProperties.Add("IsLastItem", true); await serviceBusSender.SendMessageAsync(orderStatusChangedToCompletedMessage);
Burada basit olarak bize gelen bilgiler doğrultusunda ilgil sipariş’e ait statüs değişikliklerini sırasıyla publish ettiğimizi varsayıyoruz.
Teknik olarak konuşursak “ServiceBusClient” ve ardından “ServiceBusSender” objelerini ilgili queue bilgisini belirterek initialize ediyoruz. Ardından event’leri publish ettiğimiz sırada subscriber tarafında işleyebilmek için ise, onları bir session id altında topluyoruz. Session id’si olarak ise burada, ilgili sipariş’in id bilgisini kullandık.
Burada önemli nokta ise, ilgili session’ın ne zaman biteceği bilgisini kendi kontrolümüz altında sağlamak. Bu işlemi ise en son publish edecek olduğumuz event’e ekleyeceğimiz bir property ile sağlayabilmekteyiz. Çünkü bir subscriber bir session’ı kabul ettikten ve ilgili session altındaki event’leri işlemeyi tamamladıktan sonra tekrardan farklı session’lar kabul edebilmesi için, kabul etmiş olduğu session’ı öncelikle release etmesi gerekmektedir. Gerçekleştirmiş olduğumuz örnekte ise sonuncu event’e eklemiş olduğumuz “IsLastItem” property’si ile session’ın release edilme işlemini subscriber tarafında gerçekleştireceğiz.
Elbette bir subscriber paralel olarak birden çok session’ı işleyebilmektedir fakat release etmediğimiz session’lar, subscriber’ın paralel olarak maksimum işleyebilecek olduğu session limitini olumsuz olarak etkileyecektir.
Şimdi İşleyelim
Şimdi subscriber tarafına bir göz atalım. Bunun için de Subscriber adında bir .NET 7 console application’ı oluşturalım ve yine NuGet üzerinden “Azure.Messaging.ServiceBus” paketini dahil edelim. Ardından Contracts projesini yine referans olarak ekleyelim.
Ardından “Program.cs” class’ını aşağıdaki gibi düzenleyelim.
using Azure.Messaging.ServiceBus; using Contracts; var connectionString = "Endpoint=sb://YOUR_SERVICE_BUS.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=YOUR_SHAREDACCESSKEY"; var queueName = "order.status.queue"; await using var serviceBusClient = new ServiceBusClient(connectionString); await using ServiceBusSessionProcessor sessionProcessor = serviceBusClient.CreateSessionProcessor(queueName); sessionProcessor.ProcessMessageAsync += ProcessMessages; sessionProcessor.ProcessErrorAsync += ProcessErrors; Task ProcessMessages (ProcessSessionMessageEventArgs args) { var orderStatusChangedEvent = args.Message.Body.ToObjectFromJson<OrderStatusChangedEvent>(); Console.WriteLine($"Session ID: {args.SessionId}\nOrder ID: {orderStatusChangedEvent?.OrderId}\nStatus: {orderStatusChangedEvent?.Status}\nChanged Date: {orderStatusChangedEvent?.ChangeDate}"); Console.WriteLine("------------------------------"); var appProperties = args.Message.ApplicationProperties; if (appProperties != null && appProperties.TryGetValue("IsLastItem", out var isLastItem)) { if ((bool)isLastItem) { Console.WriteLine("Session closed"); args.ReleaseSession(); } } return Task.CompletedTask; } Task ProcessErrors(ProcessErrorEventArgs args) { Console.WriteLine("There is an error!"); return Task.CompletedTask; } await sessionProcessor.StartProcessingAsync(); Console.ReadKey();
Burada öncelikle “ServiceBusSender” yerine “ServiceBusSessionProcessor” objesini initialise ediyoruz. Ardından event’leri işleyebilmek için “ProcessMessageAsync” ve “ProcessErrorAsync” event’lerini tanımlıyoruz.
Subscriber bu noktada “order.status.queue” içerisinde bulunan ilk event’i kabul ederek, ilgili session id’si için bir lock tutacaktır. İlgili session boyunca ise session içerisinde bulunan event’leri publish edildikleri sıra doğrultusunda alarak, işleme işlemini gerçekleştirecektir. Subscriber’ın ilgili session özelinde tuttuğu lock sayesinde de ilgili session altındaki event’ler, diğer subscriber’lar tarafından alınamayacak ve event’lerin sıraları korunarak birer birer işlenmesi sağlanacaktır.
Default lock modu ise “PeekLock” dır. Böylece ilgili event, session boyunca başarıyla işlenmeden queue’dan silinmeyecektir ve ayrıca default olarak ack bilgisi otomatik şekilde Azure Service Bus‘a gönderilecektir. Eğer bu işlemi manuel olarak kendi kontrolümüz altında gerçekleştirmek istiyorsak da, “ServiceBusSessionProcessor” objesini initialize ederken aşağıdaki gibi “AutoCompleteMessages” property’sini “false” set etmemiz yeterli olacaktır.
var options = new ServiceBusSessionProcessorOptions() { AutoCompleteMessages = false }; await using ServiceBusSessionProcessor sessionProcessor = serviceBusClient.CreateSessionProcessor(queueName, options);
Ardından ilgili event’i işledikten sonra aşağıdaki gibi event’i işlediğimiz bilgisini kendimiz broker’a bildirebiliriz.
await args.CompleteMessageAsync();
“ReceiveAndDelete” lock modun da ise ilgili event, ilgili subscriber tarafından kabul edildikten sonra hemen ilgili queue’dan silinmektedir. Herhangi bir hata anında ilgili event’i kaybetme riski olduğu için açıkcası riskli bir yaklaşım.
Ek olarak lock sürelerini ise ilgili queue’ları oluştururken de belirleyebilmekteyiz. İşleyeceğimiz event’lerin işleme süreleri genellikle uzun olacaksa, en azından kendimizi garantiye alabilmek adına ilgili lock sürelerini uzatmamız ve yenilememiz faydamıza olacaktır. Yenileme işlemini ise “args.RenewSessionLockAsync();” method’unu çağırarak gerçekleştirebiliriz. Aksi takdirde ilgili lock release edilerek farklı subscriber’lar için de alınabilir hale getirilecektir.
Ayrıca bu session işlemleri broker seviyesinde gerçekleştiği için ilgili subscriber herhangi bir hatadan dolayı çalışamaz bir hale geldiğinde de, uygun subscriber’lardan bir tanesi ilgili session’ı kabul edebilir ve event’leri aynı sıraları korunarak kaldığı yerden işlemeye devam edebilmektedir.
Aynı şekilde bir event’i işleme sırasında herhangi bir geçici hata oluştuğunda da belirlenen retry policy’sine göre ilgili event, sırası korunarak tekrardan işlenmeye çalışılmaktadır. Bu noktada dikkat etmemiz gereken nokta ise, ilgili event maksimum retry sayısına ulaştığında ise dead letter queue’ya taşınmaktadır. Bu retry işleminin ise ne kadar gerçekleşeceğini ister ilgili queue’yu oluştururken, istersek de “ServiceBusClient” ‘ı initialise ederken aşağıdaki gibi belirleyebiliriz.
var clientOptions = new ServiceBusClientOptions { RetryOptions = new ServiceBusRetryOptions() { MaxRetries = 10, MaxDelay = TimeSpan.FromMinutes(1) } }; await using var serviceBusClient = new ServiceBusClient(connectionString, clientOptions);
“ProcessMessages” method’u içerisinde ise ilgili session’ı release edebilmek için, aşağıdaki gibi event property’lerine erişiyoruz ve belirlemiş olduğumuz “IsLastItem” property’sini kontrol ederek session’ın release işlemini gerçekleştiriyoruz.
“ProcessErrors” method’u içerisinde ise ilgili event’i işleme sırasında herhangi bir hata oluşursa, yapmak istediğimiz işlemleri gerçekleştirebiliriz. Ardından retry mekanizması devreye girmektedir.
Ayrıca bunlara ek olarak istediğimiz bir subscriber’ın spesifik session id’leri için çalışmasını da sağlayabiliriz. Bunun için yine “ServiceBusSessionProcessor” objesini initialize ederken aşağıdaki gibi session id’lerini set etmemiz yeterli olacaktır.
var options = new ServiceBusSessionProcessorOptions() { SessionIds = { "my-x-sessions", "my-y-sessions" } };
Dilersek event’leri işlerken ilgili session özelinde de daha sonra kullanmak üzere state bilgileri tutabilmekteyiz.
async Task ProcessMessages(ProcessSessionMessageEventArgs args) { await args.SetSessionStateAsync(BinaryData.FromString("some state")); string someState = (await args.GetSessionStateAsync()).ToString(); }
Şimdi hızlı bir test gerçekleştirebilmek için ise 2 subscriber ve 1 publisher çalıştıralım.
Gördüğümüz gibi session’ı ilk kabul eden sağ tarafdaki subscriber, session içerisindeki ilgili tüm event’leri publish edildikleri sıra içerisinde işledi ve ilgili session’ı kapattı.
Toparlayalım
Makalenin başında da dediğim gibi, maalesef bazen bazı logic’leri istemediğimiz şekilde implemente etmek zorunda kalabiliyoruz. Sırf bu yüzden yıllar önce sadece tek bir instance ile çalışması gereken bir kaç subscriber’larımız vardı ve ilgili event işleme sıralarını koruyabilmek adına farklı taklalar atıyorduk.
Bu tarz senaryoya gereksinim çıkartmayacak şekilde event’lerimizi/sistemimizi design etmemiz, oldukça faydamıza olacaktır. Elbette business karmaşıklığına ve o anki gereksinimlerine göre değişiklilik gösterecektir.
Bu makale kapsamında ise bu tarz bir ihtiyaç karşısında, Azure Service Bus ve sessions özelliğini kullanarak, subscriber tarafında minimum efor ile FIFO sıralama ve işleme işlemini nasıl gerçekleştirebileceğimizi göstermeye çalıştım. Ayrıca bahsettiğim gibi subscriber tarafında event’leri işlerken “session lock duration“, “timeouts” ve “retry” gibi konfigürasyonları dikkatlice yapmamız gerektiğini de unutmamamız gerekmektedir.
Referanslar
https://learn.microsoft.com/en-us/azure/service-bus-messaging/message-sessions
Thanks Gokhan for the article. As always all good all clear. It’s an interesting feature but still one important question is open for me. Let’s say we have two consumers C1 and C2 consuming messages from a queue. The queue contains some messages of sessions A and B in the following order: A1, A2, B1. So let’s say the consumer C1 takes the message A1 and locking the session A. Next message in the queue is A2 (also belongs to the session A). So what in this case consumer C2 is going to do? Is it blocked and idling because the session A is locked or it will somehow skip the message A2 and start processing the message B1?
Thanks Vitalii, I’m glad that you found the article clear. Yes indeed, your second consumer as you called C2, won’t stay in idle mode. That’s the good part of Azure Service Bus. C2 will also grap other event from the same queue which belongs different session or haven’t locked yet. In short, it allows us to process different sessions in parallel even if they are in same queue.
wow, that’s really cool! thanks again for sharing your knowledge!
I was exactly encountered this business scenario
Takipteyim kaliteli ve güzel bir içerik olmuş dostum.
Çok teşekkür ederim.
Thanks for the awesome article. I was really looking to solver the scenario. Assume I have to combine the output from multiple services using a unique Session ID. How do I guarentee that the operation is performed only if all the services has completed processing the messages corresponding to the Session ID?
How do we handle exception and waiting time?
How to handle the scenario if message failure?