Günümüz teknoloji çağında neredeyse hepimiz microservice’ler hakkında konuşuyor ve uygulamalar geliştirmeye çalışıyoruz. Yüzeysel baktığımızda her şey çok net ve uygulaması kolay gibi görünsede, özellikle söz konusu distributed transaction yönetimi olduğunda işler daha karmaşık bir hal almaya başlıyor.
Çünkü business’ımızın sağlıklı bir şekilde ilerleyebilmesini sağlayabilmemiz ve nihai business outcome’ına ulaşabilmemiz için, data’nın tutarlılığını sağlayabilmemiz gerekmektedir.
Bu makale kapsamında ise distributed ortamlarda transaction işlemlerini, Choreography-based Saga pattern’ı ile nasıl gerçekleştirebileceğimizi göstermeye çalışacağım.
Distributed ortamlardaki transaction yönetimi konusunda saga pattern’ı bizlere “Choreography” ve “Orchestration” olmak üzere iki farklı yaklaşım sunmaktadır.
2017 yılında saga pattern’ını orchestration yaklaşımı ile nasıl implemente edebileceğimizi buradaki makalemde ele almaya çalışmıştım. Bu makale kapsamında ise herhangi bir orchestrator’a sahip olmadan saga pattern’ını loosely coupled olarak nasıl implemente edebileceğimizi göstermeye çalışacağım.
Choreography-based saga yaklaşımındaki ana fikir, her microservice’in bireysel olarak sorumluluklarını sırasıyla yerine getirmesine ve consistency’i sağlayabilmek için beraber hareket etmelerine dayanmaktadır.
Bir başka değişle her microservice kendi sorumluluğunu yerine getirdiğinde, transaction’ı distributed ve asynchronous olarak devam ettirebilmesi için bir sonraki aşamayı ilgili bir business event’i ile tetiklemesi gerekmektedir.
Bir e-ticaret firmasında çalıştığımızı ve ödemeleri asynchronous olarak gerçekleştirebilmek için aşağıdaki gibi basit bir flow’a sahip olduğumuzu düşünelim.
Yukarıdaki happy-path flow’a baktığımızda;
Implementasyon kısmını açıklamaya geçmeden önce, örnek projenin tamamına buradan ulaşabilirsiniz.
Yukarıdaki happy-path flow’a ek olarak aşağıdaki gibi business requirement’larına da sahip olduğumuzu varsayalım:
Sistem içerinde bir sipariş oluşturabilmek için Order API aşağıdaki gibi bir controller ve service’e sahiptir.
[ApiController] [Route("[controller]")] public class OrdersController : ControllerBase { private readonly IOrderService _orderService; public OrdersController(IOrderService orderService) { _orderService = orderService; } [HttpPost] public async Task<IActionResult> CreateOrder(CreateOrderRequest request) { await _orderService.CreateOrderAsync(request); return Accepted(); } }
public class OrderService : IOrderService { private readonly IBus _bus; public OrderService(IBus bus) { _bus = bus; } public async Task CreateOrderAsync(CreateOrderRequest request) { // Order creation logic in "Pending" state. await _bus.PubSub.PublishAsync(new OrderCreatedEvent { UserId = 1, OrderId = 1, WalletId = 1, TotalAmount = request.TotalAmount, }); } public Task CompleteOrderAsync(int orderId) { // Change the order status as completed. return Task.CompletedTask; } public Task RejectOrderAsync(int orderId, string reason) { // Change the order status as rejected. return Task.CompletedTask; } }
“CreateOrderAsync” method’u, sipariş’i ilk olarak pending durumunda oluşturduğumuz noktadır. Çünkü bu aşamada henüz sipariş içerisindeki ürünlerin stok durumlarını veya ödemeyi başarıyla gerçekleştirip gerçekleştiremeyeceğimizi bilmiyoruz.
Sipariş oluşturulduktan sonra ise distributed bir şekilde işlem bütünlüğünü sağlayabilmemiz için, “OrderCreatedEvent” adında bir event publish ediyoruz. Bir nevi işlemler zincirin bir sonraki aşamasını burada tetikliyoruz.
“CompleteOrderAsync” method’unu ise, tüm işlemlerin başarıyla gerçekleştiği durumda siparişi pending durumundan completed durumuna güncelleyebilmek için kullanacağız.
İşlemlerin başarıyla tamamlanıp tamamlanmadığını Order Service’in anlayabilmesi için ise, içerisinde işlemler zincirinin son parçası olan “PaymentCompletedEvent” ini dinleyen aşağıdaki gibi bir consumer bulunmaktadır.
public class PaymentCompletedEventConsumer : IConsumeAsync<PaymentCompletedEvent> { private readonly IOrderService _orderService; public PaymentCompletedEventConsumer(IOrderService orderService) { _orderService = orderService; } public async Task ConsumeAsync(PaymentCompletedEvent message, CancellationToken cancellationToken = default) { await _orderService.CompleteOrderAsync(message.OrderId); } }
Ayrıca “RejectOrderAsync” method’unu da, ödeme işlemleri aşamasında oluşabilecek herhangi bir hata durumunda ilgili siparişi rejected durumuna getirebilmek için kullanacağız.
Order Service’in ilgili sipariş durumunu rejected olarak güncelleyebilmesi için ise, Stock Service’inin ilgili ürün stok’larını tekrardan release ettikten sonra publish ettiği “StocksReleasedEvent” ini aşağıdaki gibi consume etmektedir.
public class StocksReleasedEventConsumer : IConsumeAsync<StocksReleasedEvent> { private readonly IOrderService _orderService; public StocksReleasedEventConsumer(IOrderService orderService) { _orderService = orderService; } public async Task ConsumeAsync(StocksReleasedEvent message, CancellationToken cancellationToken = default) { await _orderService.RejectOrderAsync(message.OrderId, message.Reason); } }
Sistem içerisinde bir sipariş pending durumunda oluşturulduğunda, o sipariş içerisindeki ürünlerin stok’ları Stock Service’i tarafından rezerve edilmektedir.
Bunun için Stock Service’i içerisinde “OrderCreatedEvent” ini dinleyen aşağıdaki gibi bir consumer bulunmaktadır.
public class OrderCreatedEventConsumer : IConsumeAsync<OrderCreatedEvent> { private readonly IStockService _stockService; private readonly IBus _bus; public OrderCreatedEventConsumer(IStockService stockService, IBus bus) { _stockService = stockService; _bus = bus; } public async Task ConsumeAsync(OrderCreatedEvent message, CancellationToken cancellationToken = default) { await _stockService.ReserveStocksAsync(message.OrderId); await _bus.PubSub.PublishAsync(new StocksReservedEvent { UserId = message.UserId, OrderId = message.OrderId, WalletId = message.WalletId, TotalAmount = message.TotalAmount }); } }
Bu consumer içerisinde basit olarak ürünlerin stok’larının rezerve işlemleri gerçekleştirilip, ardından “StockReservedEvent” i publish edilmektedir. Böylelikle sipariş işlem bütünlüğünün bir aşaması daha tamamlanıp, bir sonraki aşaması tetiklenmiş olmaktadır.
Ayrıca ödeme işlemlerinde oluşabilecek herhangi bir hata durumuna kaşı rezerve edilen ürünlerin stok’larının tekrardan release edilebilmesi için, “PaymentRejectedEvent” ini dinleyen aşağıdaki gibi bir consumer’a da sahiptir.
public class PaymentRejectedEventConsumer : IConsumeAsync<PaymentRejectedEvent> { private readonly IStockService _stockService; private readonly IBus _bus; public PaymentRejectedEventConsumer(IStockService stockService, IBus bus) { _stockService = stockService; _bus = bus; } public async Task ConsumeAsync(PaymentRejectedEvent message, CancellationToken cancellationToken = default) { await _stockService.ReleaseStocksAsync(message.OrderId); await _bus.PubSub.PublishAsync(new StocksReleasedEvent { OrderId = message.OrderId, Reason = message.Reason }); } }
Sipariş içerisindeki ürünlerin stok’ları rezerve edildikten sonra, ödeme işlemi Payment Service’i tarafından gerçekleştirilmektedir.
Bu service içerisinde ödeme işlemlerinin tetiklenebilmesi için “StocksReservedEvent” ini dinleyen aşağıdaki gibi bir consumer bulunmaktadır.
public class StocksReservedEventConsumer : IConsumeAsync<StocksReservedEvent> { private readonly IPaymentService _paymentService; private readonly IBus _bus; public StocksReservedEventConsumer(IPaymentService paymentService, IBus bus) { _paymentService = paymentService; _bus = bus; } public async Task ConsumeAsync(StocksReservedEvent message, CancellationToken cancellationToken = default) { Tuple<bool, string> isPaymentCompleted = await _paymentService.DoPaymentAsync(message.WalletId, message.UserId, message.TotalAmount); if (isPaymentCompleted.Item1) { await _bus.PubSub.PublishAsync(new PaymentCompletedEvent { OrderId = message.OrderId }); } else { await _bus.PubSub.PublishAsync(new PaymentRejectedEvent { OrderId = message.OrderId, Reason = isPaymentCompleted.Item2 }); } } }
Burada ise basit olarak ödeme işlemleri gerçekleştirilmektedir. Eğer ödeme işlemi başarıyla gerçekleştirilirse, sipariş’in durumunun Order Service tarafından completed olarak güncellenebilmesi için “PaymentCompletedEvent” adında bir event publish edilmektedir.
Eğer ödeme işlemi başarıyla gerçekleştirilemezse, “PaymentRejectedEvent” adında bir event publish edilmektedir. Böylece Stock Service’i rezerve edilen ürünlerin stok’larını, örnek business requirement’ı gereği tekrardan release edebilmektedir.
Böylelikle sipariş transaction’ı distributed bir şekilde uçtan uca loosely coupled ve consistent olarak tamamlanmış olmaktadır.
Her design pattern’ın spesifik bir business problemine bir çözüm önerisi getirdiği gibi, saga pattern’ıda bizlere distributed ortamlarda transaction yönetimi konusunda bir yöntem sunmaktadır. Bu yöntemin arkasındaki temel mantık ise, bir dans ekibi içerisindeki her bir üyenin bir akış ve uyum içerisinde hareket ettikleri gibi uygulamalarımızın da bir akış ve uyum içerisinde hareket etmelerine dayanmaktadır.
Uygulanması her ne kadar basit bir pattern gibi görünsede, elbette getirdiği bazı dez avantajları/zorlu yönleri de bulunmaktadır.
{:en}In today’s technological age, we typically build our application solutions on event-driven architecture in order…
{:tr} Makalenin ilk bölümünde, Software Supply Chain güvenliğinin öneminden ve containerized uygulamaların güvenlik risklerini azaltabilmek…
{:tr}Bildiğimiz gibi modern yazılım geliştirme ortamında containerization'ın benimsenmesi, uygulamaların oluşturulma ve dağıtılma şekillerini oldukça değiştirdi.…
{:tr}Bildiğimiz gibi bir ürün geliştirirken olabildiğince farklı cloud çözümlerinden faydalanmak, harcanacak zaman ve karmaşıklığın yanı…
{:tr}Bazen bazı senaryolar vardır karmaşıklığını veya eksi yanlarını bildiğimiz halde implemente etmekten kaçamadığımız veya implemente…
{:tr}Bildiğimiz gibi microservice architecture'ına adapte olmanın bir çok artı noktası olduğu gibi, maalesef getirdiği bazı…
View Comments
Great explanation, keep up the good work
Thanks!
https://github.com/GokGokalp/choreography-saga-dotnet