C# — RabbitMQ Ve Kullanım Örneği

Engin UNAL
14 min readNov 9, 2023

--

Bu yazıda RabbitMQ nedir? Nasıl kullanılır? Neden tercih ediliyor? Genel kullanım alanları nelerdir? Sorularına cevaplar arayacağız. Devamında C# ile RabbitMQ erişimi ve yönetimine değinip kodlama örnekleri ile bitireceğiz.

RabbitMQ, açık kaynaklı, çok kullanılan bir mesajlaşma aracısıdır. AMQP, MQTT, STOMP gibi mesajlaşma protokollerini destekler. Mesajların alım ve dağıtımında hızlı, ölçeklenebilir, dağıtık yapıların kurulmasına imkan tanımaktadır. Esasında ondan beklenen ve iyi yaptığı iş ise temel olarak ona iletilen mesajların alıcısına iletilmek üzere bir kuyrukta saklanması ve kuyruktan sırayla alıcıya/alıcılara iletilmesidir. Bu işleri yapan başka uygulamalar da bulunmaktadır. Örneğin ActiveMQ, MSMQ, Azure Service Bus, Amazon SQS, Google Cloud Pub/Sub bunlardan bazıları olarak sıralanabilir.

RabbitMQ bir mesaj kuyruğu yönetim uygulamasıdır veya mesajlaşma aracısıdır demiştik. Çeşitli protokolleri destekler, tek bir protokole bağımlı değildir fakat biz bu yazıda geniş kullanımdaki AMQP(Advanced Message Queuing Protocol) ile devam edeceğiz. AMQP bir mesajlaşma protokolü ve standardıdır. Mesajlaşmanın nasıl bir mimari ve kurallar bütünlüğü içinde olacağını belirleyen standartları tanımlar.

Burada karşımıza iki versiyon çıkar. AMQP 1.0 ve AMQP 0.9.1 versiyonları. 1.0 olan versiyonu 0.9.1'den oldukça farklıdır ve sanki yeni bir protokol tanımlar gibi yapısal değişiklikler içerir. RabbitMQ’nun desteklediği temel protokol 0.9.1 versiyonudur, geliştirilirken bu versiyon baz alınmıştır. Ayrıca sitesinde geliştiriciler için paylaşılan kod örneklerinde de 0.9.1 versiyonu kullanılmıştır. Yazıda da yaygın kullanımda olan bu versiyon yani AMQP 0–9–1 kullanılacaktır.

Peki RabbitMQ neden ve hangi ihtiyaçlar için kullanılır?

Bekleme gerektiren veya uzun süren işlemlerin sonuçlarını o anda almak istemeyebiliriz. Uygulamamızın bir işlemi tamamlaması ve kullanıcıya sonucunu dürmesi uzun sürebilir ve kullanıcının bu işlemi beklemesine gerek olmayabilir. Sonucu daha sonra bildirim, mesaj veya e-mail ile iletmek yeterli olabilir. Örneğin e-ticaret uygulamamız olduğunu düşünelim. Kullanıcı alışveriş yapmış olsun ve ödeme sonrasında kesilen faturanın kullanıcı e-mail adresine gönderilmesi gereksin. Faturanın satış anında gönderilmesiyle birkaç dakika sonra gönderilmesi arasında kullanıcı açısından bir fark olmayacaktır. Satış işlemi yapılır, e-mail mesajı bir mesaj kuyruğuna iletilir ve oradan e-mail gönderen servis gönderim işlemini gerçekleştirir. Bu şekilde arka plandaki servislerin asenkron olarak çalışmaları sağlanmış olur.

Servislerin birbirlerine sıkı bağlı çalışmaları da istenmeyen bir durumdur. Özellikle mikroservis projelerinde servisler arası gevşek bağlı (loosely coupled) ilişkiler tercih edilir. Bu tip tasarımlarda araya bir broker koyarak mimariye esneklik sağlanabilir. Örnekler ve kullanım alanları çok çeşitlidir. Birkaç örnek ile devam edelim. Arkaplandaki mikro-servislerin birbirlerini beklemeden asenkron olarak çalışmaları gerekebilir. Örneğin A servisi, B servisine kaydedilmek üzere bir veri göndersin ama kayıt sonucunu beklemesi gerekmesin. Bunu yapmak için RabbitMQ’daki kuyruğa mesaj gönderir ve B servisi bu mesajı aldığında işler. B servisi işini bitirdiğinde kuyruğa tamamlandı mesajını gönderir ve A servisi bu işin tamamlandığı bilgisini alır.

Veya yük dağılımı yapılarak bazı servislerin aşırı yük altında çalışmalarını önlemek isteyebiliriz. Örneğin fatura hazırlayan servisimiz çok yoğun çalışıyor ve aynı işi yapan ikinci, üçüncü servisleri açmak gerekli. Eğer fatura servisimiz yapacağı işleri RabbitMQ kuyruğundan alıyorsa bu süreç gayet kolay bir şekilde çözülür. Kuyruğu dinleyen ve birbirinden bağımsız çalışabilen fatura servisleri açabilir ve yük dağılımını ayarlayabiliriz.

Veya bazı servislerin kritik önemde olduğu ve üzerinde çalıştığı işleri kaybetmek istemediğimiz durumlar olabilir. Servis çöktüğünde sistemin aksamaması ve işlerin kesintiye uğramadan devam etmesi istenebilir. Bu işleri bir kuyruğa atıp ayakta olan servislerin devralması ve devam etmesine ihtiyaç olabilir. RabbitMQ bu konuda önemli kolaylıklar sağlamaktadır. Kuyruktan çekilen bir mesajın ack bilgisini sonradan verebilme imkanı sunarak servis çökmelerinde mesaj kaybolmasına karşı kolayca önlem alınmasını sağlar. Buna ileriki konularda değineceğim. Yukarıda verilen kullanım örnekleri çeşitlendirilebilir. Genel bakış açısı sağladığını umuyorum.

RabbitMQ nasıl çalışıyor?

Artık kavramlara geçebiliriz. Mesaj gönderme/alma işlemleri en temel olarak aşağıdaki gibi olmaktadır.

  • Publisher: Mesajı RabbitMQ’ya gönderen uygulamadır. (Producer)
  • Consumer: RabbitMQ’daki mesajları okuyan uygulamadır. (Subscriber)
  • Queue: Mesajların saklandığı kuyruk. İlk gönderilen mesaj ilk okunur. (FIFO — First In First Out).

Mesajı gönderecek uygulama(publisher/producer) RabbitMQ’ya bağlanır ve gönderim yapmak istediği mesajı iletir. Mesajları okuyan uygulama(consumer/subscriber) ise RabbitMQ’ya bağlanır ve almak istediği mesajları içeren kuyruğu dinlemeye başlar, gelen mesajları okur.

Bu işlemleri ve ihtiyaca göre daha gelişmişlerini de gerçekleştirebilmek için RabbitMQ tarafında Exchange yapıları bulunmaktadır, şimdi bunları görelim.

Exchange

Dağıtıcı görevi görürler. Gelen postaları ilgili alıcılara dağıtan postacı gibi düşünülebilir. Publisher/Producer tarafından RabbitMQ sistemine gönderilen tüm mesajlar öncelikle Exchange tarafından karşılanırlar. Exchange, kendisine iletilen mesajların hangi kuyruğa/kuyruklara yönlendirileceğine(routing) karar verir.

Bunu yaparken binding, routing key ve header özelliklerini kullanmaktadırlar.

  • Binding : Exchange ile Queue arasındaki bağlantıdır.
  • Routing Key : Mesajın kuyruğa nasıl yönlendirileceğine karar vermek için Exchange’in baktığı alandır.
  • Header Attributes : Mesajları routing key değerine göre değil header’daki değerlere göre kuyruğa yönlendirmek için kullanılır. Sadece Headers Exchange tipi için kullanılır. Gereken bilgi exchange tipleri altında verilecektir.

Exchange Tipleri

Direct, fanout, topic, headers isminde temel dört exchange tipi bulunmaktadır. Bunlar gelen mesajların routing key, header veya binding parametrelerine göre yönlendirme işlemini yaparlar. Bir de default exchange bulunmaktadır, direct exchange tipindedir, rabbitmq tarafından ön tanımlıdır ve kullanılırken ismi boş geçilir. Genellikle exchange tanımlama ile uğraşmadan doğrudan kuyruğa mesaj göndermek için kullanılmaktadır. Tüm kuyruklar kendi isimleriyle bu exchange’e bağlıdır, eğer routing key değerine kuyruk adı verilirse mesaj o kuyruğa yönlendirilir. Okuma için de aynı şekilde routing key değerine kuyruk adı verilerek exchange adı boş geçilerek okuma yapılır. Şimdi exchange tiplerini inceleyelim.

Direct Exchange

Gelen mesajların Routing Key değerlerine göre kuyruklara dağıtılması için kullanılan exchange çeşididir, bir çeşit unicast yapar. Queue ile Exchange arasında bağlantı kurmak gereklidir. Bunun için binding tanımı yapılır. Routing key ile ilişkisi ise şöyledir; binding tanımı yapılırken verilen routing key ile gelen mesajdaki routing key ile eşleşen kuyruğa mesaj iletilir.

Örneğin ex-direct isminde bir exchange oluşturalım. Bunun için rabbitmq kurulumu yapıp yönetim ekranına giriyoruz. Buradaki Exchanges sayfasından aşağıdaki gibi yeni bir exchange ekliyoruz.

Exchanges sayfasından ex-direct isminde exchange ekliyoruz.

Queue eklemek için ise Queues and Streams sayfasına girip “Add a new queue” kısmında yeni queue tanımlarımızı yapıyoruz. q1, q2, q3, q4 isminde dört adet queue oluşturalım.

q1, q2, q3 ve q4 kuyruklarını ekliyoruz.

Tanımladığımız ex-direct isimli exchange tanım ekranına girip(Exchanges sayfasından ex-direct seçilir) aşağıdaki gibi binding tanımlarını yapıyoruz.

ex-direct > Bindings altında q1, q2, q3 ve q4 için binding tanımı yapıyoruz. q3 ve q4 için Routing key değeri olarak renk:mavi veriyoruz.

Buraya kadar direct exchange tanımladık, q1, q2, q3, q4 kuyruklarına bağladık. Şimdi deneyelim. Aynı ekranda “Publish message” kısmından Payload’una birşeyler yazarak ve Routing key değerini boş bırakarak “Publish message” ile mesaj yayınlayalım.

“Message published” mesajı çıkacaktır. Queues ekranından kontrol edersek mesajın q1 ve q2 kuyruklarına yayınladığını görebiliriz.

Routing key değerine renk:mavi yazarak mesaj yayınlaması yaparsak sadece q3 ve q4 kuyruklarına mesaj yönlendirecektir.

Kuyrukları kontrol ettiğimizde gönderilen mesajdaki routing key ile eşleşen q3 ve q4 kuyruğuna mesajın iletildiğini görebiliriz.

ex-direct örneğinde Routing key değerine göre routing yapar. Gelen mesaj verilen “renk:mavi” routing key’i ile eşleşen q3 ve q4 kuyruklarına iletilir.

Özetle direct exchange tipinde bir exchange tanımladığımızda routing key değeri kullanılarak gelen mesajın hangi kuyruklara yönlendirileceğini tanımlayabiliriz. Mesajdaki routing key değeri ile exchange-queue binding tanımındaki routing key değeri eşleşen kuyruklara mesaj iletilmektedir.

Fanout Exchange

Gelen mesajların bağlı tüm kuyruklara dağıtılması için kullanılan exchange çeşididir. Mesajlar tüm alıcılara iletilir yani broadcast yapar. Queue ile Exchange arasında bağlantı kurmak gereklidir. Bunun için binding tanımı yapılır. Routing key dikkate alınmaz. Gelen mesaja routing key verilsin veya verilmesin bind edilmiş tüm kuyruklara bunu iletir.

Routing key ne olursa olsun bind edilmiş tüm kuyruklara gelen mesajı iletir.
Bu exchange tipinde amaç gelen mesajı bağlı tüm kuyruklara iletmektir.

Topic Exchange

Gelen mesajların hangi kuyruklara dağıtılacağına karar verirken, mesajın routing key değeri ile bağlı kuyrukların routing key desenlerini(pattern) karşılaştırır. Yani gelen mesajın routing key değerini alır, bind edilmiş kuyruğun routing key tanımlarını da alır ve aşağıda açıklayacağım kurallara göre mesajın hangi kuyruklara dağıtılacağına karar verir. Özetle multicast yapar diyebiliriz.

Topic exchange kullanımlarında kuyrukların exchange ile bağlantısında(bind) Routing key tanımlaması yaparken * ve # karakterlerini kullanabiliriz. Bunları regular expression pattern’leri gibi düşünebilirsiniz. Eğer * kullanılırsa sadece bir kelimeyi kıyaslar ve herhangi bir kelime olabilir anlamına gelir. Ve # kullanılırsa öncesi veya sonrası tüm kelimeler için herhangi biri olabilir anlamına gelir. Birazdan örnekte daha ne göreceğiz.

Öncelikle kelime kıyaslaması nedir? Buradan devam edelim. Mesajın routing key değeri . işareti ile ayrılmış kelimeler içermelidir. Kıyaslama yapılabilmesi için kelimelerin arasına . işareti konulur.

Örnek olarak kirmizi.kalem değerini routing key olarak gönderdiğimizi düşünelim. Routing işleminde ilk kelime olan “kırmızı” ve ikinci kelime olan “kalem” ayrı ayrı işleme alınıp hangi kuyruktaki pattern ile uyuşuyor kıyaslaması yapılacaktır. q1 adında bir kuyruğu ex-topic isminde tipi topic olan bir exchange açıp bağlayalım. Bağlarken routing key olarak *.kalem verelim. Bu tanımda ilk kelime ne olursa olsun ikinci kelimesi kalem olan mesajlar q1 kuyruğuna düşer. Şimdi q2 kuyruğunu oluşturup routing key değerini kirmizi.* verelim. Böylece ilk kelimesi kırmızı olan tüm mesajlar q2 kuyruğuna düşer. Gelen mesajdaki routing key değerikirmizi.kalem ise mesajımız her iki q1 ve q2 kuyruklarına da düşer. Eğer gelen mesaj tukenmez.kalem ise sadece q1 kuyruğuna düşecektir.

Diğer kıyaslama karakteri # demiştik. Bu da öncesinde veya sonrasındaki kelimelerin tümüne geçiş izni verir. Başka bir örnekle devam edelim. Yine ex-topic isminde bir topic exchange’imiz olsun. Ve isimleri “tshirt”, “pamuklu-tshirt”, “mavi”, “sentetik” isimlerinde dört kuyruğumuz olsun. Amacımız gelen mesajdaki routing key değerine göre tshirt ile başlayan tüm mesajları “tshirt” kuyruğuna, pamuklu tshirt’leri “pamuklu-tshirt” kuyruğuna, mavi renkteki ürünleri “mavi” isimli kuyruğa vs. yönlendirmek olsun. Binding tanımı yaparken örneğin tshirt ile başlayanların tümünü “tshirt” kuyruğuna göndermek için routing pattern olarak tshirt.# veririz. Veya tüm sentetik ürünleri “sentetik” kuyruğuna atmak için #.sentetik veririz.

Örnekteki queue’ler için binding tanımları.

Gelen mesajdaki routing key değeri “tshirt.mavi.pamuklu” olduğunu düşünelim. Mesajı karşılayan ex-topic, bağlı olan kuyruklardaki routing key pattern’lerini alır ve kıyaslama sonucunda “tshirt” kuyruğuna, “pamuklu-tshirt” kuyruğuna ve “mavi” kuyruğuna mesajı iletir.

Gelen örnek mesajların yönlendirmeleri.

Headers Exchange

Mesaj başlığındaki(headers) tanımları kullanarak yönlendirme yapan exchange tipidir. Routing key tanımları dikkate alınmaz/kullanılmaz. Headers exchange’in baktığı mesaj başlığı tanımlarına headers attribute denir. Headers attribute, birden çok olabilir. Gelen mesaj başlığındaki tanımların(header attribute) tümünün uyuştuğu veya en az birinin uyuştuğu durumları kurala bağlamak mümkündür. Eğer headers attribute tanımlarının tümü karşılansın isteniyorsa headers tanımlarına x-match = all değeri eklenir. Veya sadece biri bile karşılansa yeterli ise x-match = any değeri eklenir.

Örnek üzerinden daha açık görebiliriz. Öncelikle ex-headers isminde tipi headers olan bir exchange oluşturalım. Devamında binek-satis, satilanlar ve ticari-satis isminde üç kuyruk oluşturalım. Bunları ex-headers ile bind edeceğiz. Queue ile bind işlemlerinde Arguments kısmında her queue için aşağıdaki tanımları yapalım.

Headers exchange örneği, queue ve exchange binding tanımları.

Yukarıdaki tanımlar şöyle çalışacaktır: Gelen mesajın header özelliklerinde islem:satis ve tip:binek değerlerinin ikisini de sağlayanlar binek-satis kuyruğuna gönderilir. Header’da durum:satildi veya islem:satis değerlerinden birini içerenler satilanlar kuyruğuna gönderilir. Header’da islem:satis ve tip:ticari değerlerinin ikisini de içerenler ticari-satis kuyruğuna gönderilir.

Headers exchange örneği.

Özet

Exchange tipleri ilgili buraya kadar anlatılanların kısa bir özetini yapmak gerekirse. Direct exchange, gelen mesajların routing key değerleri ile bağlı kuyrukların routing key değerlerini kıyaslar ve eşleşenlere mesajları iletir. Fanout exchange, gelen mesajları bağlı tüm kuyruklara iletir. Topic exchange, gelen mesajları routing key değerleri ile bağlı kuyrukların routing key desenleri arasında karşılaştırma yapar fakat burada * ve # karakterlerini kullanarak dağıtımı kurallara göre yapar. Headers exchange, gelen mesajların header tanımlardan yola çıkarak bağlı kuyruklardaki header tanımlarının değerlerini karşılaştırır ve iletimi bu kurallara göre yapar.

Kodlama Örneği

Öncelikle mesaj gönderen Publisher örneğini devamında ise kuyruğu dinleyen Consumer örneğini kodlayacağız. Kodlama örneğini ilk olarak Direct Exchange için yapacağız. Projemizi oluşturalım ve RabbitMQ.Client paketini ekleyerek başlayalım.

mkdir RabbitMQ
dotnet new console -n Publisher
cd Publisher
dotnet add package RabbitMQ.Client

Publisher projesindeki Program.cs içindeki kod aşağıdaki gibi olsun.

Direct exchange konusunda verdiğim örneğin koda dökülmüş hali, görüldüğü gibi RabbitMQ.Client ile exchange, queue vs. tanım yapmak veya silmek oldukça basit. Yukarıda exchange tanımını yapıp iki queue bind ettik ve iki mesaj gönderip test ettik ve sonra Console’dan bir tuşa basılmasını bekleyip oluşturduğumuz exchange ve queue’leri sildik. (Not: VS Code kullanıyorsanız ve ReadKey için hata alıyorsanız launch.json içerisinde “console”: “integratedTerminal” tanımını yapınız.)

Consumer örneğiyle devam edelim. RabbitMQ klasörümüze dönelim ve aşağıdaki komutlarla projeyi oluşturalım.

cd ..
dotnet new console -n Consumer
cd Consumer
dotnet add package RabbitMQ.Client

Yukarıdaki kod örneğinde BasicConsume ile q1 isimli kuyruk dinlemeye alınır ve bu kuyruğa bir mesaj ulaştığında consumer nesnesindeki Received event handler’ı içinde konsola mesaj içeriği yazdırılır. Publisher ve Consumer projelerini çalıştıralım, her biri için bir terminal window açıp dotnet run komutu ile çalıştıracağız.

cd Publisher
dotnet run

cd Consumer
dotnet run

Kod kalabalıklığı olmaması için tüm exchange ler için kod örneği eklemeyeceğim. Fanout Exchange için benzer kodlar kullanılabilir, bilindiği gibi orada sadece bind edilen kuyruklara yönlendirme yapıyor ve routing key’in bir önemi yoktu. Bu nedenle BasicPublish içinde routingKey değerini string.Empty ile boş geçiyoruz. Topic Exchange için de benzer bir durum sözkonusu. Orada da BasicPublish routingKey değeri . (nokta) ile ayrılmış olmalı ve queue tanımlarken pattern( * ve # ile) verebiliyorduk. Yani kodlama olarak değil içerik olarak değişenler olduğundan onları sizin denemenizi tavsiye ederim.

Headers Exchange ile devam edelim. Öncelikle projemizi oluşturalım.

cd ..
dotnet new console -n HeadersPublisher
cd HeadersPublisher
dotnet add package RabbitMQ.Client

QueueBind ile header key-value tanımlarını taşıyan Dictionary ekliyoruz. Artık BasicPublish yaparken properties parametresine geçeceğimiz header değerleri bu tanımlarla karşılaştırılarak ilgili queue’ya iletilecektir.

Mesajların Okundu Olarak İşaretlenmesi (ACK:Acknowledgment)

Şimdiye kadar verdiğim örneklerde consumer tarafında autoAck parametresini true olarak verdik. Bu kullanımda RabbitMQ, mesajı consumer’a iletir ve işlenmesini beklemeden okundu olarak işaretleyerek kuyruktan siler. Kuyruktan silmesi için işlenmesini beklemez iletilmesi yeterlidir. Fakat bu her zaman istenilen bir çalışma şekli olmayabilir.

Olası çökme veya hata durumlarında veri kaybı yaşanmaması önemlidir. Örneğin consumer sunucusu çökerse, ona iletilen ve o anda işlenmekte olan veya işlenmeyi bekleyen mesajların tümü kaybedilebilir. Böyle durumlara karşı manuel ack yöntemi önerilir.

Veya bir consumer problem yaşadığında yeni bir consumer açıp mesajların kayıp yaşanmadan yeni açılan consumer’dan işlenmeye devam etmesini isteyebiliriz. O nedenle RabbitMQ tarafındaki mesajın silinmesi sürecinin iyi analiz edilip tanımlanması gerekir.

Mesajların consumer tarafından işlenmesini bekleyip sonrasında bir onay ile kuyruktan silinmesini sağlamak için ack işlemini manuel yapabiliriz demiştik. Bunun için autoAck parametresini false verip mesaj işlendikten sonra BasicAck metoduyla okundu olarak işaretleriz.

Aşağıda uygulama kod örneği verilmiştir.

Manuel ack ile mesajların manuel olarak okundu olarak işaretlenmesi.

Kod örneğinde BasicAck metoduna gelinceye kadar mesaj kuyrukta bekletilir.

Sık Kullanılan Tasarımlar

RabbitMQ tarafından sunulan teknik çeşitliliğin farklı şekillerde kullanılması mümkün. Bu konuda tasarım kalıplarına uygun ve genel kullanımdaki birkaç örnekle devam edeceğiz.

Work Queue / Task Queue / Competing Consumers Pattern

Mesaj kuyruğundaki mesajların birden çok consumer tarafından paylaşılarak işlenmesine olanak tanıyan tasarımdır. Mesajları paralel olarak olarak işleyebilmek iş yükünün dengelenmesi, yatay olarak ölçeklenebilirlik(yeni consumer’lar ekleyerek), kaynakların optimize kullanılabilmesi gibi avantajlar sunar. Bu tasarım deseninde her mesaj boştaki bir consumer tarafından işlenmektedir.

Mesaj yoğunluğu ve iş yüküne göre consumer sayısı ayarlanabilir.

Mesaj kuyruğundaki birikmelere karşı hızlı ve ölçeklenebilir çözüm üretebilmek, gerekmediği durumlarda da fazladan consumer bulundurmamak yani kaynakların verimli kullanımı bu tasarımın avantajlarıdır.

Örnek üzerinde gidelim. Diyelim ki e-ticaret uygulamanız var ve indirim günlerinde çok fazla işlem gerçekleşiyor. Arkaplanda kurguladığınız RabbitMQ kuyruğunda çok fazla mesaj birikiyor ve bunların eritilmesi gerekiyor ama bir adet consumer bağlamışsınız ve yetişemiyor. Böyle senaryolarda sisteminizi tasarlarken consumer sayılarının artırılıp azaltılabilmesine imkan verecek esnekliği uygulayabilirseniz iş yükünün fazla olduğu zamanlarda yeni consumer’ları devreye alarak işlemlerin tamamlanma sürelerini kısaltabilirsiniz.

RabbitMQ kuyruktaki mesajları consumer’lara dağıtırken standart olarak Round Robin algoritması kullanır yani sırayla dağıtır. Detayına gireceğiz, şimdi kod örneği üzerinden devam edelim. Publisher ve Consumer projesi oluşturup aşağıdaki kısa kodlamayı yapalım.

Publisher projesi Program.cs içeriği:

Publisher projesindeki Program.cs içerisindeki döngüde her saniye q1 isimli kuyruğa yeni bir mesaj gönderiliyor.

Consumer projesindeki Program.cs :

Consumer projesinde ise q1 isimli kuyruk dinlenerek gelen mesajlar ekrana yazdırılıyor. Dışarıdan argüman olarak saniye bilgisi verilirse o kadar saniye bekleme yapıyor sonrasın mesaj işlemeye devam ediyor.

Şimdi bu kodlamayı kullanarak bir publisher ve iki adet consumer çalıştıralım. Publisher her saniye bir mesaj üretip kuyruğa göndersin. Consumer’lar da her işlemde 2 saniye beklesin. Komut satırından aşağıdaki gibi çalıştıralım.

İki consumer da 2sn beklemeyle mesajı işler.

İlk consumer 1,3,5,… no’lu mesajları alırken ikincisi 2,4,6,.. no’lu mesajları işleme almış. RabbitMQ kuyruktaki mesajları dağıtırken Round Robin uyguladığı için sıralı dağıtıldı. Şimdi ilk consumer’ın bekleme süresini 5sn yapalım, ikincisi 2sn olsun. Tekrar çalıştıralım.

İlk consumer 5sn, ikincisi ise 2sn beklemeyle mesajı işler.

Görüldüğü gibi mesajların dağıtılmasında bir değişiklik yok. Sırayla dağıtılıyor. Fakat ilk consumer yavaş olduğundan yetişemiyor, geride kalıyor. Bu nedenle ilk consumer’a atanan fakat yavaş olmasından dolayı işlenemeyen mesajlar var bunlar kuyrukta birikiyor.

Yavaş olan consumer’ı beklediğimiz için kuyrukta birikme yaşanıyor, oysa işini daha kısa sürede bitirip boşta kalan ikinci consumer’a mesajları yönlendirmek daha verimli olacaktır. RabbitMQ’ya mesajı işleyip onaylayana kadar(ack) yeni bir mesaj göndermemesini söyleyebiliriz. Böylece RabbitMQ kuyruktan sıradaki mesajı meşgul olmayan bir sonraki consumer’a gönderecektir. Bunu da BasicQos metodu ile sağlarız. PrefetchCount: 1 vererek RabbitMQ’ya bir consumer’a aynı anda birden fazla mesaj iletmemesini söyleriz.

Consumer kodumuzun son durumu aşağıdaki gibi olur.

Tekrar çalıştırıp denersek aşağıdaki çıktıyı üretecektir.

BasicQos ve PrefetchCount ile consumer’ın aynı anda tek mesaj iletilmesi.

Publish/Subscribe Pattern

Bu tasarım kalıbında, kuyruktaki bir mesaj birden çok consumer’a iletilmektedir. Önemli nokta kuyruğa gönderilen bir mesajın o sırada bağlı olan tüm consumer’lara iletilmesidir. Kaç adet consumer varsa o kadar kuyruk oluşur ve mesaj kopyalanarak kuyruklara iletilir. Bunu sağlamak için Fanout Exchange kullanılır.

Kullanım alanlarına geçersek. Örneğin mikroservislerin çalıştığı bir ortamda gelen bir mesajın tüm mikroservislere yayınlanması istenebilir. Daha somut örnekle; Web sitenizde yeni bir kullanıcı açma işlemi gerçekleşiyor olsun. Yeni kullanıcı açıldıktan sonra bir dizi işlem yapılacaktır ve bu işlemler birbirlerinden bağımsız gerçekleşebilir. Açılan kullanıcıya şifresini gönderen bir mikroservis, işlem loglarını kaydeden bir mikroservis vb. servislere yayınlanan mesajın iletilmesi gerekecektir. Bu örnekteki gibi aynı mesajın birden çok consumer’a yayınlanması ihtiyacı için pub/sub pattern kullanılabilir. Böylece producer’ın consumer’lara herhangi bir bağlılığı olmadan(decoupling) mesaj iletimini sağlayabilir, yeni consumer’ların sisteme kolayca eklenip çıkarılması esnekliğine sahip olabiliriz.

Kod örneği ile devam edelim. Önceki örneklerdeki basit yapımızı tekrar kurup Publisher ve Consumer projeleri oluşturalım. Publisher projesindeki Program.cs içeriği aşağıdaki gibi olsun.

Publisher sadece “ex-pubsub” isminde bir Fanout Exchange oluşturup her saniye bir mesaj gönderiyor. Consumer projesindeki Program.cs kodları aşağıdaki gibi olsun.

Burada ise consumer adını argümandan aldık ve “ex-pubsub” exchange’ine bağlanıp consumer adıyla bir kuyruk oluşturduk. Çalıştırıp deneyelim. iki consumer açalım ve isimleri A ve B olsun.(dotnet run Ave dotnet run Bile).

Görüldüğü gibi producer tarafından exchange’e gönderilen mesajlar iki consumer’a da dağıtılmakta. Örnekteki iki consumer yerine daha fazla consumer açarak deneyebilirsiniz. Kod satırında bıraktığım açıklamaya dikkat edilirse consumer’ların kapanması durumunda kuyrukların otomatik silinmesi istenilirse queueName = channel.QueueDeclare().QueueNameile tanım yapılabilir. Bu şekilde kuyruk tanımlandığında consumer ile kuyruk bağlantısı koptuğunda kuyruk kalıcı olmayacak ve silinecektir.

Request Reply Pattern

Diğer adıyla Request/Response Pattern olan bu tasarım kalıbında bir servis başka bir servise mesaj gönderir ve yanıtını aldıktan sonra işleme devam eder. Bu tasarım kalıbının RabbitMQ ile uygulanışında producer/consumer ayrımını yapmayacağız bunun yerine client/server olarak ayıracağız. Mesajı gönderen aynı zamanda sonucunu da bekleyeceğinden producer aynı zamanda bir consumer olmak durumunda veya tersi consumer aynı zamanda bir producer olacak. Şekil üzerinden daha açık görülebilir.

Request-Reply, Request-Response, RPC.

Bir client yani istemci, sunucudan yanıtını beklediği bir mesaj gönderir ve mesaj kuyruğa eklenir. Dinleme yapan sunucu mesajı kuyruktan alır ve sonucunu mesaj sonuçlarının olduğu başka bir kuyruğa ekler. İstemci sonucu kuyruktan alır ve süreç tamamlanır.

Server gelen bir mesajı işledikten sonra sonucunu hani kuyruğa atacağına nasıl karar veriyor? Client ise kuyruğa gelen mesajlar arasında gönderdiği mesajın sonucunu nasıl ilişkilendiriyor? Gelen mesaj hangi mesajın sonucu bunu nasıl bulabiliriz? Sorularına cevap vermemiz gerekir. Mesajlaşmayı başlatan yani örneğimizdeki client, gönderdiği mesaj içerisine tanımlayıcı bir değer koyar buna CorrelationId denir. CorrelationId değeri server tarafından alınır ve sonuç olarak üretilen mesaj içerisine de eklenir bu şekilde hangi mesajın sonucu olduğu ayrılabilir. Client tarafından gönderilen mesaj içerisine ayrıca bu mesajın sonucunun hangi kuyruğa atılacağı bilgisi de eklenir. Bu da ReplyTo ismiyle verilir.

Kod örneğiyle devam edelim. Server ve Client isminde iki projemiz olsun. Server projesindeki Program.cs kodları aşağıdaki gibi olsun.

Server, “request_queue” isimli kuyruğu dinler. Mesaj geldiğinde CorrelationId ve ReplyTo parametrelerini okur. ReplyTo ile verilen kuyruğa “This is reply message” mesajını gönderir. Kod açıklamalarından detay takip edilebilir.

Client projesindeki Program.cs kodları:

Client, “request_queue” kuyruğuna “**hi from client**” mesajını gönderir. Bu mesaj parametrelerine CorrelationId ve ReplyTo değerlerini ekler. ReplyTo parametresinde verdiği isimle açtığı kuyruğu dinlemeye alır. Server tarafından ReplyTo kuyruğuna bir mesaj gönderildiğinde bu mesajı alır ve ekrana yazdırır.

Yukarıdaki Client ve Server kodlarını ayrı terminal ekranlarında çalıştırdığımızda aşağıdaki gibi çıktı üretecektir.

RabbitMQ, kullanım alanlarının çeşitliliği, ihtiyaçlara yönelik üretilen çözümlerin ve teknik detayların fazla olması nedeniyle geniş bir konu. Bu yazıdaki amacım genel bir bakışı vermek ve teknik detaya boğmadan anlaşılabilir olmaktı. Umarım verdiğim bilgiler faydalı olur.

Okuduğunuz için teşekkürler.

Engin Ünal

--

--