High-Performance, Stream-Based Communication Between Services with .NET 5 and gRPC

To meet the scale demands of today’s technology landscape, we increasingly develop our applications using a microservice architecture. And for the communication between these distributed services, we rely on the REST (HTTP JSON) approach at many points.

In this article, I will try to explain how we can perform high-performance, stream-based communication between services using gRPC and what benefits we can get by using it.

Why?

As we know, there are scenarios where milliseconds matter. In such cases, we usually try to do some code refactoring or change implementation approaches in order to increase performance (I’m sure you have also encountered this as a developer). At this point, gRPC is a great implementation/communication approach that we can use to increase performance in most cases.

Especially if the size of the payload during inter-service communication is large, or if we want to take advantage of approaches such as streaming, gRPC is a great communication choice. This is because gRPC performs binary serialization and transfers binary data using protocol buffers (Protobuf) over the HTTP/2 protocol, so it delivers very high performance in inter-service communication.

Also, with .NET 5, gRPC has reached an even better point thanks to optimizations such as concurrency improvements and reduced allocations in HTTP/2 and Kestrel.

As we know, in the REST approach we transfer data between services as JSON or XML, so inter-service communication can be slower in some scenarios due to reasons such as serialization time and the larger payload size compared to binary data.

For example

Let’s say we are working for an e-commerce company and developing a supplier portal. Also assume we have been asked to implement a feature where suppliers will be able to import their products into our system in bulk, in CSV/XML format.

Normally, we develop an endpoint where suppliers can upload their products in bulk. In this endpoint, after parsing the CSV/XML file, we send requests to the relevant internal APIs/queues in order to initiate the product import processes. Well, the real challenge starts at this point.

While sending requests to the relevant internal APIs, we usually try to send them either in chunks or one by one to be more efficient. We can perform these operations over an HTTP JSON API without any limitations, but unfortunately that doesn’t mean we are free of disadvantages. For example, since the API payloads might be large due to the product information, we may encounter performance problems, mostly serialization or network related. Also, if there is a lot of data that needs to be sent, this costs us a significant amount of time.

At this point, it is possible to make this internal communication much more effective, efficient and asynchronous by taking advantage of the features of gRPC such as binary serialization, HTTP/2 and streaming.

Client Streaming with gRPC

Client streaming is one of the four call types that gRPC offers. The others are “Unary”, “Server streaming” and “Bidirectional streaming”.
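For reference, these call types map to the following method shapes in a proto file (a generic sketch with made-up service and message names):

service ExampleService {
  // Unary: single request, single response
  rpc UnaryCall (ExampleRequest) returns (ExampleResponse);

  // Server streaming: single request, stream of responses
  rpc ServerStreamingCall (ExampleRequest) returns (stream ExampleResponse);

  // Client streaming: stream of requests, single response
  rpc ClientStreamingCall (stream ExampleRequest) returns (ExampleResponse);

  // Bidirectional streaming: both sides stream independently
  rpc BidirectionalStreamingCall (stream ExampleRequest) returns (stream ExampleResponse);
}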

Client streaming is a very useful and efficient approach, especially in cases where a continuous series of data needs to be sent from a client to a server. The streaming approach is also a good choice when it comes to high throughput with low latency.

In our sample supplier portal scenario, we will implement the client streaming approach since we will stream multiple products from one point to another.

So, let’s write some code.

Before starting, let’s assume we have a CSV file like the one below.
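The original file isn’t shown here, so the following is just an assumed sample that matches the “;” delimiter and the fields used later in the article (the names and descriptions are made up):

SupplierID;SKU;Name;Description;Brand
1;ABC;Galaxy S21;Smartphone;Samsung
1;CDE;Galaxy Tab S7;Tablet;Samsung
2;GDE;iPhone 12;Smartphone;Apple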

Let’s start with the server-side

First, we will start with the server-side. It will be our main product service, where we will manage products. So, let’s create a .NET 5 gRPC service called “MyTodoStore.Product.GRPC”.

dotnet new grpc -n MyTodoStore.Product.GRPC

Since gRPC is a contract-first framework, we first need to define the contracts by using Protobuf.

We can think of Protobuf as a contract/interface that we can use in inter-service communication without having any dependency on a specific programming language. These contracts also let the relevant frameworks generate the required communication infrastructure between services.

Now let’s create a “product.proto” file under the “Protos” folder as follows.

syntax = "proto3";

option csharp_namespace = "MyTodoStore.Product.GRPC";

package mytodostore_product_grpc;

service ProductGRPCService {
  rpc ImportProductsStream(stream ImportProductRequest) returns (ImportProductResponse);
}

message ImportProductRequest {
  int32 supplier_id = 1;
  string sku = 2;
  string name = 3;
  string description = 4;
  string brand = 5;
}

message ImportProductResponse {
  int32 count = 1;
}

The important point here is the “service” section, where we define the RPC methods. In this section, we have defined an RPC method called “ImportProductsStream”, which will be responsible for importing products into the system as a stream. This method accepts a stream of “ImportProductRequest” messages and, when the process is completed, returns an “ImportProductResponse” message containing the total imported product count.

The unique numbers assigned to the fields in the message structures identify those fields in the serialized binary data. Thus, when a new field is added to a message, a parser that does not yet have that update can simply skip the unknown field during deserialization without causing any incompatibility problems.
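For example, a hypothetical “category” field could later be added to “ImportProductRequest” with the next free field number; a client or server that has not been updated yet would simply skip it:

message ImportProductRequest {
  int32 supplier_id = 1;
  string sku = 2;
  string name = 3;
  string description = 4;
  string brand = 5;
  string category = 6; // hypothetical new field; older parsers skip it safely
}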

NOTE: You can find more detailed information about proto files here.

After creating the “product.proto” file, we also need to reference it in the project file in order for the code generation to take place.
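In the server project, that reference typically looks like the following (the gRPC project template already brings in the “Grpc.AspNetCore” package, which includes the “Grpc.Tools” code generator):

<ItemGroup>
  <Protobuf Include="Protos\product.proto" GrpcServices="Server" />
</ItemGroup>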

Thus, when we build the “MyTodoStore.Product.GRPC” project, the compiler will perform the necessary code generation by using the proto file that we have created. So, let’s not forget to build the project once we have defined the proto file.

Now we need to create a class called “ProductService” under the “Services” folder and implement it as follows.

using System;
using System.Threading.Tasks;
using Grpc.Core;

namespace MyTodoStore.Product.GRPC
{
    public class ProductService : ProductGRPCService.ProductGRPCServiceBase
    {
        public override async Task<ImportProductResponse> ImportProductsStream(IAsyncStreamReader<ImportProductRequest> requestStream, ServerCallContext context)
        {
            var importResponse = new ImportProductResponse();

            await foreach (var importProductItem in requestStream.ReadAllAsync())
            {
                // product import operations...

                importResponse.Count += 1;
                Console.WriteLine($"1 product has been imported. SKU: {importProductItem.Sku} Brand: {importProductItem.Brand}");
            }

            Console.WriteLine("Import products stream has been ended.");

            return importResponse;
        }
    }
}

As we can see, we have derived this class from the “ProductGRPCServiceBase” abstract class, which has been generated by the compiler. Basically, in the “ImportProductsStream” method, we have implemented a simple logic that consumes the stream.

Our aim with this simple logic is to show that the server can start the product import processes while the client is still streaming products; in other words, the server can start working asynchronously without waiting for the entire product stream to be completed. For this, we increase the “Count” property of the “importResponse” inside the “foreach” loop and return it back to the client.

After completing the service implementation, we need to map the gRPC endpoint in the “Startup” class as follows (the gRPC template already registers the required services via “services.AddGrpc()” in “ConfigureServices”).

public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
    if (env.IsDevelopment())
    {
        app.UseDeveloperExceptionPage();
    }

    app.UseRouting();

    app.UseEndpoints(endpoints =>
    {
        endpoints.MapGrpcService<ProductService>();

        endpoints.MapGet("/", async context =>
        {
            await context.Response.WriteAsync("Communication with gRPC endpoints must be made through a gRPC client. To learn how to create a client, visit: https://go.microsoft.com/fwlink/?linkid=2086909");
        });
    });
}

So, we are almost ready.

If you are also using macOS just like me, you need to configure Kestrel as follows, because Kestrel doesn’t support HTTP/2 with TLS on macOS.

using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Server.Kestrel.Core;
using Microsoft.Extensions.Hosting;

namespace MyTodoStore.Product.GRPC
{
    public class Program
    {
        public static void Main(string[] args)
        {
            CreateHostBuilder(args).Build().Run();
        }

        public static IHostBuilder CreateHostBuilder(string[] args) =>
            Host.CreateDefaultBuilder(args)
                .ConfigureWebHostDefaults(webBuilder =>
                {
                    webBuilder.ConfigureKestrel(options =>
                    {
                        options.ListenLocalhost(5000, o => o.Protocols =
                            HttpProtocols.Http2);
                    });

                    webBuilder.UseStartup<Startup>();
                });
    }
}

Now the “MyTodoStore.Product.GRPC” service is ready.

Implementation of the Client

Now let’s develop a RESTful API that we will use for supplier product operations. Basically in this API, we will implement the upload operation of the CSV file.

For this, let’s create a .NET 5 Web API project called “MyTodoStore.SupplierProduct.API”.

dotnet new webapi -n MyTodoStore.SupplierProduct.API

Then, let’s add the following packages to the project via NuGet. We will use these packages for the CSV file operations and the gRPC client implementation.

dotnet add package CsvHelper
dotnet add package Grpc.Net.Client
dotnet add package Grpc.Net.ClientFactory

After including the packages in the project, we now need to copy the “product.proto” file of the “MyTodoStore.Product.GRPC” service into a new folder called “Protos” in this project. Then, let’s update the “csharp_namespace” option in the newly copied proto file with this project’s namespace.

option csharp_namespace = "MyTodoStore.SupplierProduct.API";

After that, we need to reference the “product.proto” file in this project file as well.
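A sketch of the corresponding entry in the client project file (note that the “Grpc.Tools” and “Google.Protobuf” packages are also needed for this code generation step if they are not already referenced transitively):

<ItemGroup>
  <Protobuf Include="Protos\product.proto" GrpcServices="Client" />
</ItemGroup>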

While referencing the proto file in this project, the only difference is that the value of the “GrpcServices” attribute is “Client”. Thus, the compiler will generate the necessary C# client code for us to connect to the server side. After this operation, let’s not forget to build this project as we did before.

Now, let’s create a folder called “Models” and define the model that will represent the CSV file as follows.

namespace MyTodoStore.SupplierProduct.API.Models
{
    public class SupplierProductModel
    {
        public int SupplierID { get; set; }
        public string SKU { get; set; }
        public string Name { get; set; }
        public string Description { get; set; }
        public string Brand { get; set; }
    }
}

Then let’s create the “Services” folder where we will implement the service. After that, we can create a service called “SupplierProductService” in this folder, together with its “ISupplierProductService” interface, and implement it as follows.

using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;

namespace MyTodoStore.SupplierProduct.API.Services
{
    public interface ISupplierProductService
    {
        Task<int> ImportProductsAsync(IFormFile formFile);
    }
}

using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Threading.Tasks;
using CsvHelper;
using CsvHelper.Configuration;
using Microsoft.AspNetCore.Http;
using MyTodoStore.SupplierProduct.API.Models;

namespace MyTodoStore.SupplierProduct.API.Services
{
    public class SupplierProductService : ISupplierProductService
    {
        private readonly ProductGRPCService.ProductGRPCServiceClient _productGRPCServiceClient;

        public SupplierProductService(ProductGRPCService.ProductGRPCServiceClient productGRPCServiceClient)
        {
            _productGRPCServiceClient = productGRPCServiceClient;
        }

        public async Task<int> ImportProductsAsync(IFormFile formFile)
        {
            var config = new CsvConfiguration(CultureInfo.InvariantCulture)
            {
                Delimiter = ";"
            };

            using var importProductStream = _productGRPCServiceClient.ImportProductsStream();

            using (var reader = new StreamReader(formFile.OpenReadStream()))
            using (var csv = new CsvReader(reader, config))
            {
                IAsyncEnumerable<SupplierProductModel> products = csv.GetRecordsAsync<SupplierProductModel>();

                await foreach (SupplierProductModel product in products)
                {
                    ImportProductRequest importProductRequest = new()
                    {
                        SupplierId = product.SupplierID,
                        Sku = product.SKU,
                        Name = product.Name,
                        Description = product.Description,
                        Brand = product.Brand
                    };

                    await importProductStream.RequestStream.WriteAsync(importProductRequest);
                }
            }
            await importProductStream.RequestStream.CompleteAsync();

            ImportProductResponse response = await importProductStream;

            return response.Count;
        }
    }
}

Here we have simply injected the “ProductGRPCServiceClient” client, which has been generated by the compiler, in order to connect to the “MyTodoStore.Product.GRPC” service.

Then we have parsed the CSV file by using the CsvHelper package and sent each product to the server side by streaming them one by one over the “ProductGRPCServiceClient” client.

In order to notify the server that the stream is completed, we have called the “CompleteAsync” method of the request stream as follows.

await importProductStream.RequestStream.CompleteAsync();

Thus, the streaming process is completed, and the server completes the relevant call and returns back the information about how many products have been imported into the system.

Now let’s create a controller called “Product” and implement the following endpoint.

using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using MyTodoStore.SupplierProduct.API.Services;

namespace MyTodoStore.SupplierProduct.API.Controllers
{
    [ApiController]
    [Route("products")]
    public class ProductController : ControllerBase
    {
        private readonly ISupplierProductService _supplierProductService;

        public ProductController(ISupplierProductService supplierProductService)
        {
            _supplierProductService = supplierProductService;
        }

        [HttpPost]
        public async Task<IActionResult> UploadProducts(IFormFile formFile)
        {
            int importedProductCount = await _supplierProductService.ImportProductsAsync(formFile);

            return Ok($"{importedProductCount} products have been imported.");
        }
    }
}

After that, we need to register the service and the gRPC client in the “Startup” class. For the gRPC client registration, we will use the “AddGrpcClient” method that comes with the “Grpc.Net.ClientFactory” package.

By using this package, we can configure gRPC clients from a central point. In addition, the factory reuses the underlying channels so that we can have high-performance communication, because creating a new channel requires additional network round-trips between the client and the server.

public void ConfigureServices(IServiceCollection services)
{

    services.AddControllers();
    services.AddSwaggerGen(c =>
    {
        c.SwaggerDoc("v1", new OpenApiInfo { Title = "MyTodoStore.SupplierProduct.API", Version = "v1" });
    });

    services.AddScoped<ISupplierProductService, SupplierProductService>();

    services.AddGrpcClient<ProductGRPCService.ProductGRPCServiceClient>(o =>
    {
        o.Address = new Uri("http://localhost:5000");
    });
}

Lastly, we need to serve the API on a different port, because the default port “5000” has already been allocated by the gRPC service we have created. Let’s serve this API over port “5001” as follows.

using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Hosting;

namespace MyTodoStore.SupplierProduct.API
{
    public class Program
    {
        public static void Main(string[] args)
        {
            CreateHostBuilder(args).Build().Run();
        }

        public static IHostBuilder CreateHostBuilder(string[] args) =>
            Host.CreateDefaultBuilder(args)
                .ConfigureWebHostDefaults(webBuilder =>
                {
                    webBuilder.UseUrls("https://*:5001");
                    webBuilder.UseStartup<Startup>();
                });
    }
}

Now we are ready to test.

Let’s test it

First, we need to run both projects as follows.
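For example, assuming both project folders sit under the same parent directory, something like the following should work (each command in its own terminal):

dotnet run --project MyTodoStore.Product.GRPC
dotnet run --project MyTodoStore.SupplierProduct.API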

Then let’s access the Swagger interface of the “MyTodoStore.SupplierProduct.API” project at “https://localhost:5001/swagger/index.html” and upload the CSV file through the “/products” endpoint.
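Alternatively, the upload can be triggered from the terminal; a rough sketch, assuming the file is named “products.csv” and using “-k” because the ASP.NET Core development certificate may not be trusted:

curl -k https://localhost:5001/products -F "formFile=@products.csv"

The form field name must match the “formFile” parameter of the controller action so that model binding works.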

Our expectation from this operation is that each product should be sent as a stream to the “MyTodoStore.Product.GRPC” service and then we should be able to get back a single response that contains information about how many products have been imported.

After the upload operation is completed, let’s take a look at the logs of the “MyTodoStore.Product.GRPC” service via the terminal.

If we look at the terminal output below, we can see that log messages have been written during the import process of the products into the system.

1 product has been imported. SKU: ABC Brand: Samsung
1 product has been imported. SKU: CDE Brand: Samsung
1 product has been imported. SKU: GDE Brand: Apple
Import products stream has been ended.

Let’s recall the code block responsible for importing products into the system.

public override async Task<ImportProductResponse> ImportProductsStream(IAsyncStreamReader<ImportProductRequest> requestStream, ServerCallContext context)
{
    var importResponse = new ImportProductResponse();

    await foreach (var importProductItem in requestStream.ReadAllAsync())
    {
        // product import operations...

        importResponse.Count += 1;
        Console.WriteLine($"1 product has been imported. SKU: {importProductItem.Sku} Brand: {importProductItem.Brand}");
    }

    Console.WriteLine("Import products stream has been ended.");

    return importResponse;
}

As we can see, the log messages get printed while the “requestStream” is being consumed in the “foreach” loop. When the streaming is ended by the client, the “Import products stream has been ended.” log message gets printed.

In short, we can see that the server handles the product stream asynchronously while the client is still sending products, and when the streaming process is completed, an “ImportProductResponse” message is returned to the client.

Resiliency

Although it is important to provide high-performance communication between services, it is also important to be prepared for transient failures. In the gRPC world, we should also be prepared for network-related failures or situations where services are temporarily unavailable.

Fortunately, we can take advantage of the gRPC retries feature to develop fault-tolerant gRPC applications without developing any custom solutions.

In order to use this feature, we basically need to add the following configuration while registering the gRPC client on the client side. Thus, we can globally enable the retry behavior for the relevant client.
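The snippet below uses types from the “Grpc.Net.Client.Configuration” namespace (“MethodConfig”, “MethodName”, “RetryPolicy”, “ServiceConfig”) and “StatusCode” from “Grpc.Core”.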

var retryMethodConfig = new MethodConfig
{
    Names = { MethodName.Default },
    RetryPolicy = new RetryPolicy
    {
        MaxAttempts = 5,
        InitialBackoff = TimeSpan.FromSeconds(1),
        MaxBackoff = TimeSpan.FromSeconds(5),
        BackoffMultiplier = 1.5,
        RetryableStatusCodes = { StatusCode.Unavailable }
    }
};

services.AddGrpcClient<ProductGRPCService.ProductGRPCServiceClient>(o =>
{
    o.Address = new Uri("http://localhost:5000");
    o.ChannelOptionsActions.Add(opt => opt.ServiceConfig = new ServiceConfig { MethodConfigs = { retryMethodConfig } });
});

You can find more detailed information here.

Let’s sum up

gRPC is a great RPC framework designed by Google that allows us to have high-performance communication between services. Contracts are defined in proto files, as we have done in the “product.proto” file. Proto files are used to generate the gRPC infrastructure for both the client and the server, and they define the communication between services.

gRPC has become a really great communication option, especially after the improvements and optimizations made in .NET 5. By using gRPC for inter-microservice communication, it is possible to make the communication process more effective, performant and asynchronous.

In addition, gRPC supports features such as “authentication”, “load balancing” and “health checking”.

Source: GokGokalp/mytodostore-net5-grpc-client-streaming (github.com)

References

Core concepts, architecture and lifecycle | gRPC
Introduction to gRPC on .NET | Microsoft Docs

Gökhan Gökalp
