Categories: RabbitMQ

C# ile RabbitMQ Client’ı kullanarak Publisher ve Consumer Yapısı

Merhaba arkadaşlar.

Bir önceki makalem olan “RabbitMQ Nedir ve Windows’a Kurulumu” isimli makale ile, RabbitMQ konusuna bir giriş yapmış idik. Bu makale kapsamında ise RabbitMQ’nun C# provider’ını kullanarak en sık kullanılan Messaging Pattern‘lerinden birisi olan “Publish / Subscribe” pattern’i mantığında, basit bir şekilde nasıl mesaj gönderilir ve alınıra bakıyor olacağız.

RabbitMQ’nun Procuder, Queue ve Consumer yapısını basit olarak hatırlamak gerekirse;

  • Producer: Queue’ya mesaj gönderen uygulamadır. Yani Publisher’ımız.
  • Consumer: Queue’daki mesajları dinleyecek olan uygulamamızdır.

Genel bilgileri tekrardan hatırladığımıza göre şimdi kodlamaya başlayabiliriz. Örnek olarak bir Console Application oluşturacağız ve Queue’ya mesaj gönderecek olan Publisher’ı ve bu mesajları dinleyecek olan Consumer yapısını oluşturacağız.

Öncelikle Nuget Package Manager üzerinden aşağıda görmüş olduğumuz “RabbitMQ.Client” paketini kuralım.

Kurmuş olduğumuz bu client paketi sayesinde RabbitMQ ile haberleşebileceğiz.

RabbitMQ’nun C# Client provider’ı connection işlemlerini “ConnectionFactory” üzerinden yaratarak gerçekleştirmektedir. Bu connection’a hem “Publisher” hemde “Consumer” içerisinde aynı şekilde ihtiyacımız olacağı için, “RabbitMQService” isminde bir class ekleyelim ve burada “ConnectionFactory” class’ını kullanarak ortak işlemleri kodlamaya başlayalım.

using RabbitMQ.Client;

namespace RabbitMQSample
{
    public class RabbitMQService
    {
        // localhost üzerinde kurulu olduğu için host adresi olarak bunu kullanıyorum.
        private readonly string _hostName = "localhost";

        public IConnection GetRabbitMQConnection()
        {
            ConnectionFactory connectionFactory = new ConnectionFactory()
            {
                // RabbitMQ'nun bağlantı kuracağı host'u tanımlıyoruz. Herhangi bir güvenlik önlemi koymak istersek, Management ekranından password adımlarını tanımlayıp factory içerisindeki "UserName" ve "Password" property'lerini set etmemiz yeterlidir.
                HostName = _hostName
            };

            return connectionFactory.CreateConnection();
        }
    }
}

RabbitMQService class’ımız hazır durumda. Connection işlemleri “IConnection” interface’inden türemektedir ve factory class’ı üzerinden “HostName” property’sini set ederek, “CreateConnection()” method’u aracılığı ile yeni bir connection oluşturulabilmektedir.

Şimdi “Publisher” görevini görecek olan Publisher class’ını kodlamaya geçelim.

using System;
using System.Text;

namespace RabbitMQSample
{
    public class Publisher
    {
        private readonly RabbitMQService _rabbitMQService;

        public Publisher(string queueName, string message)
        {
            _rabbitMQService = new RabbitMQService();

            using (var connection = _rabbitMQService.GetRabbitMQConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare(queueName, false, false, false, null);

                    channel.BasicPublish("", queueName, null, Encoding.UTF8.GetBytes(message));

                    Console.WriteLine("{0} queue'su üzerine, \"{1}\" mesajı yazıldı.", queueName, message);
                }
            }
        }
    }
}

Şimdi neler yaptığımıza adım adım bir bakalım.

var channel = connection.CreateModel()

Yukarıdaki satır ile açmış olduğumuz connection üzerinden “CreateModel” method’unu çağırarak, RabbitMQ üzerinde yeni bir channel/session yaratmaktayız. Bu Channel sayesinde bir Queue oluşturabilirken, mesaj gönderme işlemlerini de gerçekleştirebilmekteyiz.

channel.QueueDeclare(queueName, false, false, false, null);

Bu satırda ise method isminden de anlaşılabileceği üzere, yeni bir queue tanımlıyoruz. Burada önemli olan ilk üç parametresine bakmak gerekirse;

  • queue: Oluşturulacak olan Queue’nun ismi.
  • durable: Bu parametre ile in-memory olarak çalışan Queue disk üzerinden çalışmaya başlayacaktır. Bu sayede RabbitMQ servisi dursa bile Queue kaybolmayacaktır. Her güzelliğin getirdiği bir kötü tarafın olduğu gibi bununda beraberinde getireceği latency problemi bulunmaktadır haliyle.
channel.BasicPublish("", queueName, null, Encoding.UTF8.GetBytes(message));

Artık Channel ve Queue hazır bir durumda olduğu için, “BasicPublish” method’u ile kolay bir şekilde oluşturmuş olduğumuz ilgili Queue’ya mesaj gönderiyoruz.

“BasicPublish” method’un kullandığımız parametrelerine bakarsak:

  • exchange: Bu parametreyi es geçiyoruz. Exchange genel olarak mesajı ilgili Routing Key’e göre ilgili Queue’ya yönlendiren bölümdür. Direct Exchange, Fanout Exchange ve Topic Exchange gibi tipleri bulunmaktadır. Bunları bir sonraki makalemde detaylı olarak ele alacağım.
  • routingKey: Burada girmiş olduğumuz key’e göre ilgili Queue’ya yönlendirilecektir mesaj.
  • body: Queue’ya göndermek istediğimiz mesajı byte[] tipinde gönderiyoruz.

Publisher’ı test edebilmek için “Program.cs” class’ını açalım ve aşağıdaki gibi kodlayalım.

namespace RabbitMQSample
{
    class Program
    {
        private static readonly string _queueName = "GOKHANGOKALP";
        private static Publisher _publisher;

        static void Main(string[] args)
        {
            _publisher = new Publisher(_queueName, "Hello RabbitMQ World!");

             Console.ReadKey();
        }
    }
}

Queue ismini “GOKHANGOKALP” olarak belirleyelim ve constructor üzerinden Queue ismini ve göndermek istediğimiz mesajı set edelim. Uygulamayı çalıştırmadan önce RabbitMQ Management ekranı üzerinden Queues tab’ına bir bakalım. Management ekranına “localhost:15672” adresinden erişebilirsiniz.

Yukarıda gördüğümüz gibi şuan herhangi bir Queue bulunmamaktadır. Şimdi oluşturmuş olduğumuz Console uygulamasını çalıştıralım.

Publisher Console uygulaması üzerinden başarılı bir şekilde çalıştı ve “GOKHANGOKALP” isminde bir Queue oluşturarak “Hello RabbitMQ World!” mesajını Queue’ya ekledi. Management ekranını bir de şimdi kontrol edelim.

Tada… “GOKHANGOKALP” isminde bir Queue oluşmuş ve “Read” state’inde bir mesajın bulunduğunu söylüyor. Şimdi sıra geldi bu mesajları alacak yani Subscribe edecek olan Consumer’ı kodlamaya. “Consumer” isminde yeni bir class oluşturarak, aşağıdaki gibi kodlamaya başlayalım.

using System;
using System.Text;
using RabbitMQ.Client.Events;

namespace RabbitMQSample
{
    public class Consumer
    {
        private readonly RabbitMQService _rabbitMQService;

        public Consumer(string queueName)
        {
            _rabbitMQService = new RabbitMQService();

            using (var connection = _rabbitMQService.GetRabbitMQConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    var consumer = new EventingBasicConsumer(channel);
                    // Received event'i sürekli listen modunda olacaktır.
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);

                        Console.WriteLine("{0} isimli queue üzerinden gelen mesaj: \"{1}\"", queueName, message);
                    };

                    channel.BasicConsume(queueName, true, consumer);
                    Console.ReadLine();
                }
            }
        }
    }
}

Consumer kısmında adım adım ne yaptığımızı inceleyelim şimdi.

var consumer = new EventingBasicConsumer(channel);

Asıl işlemlerimizi gerçekleştirecek oldıuğumuz “consumer” nesnemizdir. “EventingBasicConsumer” class’ı sayesinde constructor üzerinden channel verildiğinde, “Recevied” event’i sayesinde sürekli “listening” modunda olacaktır. Queue’daki ilgili mesajları sırasıyla almaktadır ve “Body” property’sinde barındırmaktadır. Hatırlarsak mesajları “byte[]” tipinde göndermiştik. Göndermiş olduğumuz mesajın “string” tipinde olduğunu bildiğimiz için “Encoding” altında bulunan “GetString” method’u ile “Decode” işlemini gerçekleştiriyoruz.

NOT: RabbitMQ Queue’su FIFO(First in Frist out) mantığında çalışmaktadır.

channel.BasicConsume(queueName, true, consumer);

Burada ise method isminden yine anlaşılabileceği üzere (isimlendirmenin ne kadar önemli olduğu buradan da anlaşılabilir) basic bir şekilde verilmiş olan Queue ismine göre mesajları alma işlemini başlatıyoruz. Burada dikkat çekmek istediğim bir kaç parametre bulunmaktadır:

  • queue: Hangi Queue’nun mesajları alınacak ise.
  • noAck: True olarak set edildiği taktirde, consumer mesajı aldığı zaman otomatik olarak mesaj Queue’dan silinecektir. Eğer Queue üzerinden silinmesini istemiyor iseniz, False olarak set etmeniz gerekmektedir.

Console uygulamasının “Program.cs” class’ını şimdi aşağıdaki gibi güncelleyelim ve tekrardan çalıştıralm.

namespace RabbitMQSample
{
    class Program
    {
        private static readonly string _queueName = "GOKHANGOKALP";
        private static Publisher _publisher;
        private static Consumer _consumer;

        static void Main(string[] args)
        {
            _publisher = new Publisher(_queueName, "Hello RabbitMQ World!");

            _consumer = new Consumer(_queueName);
        }
    }
}

Publisher’ı aynı şekilde başlatarak tek fark olarak Consumer’ı tanımlıyor ve aynı “_queueName” parametresini geçerek, aynı Queue üzerindeki mesajları dinlemesini söylüyoruz.

Bunun sonucunda Console uygulamasının çıktısı aşağıdaki gibi olacaktır.

Gördüğümüz gibi sonuç olarak Consumer başarılı bir şekilde “GOKHANGOKALP” Queue’sunu dinleyerek, içerisinde bulunan mesajı çekip ekrana basmıştır. “noAck” parametresini True olarak set etmiş idik. Şimdi Management ekranını bir kontrol edelim bakalım Queue üzerinde herhangi bir mesaj var mı?

Gördüğümüz gibi “GOKHANGOKALP” Queue’su olduğu gibi durmaktadır fakat herhangi bir mesaj içerisinde bulunmamaktadır. Şimdi “noAck” parametresini False olarak set edip, tekrardan Console uygulamasını çalıştıralım ve sonuca tekrardan Management ekranı üzerinden bir bakalım.

channel.BasicConsume(queueName, false, consumer);

“noAck” parametre değeri False olduğundan dolayı Consumer mesajı çekmesine rağmen, mesaj Queue üzerinde “Ready” state’inde durmaktadır. Queue üzerindeki mesajları silme gibi kararları, business rule’larımıza bağlı olarak bizlerin ele alması daha doğru bir karar olacaktır.

Bu makalemde basic bir şekilde RabbitMQ’nun C# Client’ını kullanarak, mesaj işlemlerini nasıl handle edebileceğimizi hep beraber inceledik. Bir sonraki RabbitMQ serimde görüşmek dileğiyle.

Sağlıcakla kalın.

RabbitMQSample

Gökhan Gökalp

View Comments

  • Merhaba gökhan hocam. Rabbitmq konusunda mesajların default olarak memory üzerinde tutulduğunu okudum. Ancak gerçek bir senaryoda Rabbitmq sunucusunun kapanıp açıldığı durumda mesajların kaybolması söz konusu oluyor. Tüm mesajların diske yazılması da yoğun mesajlaşmada performansa etki eder mi? Bu gibi sorunlarda nasıl bir strateji uygulanır. Sonuçta herkesin verisi önemlidir ve kimse memory üzerinde kritik bilgi saklamak istemez.

    • Merhabalar, evet default olarak in-memory çalışmaktadır. Persistent'ı sağlayabilmek için ise "durable" ayarlarına bakabilirsiniz disk üzerine yazabilmek için. Detaylı bilgiye buradan erişebilirsiniz. Gerçek bir senaryoda ise bu sizin business'ınıza ve ihtiyaçlarınıza göre farklılık gösterir. Elbette in-memory kadar performanslı olmayacaktır disk'e yazılması. Disk'e yazma ihtiyacınızıda, iyi belirlemeniz gerek ne kadar yoğun bir trafik altında çalışacak ve boyutları ne olacak vb. Çünkü bu sadece asenkron yapılabilecek küçük işlemler için bir mesajlaşma yapısı ve o yüzden kritik bilgiler de içermez. Sadece bir yere yazılmış işi alır, ilgili yere yönlendirir (aktarır).

    • Merhaba Gökhan Bey, aşağıda ki gibi bir kodu var elimde,
      ki bu kod sizin makaleniz den alınma aynı işi yapıyoruz. Benim sorunum şu bu console app'i "dotnet" komutu ile ubuntu console ekranında çalışıtrdığım zaman cosnumer görevini yapıyor.
      using System;
      using System.Text;
      using RabbitMQ.Client.Events;

      namespace RabbitMQSample
      {
      public class Consumer
      {
      private readonly RabbitMQService _rabbitMQService;

      public Consumer(string queueName)
      {
      _rabbitMQService = new RabbitMQService();

      using (var connection = _rabbitMQService.GetRabbitMQConnection())
      {
      using (var channel = connection.CreateModel())
      {
      var consumer = new EventingBasicConsumer(channel);
      // Received event'i sürekli listen modunda olacaktır.
      consumer.Received += (model, ea) =>
      {
      var body = ea.Body;
      var message = Encoding.UTF8.GetString(body);

      Console.WriteLine("{0} isimli queue üzerinden gelen mesaj: \"{1}\"", queueName, message);
      };
      channel.BasicConsume(queueName, true, consumer);
      Console.ReadLine();
      }}}}}

      Fakat bu uygulamayı

      [Unit]
      Description=Consumer

      [Service]
      WorkingDirectory=/var/www/Consumers/RabbitMqConsumer/
      ExecStart=/usr/bin/dotnet /var/www/Consumers/RabbitMqConsumer/ClientApps.RabbitMqConsumer.dll
      Restart=always
      # Restart service after 5 seconds if the dotnet service crashes:
      RestartSec=5
      KillSignal=SIGINT
      SyslogIdentifier=dotnet-clientapprabbitmqconsumerexchange
      User=www-data
      Environment=ASPNETCORE_ENVIRONMENT=Production
      Environment=DOTNET_PRINT_TEL
      EMETRY_MESSAGE=false
      [Install]
      WantedBy=multi-user.target
      Şeklinde çalıştırdığım zaman servis sürekli uygulamayı kill edip yeniden başlatıyor ve dataları kaybediyorum.
      Tecrübe ettiniz mi bu durumu.

      • Merhaba, kusura bakmayın geç cevap için bugünlerde taşındığım için oldukça yoğunum. Daha önce ubuntu üzerinde service olarak register etmeyi denemedim maalesef. Hata log'larına bakmayı denediniz mi? Birde consumer'ın ayağa kalkabilmesi için öncelikle beklediği bir şey mi var? Restart kısmında ise "always" yerine "on-failure" deneyebilir, hatayı yakalayabilirsiniz belki.

        Birde uygulamayı nasıl initialize ediyorsunuz bilmiyorum ama, daemon service olarak ayağa kaldırmayı denediniz mi? "Microsoft.Extensions.Hosting.Systemd" paketini kullanarak.

        İyi günler dilerim.

  • Gökhan Hocam Merhaba.RabbitMQ makale serisi için teşekkürler,çok faydalı oldu.Publisher ile RabbitMQ ye 1000 in üzerinde kayıt atıp,tüm kayıtlar için Consumer i çalıştırdığım zaman RabbitMQ da Ready statusündeki tüm kayıtlar 0 olarak gözüktü ve Consumer kayıtları teker teker
    işlemeye başladı.Fakat ben Consumer u durdurunca işlenmemiş tüm kayıtlarda kayboldu.RabbitMQ da bu şekilde bir birikme olduğunda Consumer ın kayıtları teker teker işleyemeyip yarıda kesilirse nasıl bir yol izlemek gerekir?autoAck parametresini false set ettiğimiz zamanda Consumer ı yarıda kesince tüm kayıtlar tekrar Ready statusüne geçti.MQ nun mantığında kayıtları sırasıyla okunup silinmesi gerekmiyor mu?RabbitMQ da bunu biz mi yönetmeliyiz? Teşekküler

    • Merhaba, eğer pure rabbitmq'nun client'ı ile ilerlerseniz, reliability konularını sizin yönetmeniz gerekmektedir, yani ack bilgilerini. Sırasıyla okunup, başarılı bir şekilde işlenir ve ack bilgisi gönderilirse silinir, ack bilgisi gönderilemez ise, tekrardan ready moda geçecektir. Dilerseniz MassTransit ile ilgili olan makalelerime bir göz atın, reliability konularını handle etmektedir.

      • Merhaba,MQ konularına yeni bakıyorumda ActiveMQ yu da kurup denedim ve ActiveMQ da consumer mesajları tek tek almakta ve okuduğunuz mesaj otomatik olarak silinmektedir.Bu konuyuda araştırıyorum ama sizede sormak istedim.
        RabbitMQ da mesajları tek tek alıp consumer ı çalıştırabilir miyiz?ack bilgisini gönderebiliyoruz ama gördüğüm kadarıyla kuyruktaki tüm bilgiler için bu bilgi gönderilmektedir ve kuyrukta birden fazla kayıt varsa
        consumer durduğu zaman bunları ya tamamı ready durumuna geçmekte ya da consumer durdurduğunuzda işlenmemiş kayıtlar kaybolmaktadır.Producer ile consumer aynı anda değilde Producer ile belirli bir kayıt aktarıp
        sizin örnek kodunuz üzerinden Consumer ı çalıştırınca kuyruktaki tüm mesajları tek tek değilde tamamını almakta ve daha sonra tek tek kayıtlar işlenmektedir.
        RabbitMQ nun çalışma mantığı mı böyle,ben bunları sırası ile işleyip işlem yaptığım kaydı silmesi için tekrar tek bir kayıt için ack bilgisimi göndermem gerekiyor?

  • BrokerUnreachableException was unhandled Hatası
    An unhandled exception of type 'RabbitMQ.Client.Exceptions.BrokerUnreachableException' occurred in RabbitMQ.Client.dll

    Additional information: None of the specified endpoints were reachable

    Hocam burada böyle bir hata alıyorum localde çalışmıyorum kullanıcı adı ve şifreyi tanımlamama rağmen böyle bir hata veriyor sebebi nedir ?

  • Merhabalar Gökhan hocam. Size bir kaç sorum olacak. Öncelikle yazılarınız için teşekkür ediyorum çok yardımcı oluyor. Sorumun ilki şudur. Bu yapıyı yani kuyruk yapılarını servis (rest servis) yapılı bir mimaride nereye oturtmam lazım? Ben de her modül için bir servis var. O servis içine o modüle özgü talepleri alıyor. Yani Token yapısı gibi, talepte bulunan kullanıcı bilgisi gibi. Bu durumda MQ yapımı nereye kurmam lazım? Servisin önüne koyup talepleri servise iletmesi mi yoksa servisim talepleri alıp MQ yapısına mı iletmesi?

    Buna bağlı olarak 2. sorum da şu olacaktır. Birden fazla server kullanıp yük dağılımı yapmak mı yoksa MQ yapısı kullanmak mı? Yoksa ikisi bir arada mı? Eğer ikisi bir arada olacaksa her server için ayrı bir MQ yapısı mı kurmam gerekli bunlar nasıl ortak çalışacak?

    Cevap için şimdiden teşekkürler.

    • Merhaba, kusura bakmayın geç cevap için.

      1) REST servis'leriniz deki iletişimi kuyruk yapıları ile, birbirlerinden decoupled hale getirebilmekte, asenkronlaştırabilmekte gibi konularda kullanabilirsiniz. Daha iyi scale olabilen, flexible servisler elde edebilirsiniz. Token yapınızda herhangi bir MQ ya gerek olduğunu düşünmüyorum. Token alma işlemleri genelde senkron ilerlemektedir. Belki login olan kişileri log tutmak istersiniz, o gibi durumlarda token veren servisiniz içerisinde bir event fırlatarak log tutarsınız ozaman ihtiyaç olur.

      2) İkisi bir arada. MOM dediğimiz Message Oriented Middleware tüm ekosisteminiz içinde tektir ve uygulamalarınız arası iletişimi sağlar.

      Teşekkürler.

  • Gokhan hocam merhaba. Consumer tarafini windows service yazip surekli olarak onstart metodunda queden mesajlari cekip islemeyi dusunuyorum.
    Sizce bu dogru bir yaklasim olur mu?
    Tesekkurler.

    • Merhaba, evet windows service olarak çalışması olması gereken bir yaklaşım.

Recent Posts

Overcoming Event Size Limits with the Conditional Claim-Check Pattern in Event-Driven Architectures

{:en}In today’s technological age, we typically build our application solutions on event-driven architecture in order…

2 months ago

Securing the Supply Chain of Containerized Applications to Reduce Security Risks (Policy Enforcement-Automated Governance with OPA Gatekeeper and Ratify) – Part 2

{:tr} Makalenin ilk bölümünde, Software Supply Chain güvenliğinin öneminden ve containerized uygulamaların güvenlik risklerini azaltabilmek…

7 months ago

Securing the Supply Chain of Containerized Applications to Reduce Security Risks (Security Scanning, SBOMs, Signing&Verifying Artifacts) – Part 1

{:tr}Bildiğimiz gibi modern yazılım geliştirme ortamında containerization'ın benimsenmesi, uygulamaların oluşturulma ve dağıtılma şekillerini oldukça değiştirdi.…

10 months ago

Delegating Identity & Access Management to Azure AD B2C and Integrating with .NET

{:tr}Bildiğimiz gibi bir ürün geliştirirken olabildiğince farklı cloud çözümlerinden faydalanmak, harcanacak zaman ve karmaşıklığın yanı…

1 year ago

How to Order Events in Microservices by Using Azure Service Bus (FIFO Consumers)

{:tr}Bazen bazı senaryolar vardır karmaşıklığını veya eksi yanlarını bildiğimiz halde implemente etmekten kaçamadığımız veya implemente…

2 years ago

Providing Atomicity for Eventual Consistency with Outbox Pattern in .NET Microservices

{:tr}Bildiğimiz gibi microservice architecture'ına adapte olmanın bir çok artı noktası olduğu gibi, maalesef getirdiği bazı…

2 years ago