İçeriğe geç

Orleans ile Loosely Coupled ve Scalable RESTful Service Oluşturma

Merhaba arkadaşlar.

Daha önceki Orleans’a Giriş makalem içerisinde, bu aralar Orleans ve Actor-based sistemler üzerinde durduğumdan bahsetmiştim. Bu makale konusu altında ise Orleans’ı middle-tier olarak kullanıp, loosely coupled ve scalable RESTful service’ler nasıl oluştururuz konusuna değinmeye çalışacağım.

Orleans’ın bize kazandırdığı pratikliğin yanı sıra, mimari boyutta da yeni bir yaklaşım getiriyor aslında. Data neredeyse business logic’in orada execute ediliyor olduğundan tutun, Reentrancy ve Concurrency gibi problemlerden de kaçınılmış bir şekilde her şeyin Scalable Grain‘ler (Virtual Actor) tarafından handle ediliyor olması kulağa ne kadar çok harika geliyor, değil mi?

Her neyse, actor-based sistemler her ne kadar fazlasıyla ilgimi çekiyor olsa da, Microsoft’un Orleans project’i ile neredeyse 10 yıllık yazılım hayatıma farklı bir bakış açısı geldi diyebilirim.

Orleans’ı Middle-Tier Olarak Kullanmak

Orleans’ın bize sunmuş olduğu gelişmiş actor modeli ile, distributed ve high-scale uygulamaları herhangi bir reliability, distributed resource management veya scalability bottleneck’leri gibi concern’ler ve complexity’ler olmadan geliştirebilmemize olanak sağlıyor. Peki, bu kadar güçlü bir framework’ü RESTful service’lerimizin arkasında bir middle-tier olarak kullanmak nasıl olurdu?

Dilerseniz konunun devamına bir örnek ile devam edelim. Kullanıcılar için bir araç takip sistemi geliştirdiğimizi düşünelim. İhtiyacımız olan araç verisini ise, sürücülerin aracı çalıştırdıkları an topluyor olacağız. Bu verilerin sonucunda ise örneğimiz gereği sisteme subscribe olan client’lara, aracın nerede olduğu bilgisini notification olarak göndereceğiz.

Uygulayacak olduğumuz mimari tasarım, neredeyse yukarıdaki diyagram gibi olacaktır. Bir adet REST endpoint’i ve arkasında çalışan bir Silo.

Siloyu Oluşturmak

İlk olarak “VehicleTracking.Common” isminde bir class library oluşturalım. Bu library içerisinde Grainler arası aktaracak olduğumuz message’ları tanımlayacağız.

“VehicleInfo” message’ını, aşağıdaki gibi tanımlayalım.

using System;
using Orleans.Concurrency;

namespace VehicleTracking.Common
{
    [Immutable]
    public class VehicleInfo
    {
        public long DeviceId { get; set; }
        public string Location { get; set; }
        public string Direction { get; set; }
        public DateTime Timestamp { get; set; }
    }
}

Yukarıda yer alan “[Immutable]” attribute’ünü, serialization işlemleri sırasında performans kazandırabilmek için kullandık. Orleans içerisinde farklı Silo’larda bulunan object’ler, Grain’ler arasında binary serializer ile serialize edilip, tekrar deserialize edilerek gönderilmektedir.

[Immutable] Messages

Serialization işlemi, object’lerin farklı Silo’larda bulunan Grain’lere ulaşabilmesi için gerçekleştirilir. Bir diğer yandan ise aynı Silo üzerinde bulunan Grain’lerin, aynı object’e erişmemeleri ve internal state’lerini değiştirememeleri için deep-copy işlemi gerçekleştirilir. Fakat bu işlem aynı Silo üzerinde bulunan Grain’ler için, biraz daha performanslı bir hale getirilebilinir. Performans optimizasyonu için Orleans içerisinde ise bu serialization işlemini, object’in immutable olup olmadığına karar verebilerek, bypass edebilmek mümkündür.

“VehicleTracking.GrainInterfaces” isminde bir class library daha oluşturalım ve içerisine “IVehicleGrain” isminde bir interface tanımlayalım.

using System.Threading.Tasks;
using Orleans;
using VehicleTracking.Common;

namespace VehicleTracking.GrainInterfaces
{
    public interface IVehicleGrain : IGrainWithIntegerKey
    {
        Task SetVehicleInfo(VehicleInfo info);
    }
}

Tanımlamış olduğumuz “IVehicleGrain” interface’i, sürücülerin konumlarını takip edebilmemiz için ihtiyaç duyduğumuz function’ı içermektedir. Function adına dikkat ederse eğer, daha çok bir RPC tanımlasına benzediğini görebiliriz. Çünkü Orleans client’ları ve Grain’leri, birbirleri ile RPC üzerinden haberleşmektedir.

“IVehicleTrackingGrain” isminde yeni bir interface daha tanımlayalım.

using System.Threading.Tasks;
using Orleans;
using VehicleTracking.Common;

namespace VehicleTracking.GrainInterfaces
{
    public interface  IVehicleTrackingGrain : IGrainWithIntegerKey
    {
        Task SetVehicleTrackingInfo(VehicleInfo info);
        Task Subscribe(IVehicleTrackingObserver observer);
        Task Unsubscribe(IVehicleTrackingObserver observer);
    }
}

“IVehicleTrackingGrain” interface’i ile aracın hareket ettiği durumlarda, “IVehicleGrain” üzerinden aracın bulunduğu konum bilgisini “SetVehicleTrackingInfo” method’u ile aktarıyor olacağız. Sonrasında ise sisteme subscribe olan client’lara, aracın hareket ettiği durumlarda tracking bilgisini notification olarak göndereceğiz.

Notification işlemini gerçekleştirebilmemiz için, bir observer tanımlayacağız. Bunun için “IVehicleTrackingObserver” isminde bir interface daha tanımlayalım.

using Orleans;
using VehicleTracking.Common;

namespace VehicleTracking.GrainInterfaces
{
    public interface IVehicleTrackingObserver : IGrainObserver
    {
        void ReportToVehicle(VehicleInfo info);
    }
}

Artık Grain implementasyonlarına başlayabiliriz.

Öncelikle observe işlemlerini gerçekleştireceğimiz, “IVehicleTrackingGrain” interface’ini implemente edelim.

using System;
using System.Threading.Tasks;
using Orleans;
using VehicleTracking.Common;
using VehicleTracking.GrainInterfaces;
using Orleans.Concurrency;

namespace VehicleTracking.Grains
{
    [Reentrant]
    public class VehicleTrackingGrain : Grain, IVehicleTrackingGrain
    {
        private ObserverSubscriptionManager<IVehicleTrackingObserver> _observers;
        private VehicleInfo _vehicleInfo;

        public override Task OnActivateAsync()
        {
            _observers = new ObserverSubscriptionManager<IVehicleTrackingObserver>();

            RegisterTimer(Callback, null, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(5));

            return base.OnActivateAsync();
        }

        Task Callback(object callbackState)
        {
            if (_vehicleInfo != null)
            {
                _observers.Notify(x => x.ReportToVehicle(_vehicleInfo));

                _vehicleInfo = null;
            }

            return TaskDone.Done;
        }

        public Task SetVehicleTrackingInfo(VehicleInfo info)
        {
            _vehicleInfo = info;

            return TaskDone.Done;
        }

        public Task Subscribe(IVehicleTrackingObserver observer)
        {
            _observers.Subscribe(observer);

            return TaskDone.Done;
        }

        public Task Unsubscribe(IVehicleTrackingObserver observer)
        {
            _observers.Unsubscribe(observer);

            return TaskDone.Done;
        }
    }
}

Observe işlemlerini Orleans içerisinde bulunan “ObserverSubscriptionManager” helper’ı ile gerçekleştireceğiz. Subscribing ve notification gönderme gibi işlemleri kolay bir şekilde handle etmektedir. Override ettiğimiz “OnActivateAsync” method’u ise, Grain’in aktive edilme işleminin en son kısmında call edilen bir method’dur. Burada ise “RegisterTimer” method’unu kullanarak, periodic olarak Grain’ler üzerinde callback işlemlerini gerçekleştirebilmeyi sağladık. Callback method’una baktığımızda ise “_vehicleInfo” field’ı null değilse, subscribe olmuş tüm client’lara “ReportToVehicle” method’u üzerinden bir notification göndereceğiz.

[Reentrant] Attribute

Yukarıda decorate etimiş olduğumuz “[Reentrant]” attribute’ünü de, network’de oluşabilecek bottleneck’lere karşı ve performans optimizasyonunu arttırabilmek için kullandık. Carl Hewitt‘in dediği gibi, kavramsal olarak actor model içerisinde message’lar birer birer işlenmektedir. Orleans içerisinde ise bazı maliyetli işlerin olduğu durumlarda Grain’i block’lamamak için ihtiyaç duyulabilecek noktalarda, “[Reentrant]” attribute’ü gibi tekniklerle concurrent processing sağlanabilmektedir. Fakat, kullanmamız gereken noktalarda dikkatli olmamız öneriliyor, aksi halde race-conditions durumları ile karşı karşıya gelebiliriz.

Artık “IVehicleGrain” interface’ini aşağıdaki gibi implemente edebiliriz.

using System.Threading.Tasks;
using Orleans;
using Orleans.Concurrency;
using VehicleTracking.Common;
using VehicleTracking.GrainInterfaces;

namespace VehicleTracking.Grains
{
    [Reentrant]
    public class VehicleGrain : Grain, IVehicleGrain
    {
        private long _currentGrainId;

        public override Task OnActivateAsync()
        {
            _currentGrainId = this.GetPrimaryKeyLong();

            return base.OnActivateAsync();
        }

        public async Task SetVehicleInfo(VehicleInfo info)
        {
            //some business logics...

            var vehicleTrackingGrain = GrainFactory.GetGrain<IVehicleTrackingGrain>(_currentGrainId);

            await vehicleTrackingGrain.SetVehicleTrackingInfo(info);
        }
    }
}

Araçlardan gelecek olan konum bilgisini “SetVehicleInfo” method’u ile alıp, sonrasında bazı business logic’ler doğrultusunda işlediğimizi düşünelim. Business logic’lerin işlenmesinden sonra ise message’ı, örneğimiz gereği notification gönderebilmek için “VehicleTrackingGrain” e aktarıyoruz.

Artık implementasyonlarını tamamladık ve şimdi aşağıdaki gibi “VehicleTracking.TestSilo” isminde bir Orleans Dev/Test Host’u oluşturalım.

Sonrasında ise aşağıdaki gibi “VehicleTrackingObserver” isminde bir class tanımlayalım.

using System;
using VehicleTracking.Common;
using VehicleTracking.GrainInterfaces;

namespace VehicleTracking.TestSilo
{
    public class VehicleTrackingObserver : IVehicleTrackingObserver
    {
        public void ReportToVehicle(VehicleInfo info)
        {
            Console.WriteLine($"The vehicle id {info.DeviceId} moved to {info.Direction} from {info.Location} at {info.Timestamp.ToShortTimeString()} o'clock.");
        }
    }
}

Burada ise daha önce tanımlamış olduğumuz “IVehicleTrackingObserver” interface’ini implemente ettik. Aracın hareket haline geçmesi ile birlikte gelen notification’ları, console üzerine yazdıracağız.

“Program.cs” içerisini ise, aşağıdaki gibi değiştirelim.

using System;
using Orleans;
using Orleans.Runtime.Configuration;
using VehicleTracking.GrainInterfaces;

namespace VehicleTracking.TestSilo
{
    /// <summary>
    /// Orleans test silo host
    /// </summary>
    public class Program
    {
        static void Main(string[] args)
        {
            // The Orleans silo environment is initialized in its own app domain in order to more
            // closely emulate the distributed situation, when the client and the server cannot
            // pass data via shared memory.
            AppDomain hostDomain = AppDomain.CreateDomain("OrleansHost", null, new AppDomainSetup
            {
                AppDomainInitializer = InitSilo,
                AppDomainInitializerArguments = args,
            });

            var config = ClientConfiguration.LocalhostSilo();
            GrainClient.Initialize(config);

            // TODO: once the previous call returns, the silo is up and running.
            //       This is the place your custom logic, for example calling client logic
            //       or initializing an HTTP front end for accepting incoming requests.

            Console.WriteLine("Orleans Silo is running.\nPress Enter to terminate...");

            var vehicleTrackingObserver = new VehicleTrackingObserver();
            var vehicleTrackingObserverRef = GrainClient.GrainFactory
                                                    .CreateObjectReference<IVehicleTrackingObserver>(vehicleTrackingObserver).Result;

            var vehicleTrackingGrain = GrainClient.GrainFactory.GetGrain<IVehicleTrackingGrain>(1);
            vehicleTrackingGrain.Subscribe(vehicleTrackingObserverRef).Wait();

            hostDomain.DoCallBack(ShutdownSilo);

            Console.ReadLine();
        }

        static void InitSilo(string[] args)
        {
            hostWrapper = new OrleansHostWrapper(args);

            if (!hostWrapper.Run())
            {
                Console.Error.WriteLine("Failed to initialize Orleans silo");
            }
        }

        static void ShutdownSilo()
        {
            if (hostWrapper != null)
            {
                hostWrapper.Dispose();
                GC.SuppressFinalize(hostWrapper);
            }
        }

        private static OrleansHostWrapper hostWrapper;
    }
}

Burada ise “IVehicleTrackingGrain” tipinde bir Grain instance’ı alarak, “IVehicleTrackingObserver” üzerinden subscribe işlemini gerçekleştirdik. Bu proje üzerinde test amaçlı hem Orleans Silo’sunu ayağa kaldıracağız, hem de observer üzerinden gelen notification’ları da console üzerine yazdıracağız.

REST Endpoint’ini Tanımlamak

Artık REST endpoint’ini kodlamaya başlayabiliriz. Bunun için “VehicleTracking.Api” isminde empty bir Web API projesi oluşturalım ve ardından aşağıdaki gibi NuGet Package Manager üzerinden, “Microsoft.Orleans.Core” paketini dahil edelim.

Bu işlemin ardından, Silo ile iletişim kurabilmemiz için “Global.asax” içerisinde test Silo’sunu aşağıdaki gibi initialize etmemiz gerekiyor.

using Orleans;
using System.Web.Http;

namespace VehicleTracking.Api
{
    public class WebApiApplication : System.Web.HttpApplication
    {
        protected void Application_Start()
        {
            GlobalConfiguration.Configure(WebApiConfig.Register);

            var config = Orleans.Runtime.Configuration.ClientConfiguration.LocalhostSilo();
            GrainClient.Initialize(config);
        }
    }
}

Artık Silo ile iletişim kurabiliriz. Hemen “VehicleTracking” isminde bir controller oluşturalım ve aşağıdaki gibi kodlayalım.

using Orleans;
using System;
using System.Threading.Tasks;
using System.Web.Http;
using VehicleTracking.Common;
using VehicleTracking.GrainInterfaces;

namespace VehicleTracking.Api.Controllers
{
    public class VehicleTrackingController : ApiController
    {
        [Route("api/vehicle-trackings")]
        public async Task Post(long deviceId, string location, string direction)
        {
            var vehicleGrain = GrainClient.GrainFactory.GetGrain<IVehicleGrain>(deviceId);

            VehicleInfo trafficInfo = new VehicleInfo()
            {
                DeviceId = deviceId,
                Location = location,
                Direction = direction,
                Timestamp = DateTime.Now
            };

            await vehicleGrain.SetVehicleInfo(trafficInfo);
        }
    }
}

Artık POST işlemini gerçekleştirebileceğimiz bir endpoint’e sahibiniz. Burada “deviceId” ile bir “VehicleGrain” initialize edip, araç bilgilerini “SetVehicleInfo” method’u ile ilgili Grain’e aktarıyoruz.

Implementasyon işlemleri bu kadar ve artık test işlemi için hazırız. Test işlemini gerçekleştirebilmemiz için önce Silo’yu initialize etmemiz gerekiyor. Bunun için “VehicleTracking.TestSilo” projesini start etmemiz yeterli olacaktır ve sonrasında ise “VehicleTracking.Api” projesini start edeceğiz.

Bu işlemin ardından “VehicleTracking.Api” projesini de start edelim ve Postman üzerinden aşağıdaki gibi “/api/vehicle-trackings?deviceId=1&location=Taksim Square&direction=Bagdat Street” endpoint’ine, bir POST isteğinde bulunalım.

Sonuç olarak REST endpoint’i üzerinden göndermiş olduğumuz message’ın notification işleminin, yukarıda observer aracılığı ile console üzerine yazdırıldığını görebiliriz.

Sonuç Olarak

Oluşturmuş olduğumuz REST endpoint’i ve middle-tier olarak çalışan Orleans Silo’su ile, herhangi bir thread locking ve concurrency concern’leri olmadan loosely coupled ve scalable çalışan bir sistem inşa etmiş olduk.

Umarım faydalı bir blog yazısı olmuştur. Bu aralar Orleans’ın Docker Swarm ile beraber çalışması üzerine araştırmalarımı sürdürüyorum ve bu süreç içerisinde edinebildiğim veya karşılaşacağım tecrübeleri sizlerle başka bir blog konusu altında aktarmaya çalışacağım.

İlgili örneğe buradan erişebilirsiniz: https://github.com/GokGokalp/orleans-vehicletracking-sample

Kaynaklar:

https://dotnet.github.io/orleans/Tutorials/Front-Ends-for-Orleans-Services.html

https://dotnet.github.io/orleans/Tutorials/Concurrency.html

Kategori:.NETActor Programming ModelArchitectural

İlk Yorumu Siz Yapın

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir

Bu site, istenmeyenleri azaltmak için Akismet kullanıyor. Yorum verilerinizin nasıl işlendiği hakkında daha fazla bilgi edinin.