.net core grpc consul 实现服务注册 服务发现 负载均衡(二)
在上一篇[.net core grpc 实现通信(一)](http://www.snaill.net/post/1)中,我们实现的grpc通信在.net core中的可行性,但要在微服务中真正使用,还缺少 服务注册,服务发现及负载均衡等,本篇我们将在 .net core grpc 通信 的基础上加上 服务注册,服务发现,负载均衡。
如对.net core grpc 通信不太熟悉的,可以看上一篇[.net core grpc 实现通信(一)](http://www.snaill.net/post/1),然后再看本篇。
grpc([https://grpc.io/](https://grpc.io/))是google发布的一个开源、高性能、通用RPC(Remote Procedure Call)框架,使用HTTP/2协议,支持多路复用,并用ProtoBuf作为序列化工具,提供跨语言、跨平台支持。
Consul([https://www.consul.io](https://www.consul.io))是一个分布式,高可用、支持多数据中心的服务注册、发现、健康检查和配置共享的服务软件,由 HashiCorp 公司用 Go 语言开发。
本次服务注册、发现 通过 Consul Api 来实现,开发过程中结合.net core 依赖注入,切面管道思想等。
软件版本
.net core:2.0
grpc:1.11.0
Consul:1.1.0
Consul Nuget注册组件:0.7.2.5
#### 项目结构 ####
.net core 代码部分:
Snai.GrpcClient 客户端 .net core 2.0控制台程序
Snai.GrpcService.Hosting 服务端宿主,Api服务注册,asp.net core 2.0网站程序
Snai.GrpcService.Impl 协议方法实现 .net standard 2.0类库
Snai.GrpcService.Protocol 生成协议方法 .net standard 2.0类库
![Image](/sitedata/image/dotnet_2_1.png)
Consul:
conf 配置目录,本次用api注册服务,可以删除
data 缓存数据目录,可清空里面内容
dist Consul UI目录,本次用默认的UI,可以删除
consul.exe 注册软件
startup.bat 执行脚本
![Image](/sitedata/image/dotnet_2_2.png)
#### 项目实现 ####
**一、服务端**
服务端主要包括Grpc服务端,Consul Api服务注册、健康检查等。
新建Snai.GrpcService解决方案,由于这次加入了 Consul Api 服务注册,所以我们先从 Api 服务注册开始。
1、实现 Consul Api 服务注册
新建 Snai.GrpcService.Hosting 基于Asp.net Core 2.0空网站,在 依赖项 右击 管理NuGet程序包 浏览 找到 Consul 版本0.7.2.5安装,用于Api服务注册使用
新建 appsettings.json 配置文件,配置 GrpcService Grpc服务端IP和端口,HealthService健康检测名称、IP和地址,ConsulService Consul的IP和端口,代码如下
```
{
"GrpcService": {
"IP": "localhost",
"Port": "5031"
},
"HealthService": {
"Name": "GrpcService",
"IP": "localhost",
"Port": "5021"
},
"ConsulService": {
"IP": "localhost",
"Port": "8500"
}
}
```
新建Consul目录,用于放Api注册相关代码
在Consul目录下新建Entity目录,在Entity目录下新建HealthService.cs,ConsulService.cs类,分别对应HealthService,ConsulService两个配置项,代码如下
HealthService.cs
```
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Snai.GrpcService.Hosting.Consul.Entity
{
public class HealthService
{
public string Name { get; set; }
public string IP { get; set; }
public int Port { get; set; }
}
}
```
ConsulService.cs
```
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Snai.GrpcService.Hosting.Consul.Entity
{
public class ConsulService
{
public string IP { get; set; }
public int Port { get; set; }
}
}
```
在 Consul 目录下新建 AppRregister.cs 类,添加 IApplicationBuilder 扩展方法 RegisterConsul,来调用 Consul Api 实现服务注册,代码如下
```
using Consul;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Options;
using Snai.GrpcService.Hosting.Consul.Entity;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Snai.GrpcService.Hosting.Consul
{
public static class AppRregister
{
// 服务注册
public static IApplicationBuilder RegisterConsul(this IApplicationBuilder app, IApplicationLifetime lifetime, IOptions<HealthService> healthService, IOptions<ConsulService> consulService)
{
var consulClient = new ConsulClient(x => x.Address = new Uri($"http://{consulService.Value.IP}:{consulService.Value.Port}"));//请求注册的 Consul 地址
var httpCheck = new AgentServiceCheck()
{
DeregisterCriticalServiceAfter = TimeSpan.FromSeconds(5),//服务启动多久后注册
Interval = TimeSpan.FromSeconds(10),//健康检查时间间隔,或者称为心跳间隔
HTTP = $"http://{healthService.Value.IP}:{healthService.Value.Port}/health",//健康检查地址
Timeout = TimeSpan.FromSeconds(5)
};
// Register service with consul
var registration = new AgentServiceRegistration()
{
Checks = new[] { httpCheck },
ID = healthService.Value.Name + "_" + healthService.Value.Port,
Name = healthService.Value.Name,
Address = healthService.Value.IP,
Port = healthService.Value.Port,
Tags = new[] { $"urlprefix-/{healthService.Value.Name}" }//添加 urlprefix-/servicename 格式的 tag 标签,以便 Fabio 识别
};
consulClient.Agent.ServiceRegister(registration).Wait();//服务启动时注册,内部实现其实就是使用 Consul API 进行注册(HttpClient发起)
lifetime.ApplicationStopping.Register(() =>
{
consulClient.Agent.ServiceDeregister(registration.ID).Wait();//服务停止时取消注册
});
return app;
}
}
}
```
修改 Startup.cs 代码
加入 Startup(IConfiguration configuration) 构造函数,实现配置注入,如果建的是Web Api或MVC网站,默认是有的
修改 ConfigureServices(IServiceCollection services) 方法,注册全局配置
修改 Configure() 方法,添加健康检查路由地址 app.Map("/health", HealthMap),调用 RegisterConsul 扩展方法实现服务注册
添加 HealthMap(IApplicationBuilder app) 实现health路由。由于只有一个健康检查地址,所以没有建Web Api网站,只建了个空网站
代码如下,注册配置GrpcService 、 注册Rpc服务、启动Rpc服务 后面用到等下讲
```
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Snai.GrpcService.Hosting.Consul;
using Snai.GrpcService.Hosting.Consul.Entity;
using Snai.GrpcService.Impl;
namespace Snai.GrpcService.Hosting
{
public class Startup
{
public Startup(IConfiguration configuration)
{
Configuration = configuration;
}
public IConfiguration Configuration { get; }
// This method gets called by the runtime. Use this method to add services to the container.
// For more information on how to configure your application, visit https://go.microsoft.com/fwlink/?LinkID=398940
public void ConfigureServices(IServiceCollection services)
{
//注册全局配置
services.AddOptions();
services.Configure<Impl.Entity.GrpcService>(Configuration.GetSection(nameof(Impl.Entity.GrpcService)));
services.Configure<HealthService>(Configuration.GetSection(nameof(HealthService)));
services.Configure<ConsulService>(Configuration.GetSection(nameof(ConsulService)));
//注册Rpc服务
services.AddSingleton<IRpcConfig, RpcConfig>();
}
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
public void Configure(IApplicationBuilder app, IHostingEnvironment env, IApplicationLifetime lifetime, IOptions<HealthService> healthService, IOptions<ConsulService> consulService, IRpcConfig rpc)
{
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
}
// 添加健康检查路由地址
app.Map("/health", HealthMap);
// 服务注册
app.RegisterConsul(lifetime, healthService, consulService);
// 启动Rpc服务
rpc.Start();
}
private static void HealthMap(IApplicationBuilder app)
{
app.Run(async context =>
{
await context.Response.WriteAsync("OK");
});
}
}
}
```
修改 Program.cs 代码,调置网站地址为 .UseUrls("http://localhost:5021"),代码如下
```
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
namespace Snai.GrpcService.Hosting
{
public class Program
{
public static void Main(string[] args)
{
BuildWebHost(args).Run();
}
public static IWebHost BuildWebHost(string[] args) =>
WebHost.CreateDefaultBuilder(args)
.UseUrls("http://localhost:5021")
.UseStartup<Startup>()
.Build();
}
}
```
到此 Consul Api 服务注册 已完成,最终项目结构如下:
![Image](/sitedata/image/dotnet_2_3.png)
2、协议编写,将协议生成C#代码
由于在上一篇[.net core grpc 实现通信(一)](http://www.snaill.net/post/1)有过介绍,这里就简单说下
新建 Snai.GrpcService.Protocol协议类库项目,在 依赖项 右击 管理NuGet程序包 浏览 找到 Grpc.Core 版本1.11.0,Google.Protobuf 版本3.5.1 包下载安装
在根目录下新建msg.proto 文件,编写基于proto3语言的协议代码,用于生成各语言协议,msg.proto 代码如下
```
syntax = "proto3";
package Snai.GrpcService.Protocol;
service MsgService{
rpc GetSum(GetMsgNumRequest) returns (GetMsgSumReply){}
}
message GetMsgNumRequest {
int32 Num1 = 1;
int32 Num2 = 2;
}
message GetMsgSumReply {
int32 Sum = 1;
}
```
新建.net framework 项目类库,引用安装 Grpc.Tools、Google.Protobuf.Tools 组件程序包,分别得到 grpc_csharp_plugin.exe、protoc.exe 工具
到package目录下,找到与系统相应的grpc_csharp_plugin.exe、protoc.exe工具,拷到 Snai.GrpcService.Protocol 项目下
在Snai.GrpcService.Protocol根目录下新建 ProtocGenerate.cmd 文件,在其中输入以下指令
```
protoc -I . --csharp_out . --grpc_out . --plugin=protoc-gen-grpc=grpc_csharp_plugin.exe msg.proto
```
然后直接双击运行,项目下生成了“Msg.cs”和“MsgGrpc.cs”两个文件,这样协议部分的所有工作就完成了,最终项目结构如下:
![Image](/sitedata/image/dotnet_2_4.png)
3、编写协议实现代码
新建 Snai.GrpcService.Impl 实现类库项目,在 依赖项 下载安装Grpc.Core 包,项目引用 Snai.GrpcService.Protocol
新建 Entity 目录,在Entity目录下新建 GrpcService.cs 类,对应 Snai.GrpcService.Hosting 项目下 appsettings.json 配置文件的 GrpcService 配置项,代码如下
```
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Snai.GrpcService.Impl.Entity
{
public class GrpcService
{
public string IP { get; set; }
public int Port { get; set; }
}
}
```
在根目录下新建 RpcService 目录,在 RpcService 目录下新建 MsgServiceImpl.cs 类,继承 MsgService.MsgServiceBase 协议类,实现服务方法,代码如下
```
using Grpc.Core;
using Snai.GrpcService.Protocol;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Snai.GrpcService.Impl.RpcService
{
public class MsgServiceImpl : MsgService.MsgServiceBase
{
public override async Task<GetMsgSumReply> GetSum(GetMsgNumRequest request, ServerCallContext context)
{
var result = new GetMsgSumReply();
result.Sum = request.Num1 + request.Num2;
Console.WriteLine(request.Num1 + "+" + request.Num2 + "=" + result.Sum);
return result;
}
}
}
```
在根目录下新建IRpcConfig.cs接口,定义 Start() 用于Rpc启动基方法,代码如下
```
using System;
using System.Collections.Generic;
using System.Text;
namespace Snai.GrpcService.Impl
{
public interface IRpcConfig
{
void Start();
}
}
```
在根目录下新建 RpcConfig.cs 类,用于实现 IRpcConfig.cs 接口,启动Rpc服务,代码如下
```
using Grpc.Core;
using Microsoft.Extensions.Options;
using Snai.GrpcService.Impl.RpcService;
using Snai.GrpcService.Protocol;
using System;
using System.Collections.Generic;
using System.Text;
namespace Snai.GrpcService.Impl
{
public class RpcConfig: IRpcConfig
{
private static Server _server;
static IOptions<Entity.GrpcService> GrpcSettings;
public RpcConfig(IOptions<Entity.GrpcService> grpcSettings)
{
GrpcSettings = grpcSettings;
}
public void Start()
{
_server = new Server
{
Services = { MsgService.BindService(new MsgServiceImpl()) },
Ports = { new ServerPort(GrpcSettings.Value.IP, GrpcSettings.Value.Port, ServerCredentials.Insecure) }
};
_server.Start();
Console.WriteLine($"Grpc ServerListening On Port {GrpcSettings.Value.Port}");
}
}
}
```
在回到Snai.GrpcService.Hosting项目中,在 Startup.cs 中 ConfigureServices 中注册 GrpcService 配置、注册Rpc服务,在 Configure 中 启动Rpc服务 就是上面说到的,如图
![Image](/sitedata/image/dotnet_2_5.png)
最终项目结构如下:
![Image](/sitedata/image/dotnet_2_6.png)
到此服务端的代码实现已完成,下面我们启动Consul和服务端,验证 Api 注册和Grpc启动。
**二、Consul和服务端启动**
启动Consul,启动Grpc服务、注册服务到Consul
1、启动Consul
首先下载Consul:[https://www.consul.io/downloads.html](https://www.consul.io/downloads.html) ,本项目是windows下进行测试,得到consul.exe
![Image](/sitedata/image/dotnet_2_7.png)
由于本次用Api注册,用Consul默认自带UI,所以conf和dist可删除
清除Consul/data 内容,新建startup.bat文件,输入下面代码,双击启动Consul,本项目测试时一台机器,所以把 本机IP 改成 127.0.0.1
```
consul agent -server -datacenter=grpc-consul -bootstrap -data-dir ./data -ui -node=grpc-consul1 -bind 本机IP -client=0.0.0.0
```
再在Consul目录下启动另一个cmd命令行窗口,输入命令:consul operator raft list-peers 查看状态查看状态,结果如下
![Image](/sitedata/image/dotnet_2_8.png)
打开Consul UI:http://localhost:8500 查看情况
![Image](/sitedata/image/dotnet_2_9.png)
Consul 启动成功。
在 [.net core Ocelot Consul 实现API网关 服务注册 服务发现 负载均衡](http://www.snaill.net/post/5) 中后面 Consul 部分,有 Consul 集群搭建等其他介绍,可以去参考看下。
2、启动服务端,启动Grpc服务、注册服务到Consul
由于客户端要实现负载,所以把 Snai.GrpcService.Hosting 项目生成两次,启动两个一样的服务端,只是端口不同
服务5021 地址为5021: .UseUrls("http://localhost:5021"),GrpcService:5031,如下图
![Image](/sitedata/image/dotnet_2_10.png)
![Image](/sitedata/image/dotnet_2_11.png)
服务5022 修改地址为5022: .UseUrls("http://localhost:5022"),GrpcService:5032,如下图
![Image](/sitedata/image/dotnet_2_12.png)
![Image](/sitedata/image/dotnet_2_13.png)
启动 服务5021和服务5022两个服务端,如下面
![Image](/sitedata/image/dotnet_2_14.png)
![Image](/sitedata/image/dotnet_2_15.png)
![Image](/sitedata/image/dotnet_2_16.png)
看到 Grpc ServerListening On Port 5031,Grpc ServerListening On Port 5032 说明 Grpc 服务端启动成功
看到 Request starting HTTP/1.1 GET http://localhost:5021/health 说明 Consul 健康检查成功
打开Consul服务查看地址 http://localhost:8500/ui/#/grpc-consul/services/GrpcService 查看,两个GrpcService注册成功,健康检查状态正常
![Image](/sitedata/image/dotnet_2_17.png)
**三、客户端**
客户端主要包括Grpc客户端,Consul Api服务发现、负载均衡等。
新建Snai.GrpcClient 控制台程序,在 依赖项 下载安装Grpc.Core 包,项目引用Snai.GrpcService.Protocol,在依赖项下载安装下面工具组件包
用于读取 json配置:Microsoft.Extensions.Configuration,Microsoft.Extensions.Configuration.Json
用于依赖注入:Microsoft.Extensions.DependencyInjection
用于注入全局配置:Microsoft.Extensions.Options,Microsoft.Extensions.Options.ConfigurationExtensions
在项目根目录下新建 Utils 目录,在 Utils 目录下新建 HttpHelper.cs 类,用于程序内发送http请求,代码如下
```
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;
/*
* 参考 pudefu https://www.cnblogs.com/pudefu/p/7581956.html ,在此表示感谢
*/
namespace Snai.GrpcClient.Utils
{
public class HttpHelper
{
/// <summary>
/// 同步GET请求
/// </summary>
/// <param name="url"></param>
/// <param name="headers"></param>
/// <param name="timeout">请求响应超时时间,单位/s(默认100秒)</param>
/// <returns></returns>
public static string HttpGet(string url, Dictionary<string, string> headers = null, int timeout = 0)
{
using (HttpClient client = new HttpClient())
{
if (headers != null)
{
foreach (KeyValuePair<string, string> header in headers)
{
client.DefaultRequestHeaders.Add(header.Key, header.Value);
}
}
if (timeout > 0)
{
client.Timeout = new TimeSpan(0, 0, timeout);
}
Byte[] resultBytes = client.GetByteArrayAsync(url).Result;
return Encoding.UTF8.GetString(resultBytes);
}
}
/// <summary>
/// 异步GET请求
/// </summary>
/// <param name="url"></param>
/// <param name="headers"></param>
/// <param name="timeout">请求响应超时时间,单位/s(默认100秒)</param>
/// <returns></returns>
public static async Task<string> HttpGetAsync(string url, Dictionary<string, string> headers = null, int timeout = 0)
{
using (HttpClient client = new HttpClient())
{
if (headers != null)
{
foreach (KeyValuePair<string, string> header in headers)
{
client.DefaultRequestHeaders.Add(header.Key, header.Value);
}
}
if (timeout > 0)
{
client.Timeout = new TimeSpan(0, 0, timeout);
}
Byte[] resultBytes = await client.GetByteArrayAsync(url);
return Encoding.Default.GetString(resultBytes);
}
}
/// <summary>
/// 同步POST请求
/// </summary>
/// <param name="url"></param>
/// <param name="postData"></param>
/// <param name="headers"></param>
/// <param name="contentType"></param>
/// <param name="timeout">请求响应超时时间,单位/s(默认100秒)</param>
/// <param name="encoding">默认UTF8</param>
/// <returns></returns>
public static string HttpPost(string url, string postData, Dictionary<string, string> headers = null, string contentType = null, int timeout = 0, Encoding encoding = null)
{
using (HttpClient client = new HttpClient())
{
if (headers != null)
{
foreach (KeyValuePair<string, string> header in headers)
{
client.DefaultRequestHeaders.Add(header.Key, header.Value);
}
}
if (timeout > 0)
{
client.Timeout = new TimeSpan(0, 0, timeout);
}
using (HttpContent content = new StringContent(postData ?? "", encoding ?? Encoding.UTF8))
{
if (contentType != null)
{
content.Headers.ContentType = new System.Net.Http.Headers.MediaTypeHeaderValue(contentType);
}
using (HttpResponseMessage responseMessage = client.PostAsync(url, content).Result)
{
Byte[] resultBytes = responseMessage.Content.ReadAsByteArrayAsync().Result;
return Encoding.UTF8.GetString(resultBytes);
}
}
}
}
/// <summary>
/// 异步POST请求
/// </summary>
/// <param name="url"></param>
/// <param name="postData"></param>
/// <param name="headers"></param>
/// <param name="contentType"></param>
/// <param name="timeout">请求响应超时时间,单位/s(默认100秒)</param>
/// <param name="encoding">默认UTF8</param>
/// <returns></returns>
public static async Task<string> HttpPostAsync(string url, string postData, Dictionary<string, string> headers = null, string contentType = null, int timeout = 0, Encoding encoding = null)
{
using (HttpClient client = new HttpClient())
{
if (headers != null)
{
foreach (KeyValuePair<string, string> header in headers)
{
client.DefaultRequestHeaders.Add(header.Key, header.Value);
}
}
if (timeout > 0)
{
client.Timeout = new TimeSpan(0, 0, timeout);
}
using (HttpContent content = new StringContent(postData ?? "", encoding ?? Encoding.UTF8))
{
if (contentType != null)
{
content.Headers.ContentType = new System.Net.Http.Headers.MediaTypeHeaderValue(contentType);
}
using (HttpResponseMessage responseMessage = await client.PostAsync(url, content))
{
Byte[] resultBytes = await responseMessage.Content.ReadAsByteArrayAsync();
return Encoding.UTF8.GetString(resultBytes);
}
}
}
}
}
}
```
在项目根目录下新建 Consul 目录,在 Consul 目录下新建 Entity 目录,在 Entity 目录下新建 HealthCheck.cs 类,用于接收 Consul Api发现的信息实体,代码如下
```
using System;
using System.Collections.Generic;
using System.Text;
namespace Snai.GrpcClient.Consul.Entity
{
public class HealthCheck
{
public string Node { get; set; }
public string CheckID { get; set; }
public string Name { get; set; }
public string Status { get; set; }
public string Notes { get; set; }
public string Output { get; set; }
public string ServiceID { get; set; }
public string ServiceName { get; set; }
public string[] ServiceTags { get; set; }
public dynamic Definition { get; set; }
public int CreateIndex { get; set; }
public int ModifyIndex { get; set; }
}
}
```
在 Consul 目录下新建 IAppFind.cs 接口,定义 FindConsul() 用于 Consul 服务发现基方法,代码如下
```
using System;
using System.Collections.Generic;
using System.Text;
namespace Snai.GrpcClient.Consul
{
public interface IAppFind
{
IEnumerable<string> FindConsul(string ServiceName);
}
}
```
在 Consul 目录下新建 AppFind.cs 类,用于实现 IAppFind.cs 接口,实现 Consul 服务发现方法,代码如下
```
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using Snai.GrpcClient.Consul.Entity;
using Snai.GrpcClient.Framework.Entity;
using Snai.GrpcClient.Utils;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace Snai.GrpcClient.Consul
{
/*
* 服务发现
* (服务和健康信息)http://localhost:8500/v1/health/service/GrpcService
* (健康信息)http://localhost:8500/v1/health/checks/GrpcService
*/
public class AppFind: IAppFind
{
static IOptions<GrpcServiceSettings> GrpcSettings;
static IOptions<ConsulService> ConsulSettings;
public AppFind(IOptions<GrpcServiceSettings> grpcSettings, IOptions<ConsulService> consulSettings)
{
GrpcSettings = grpcSettings;
ConsulSettings = consulSettings;
}
public IEnumerable<string> FindConsul(string ServiceName)
{
Dictionary<string, string> headers = new Dictionary<string, string>();
var consul = ConsulSettings.Value;
string findUrl = $"http://{consul.IP}:{consul.Port}/v1/health/checks/{ServiceName}";
string findResult = HttpHelper.HttpGet(findUrl, headers, 5);
if (findResult.Equals(""))
{
var grpcServices = GrpcSettings.Value.GrpcServices;
return grpcServices.Where(g=>g.ServiceName.Equals(ServiceName,StringComparison.CurrentCultureIgnoreCase)).Select(s => s.ServiceID);
}
var findCheck = JsonConvert.DeserializeObject<List<HealthCheck>>(findResult);
return findCheck.Where(g => g.Status.Equals("passing", StringComparison.CurrentCultureIgnoreCase)).Select(g => g.ServiceID);
}
}
}
```
在项目根目录下新建 LoadBalance 目录,在 LoadBalance 目录下新建 ILoadBalance.cs 接口,定义 GetGrpcService() 用于负载均衡基方法,代码如下
```
using Snai.GrpcClient.Framework.Entity;
using System;
using System.Collections.Generic;
using System.Text;
namespace Snai.GrpcClient.LoadBalance
{
/*
* 负载均衡接口
*/
public interface ILoadBalance
{
string GetGrpcService(string ServiceName);
}
}
```
在 LoadBalance 目录下新建 WeightRoundBalance.cs 类,用于实现 ILoadBalance.cs 接口,实现 GetGrpcService() 负载均衡方法,本次负载均衡实现权重轮询算法,代码如下
```
using Snai.GrpcClient.Consul;
using Snai.GrpcClient.Utils;
using System;
using System.Collections.Generic;
using System.Text;
using System.Linq;
using Snai.GrpcClient.Framework.Entity;
using Microsoft.Extensions.Options;
namespace Snai.GrpcClient.LoadBalance
{
/*
* 权重轮询
*/
public class WeightRoundBalance : ILoadBalance
{
int Balance;
IOptions<GrpcServiceSettings> GrpcSettings;
IAppFind AppFind;
public WeightRoundBalance(IOptions<GrpcServiceSettings> grpcSettings, IAppFind appFind)
{
Balance = 0;
GrpcSettings = grpcSettings;
AppFind = appFind;
}
public string GetGrpcService(string ServiceName)
{
var grpcServices = GrpcSettings.Value.GrpcServices;
var healthServiceID = AppFind.FindConsul(ServiceName);
if (grpcServices == null || grpcServices.Count() == 0 || healthServiceID == null || healthServiceID.Count() == 0)
{
return "";
}
//健康的服务
var healthServices = new List<Framework.Entity.GrpcService>();
foreach (var service in grpcServices)
{
foreach (var health in healthServiceID)
{
if (service.ServiceID.Equals(health, StringComparison.CurrentCultureIgnoreCase))
{
healthServices.Add(service);
break;
}
}
}
if (healthServices == null || healthServices.Count() == 0)
{
return "";
}
//加权轮询
var services = new List<string>();
foreach (var service in healthServices)
{
services.AddRange(Enumerable.Repeat(service.IP + ":" + service.Port, service.Weight));
}
var servicesArray = services.ToArray();
Balance = Balance % servicesArray.Length;
var grpcUrl = servicesArray[Balance];
Balance = Balance + 1;
return grpcUrl;
}
}
}
```
在项目根目录下新建 RpcClient 目录,在 RpcClient 目录下新建 IMsgClient.cs 接口,定义 GetSum() 用于Grpc客户端调用基方法,代码如下
```
using System;
using System.Collections.Generic;
using System.Text;
namespace Snai.GrpcClient.RpcClient
{
public interface IMsgClient
{
void GetSum(int num1, int num2);
}
}
```
在 RpcClient 目录下新建 MsgClient.cs 类,用于实现 IMsgClient.cs 接口,实现 GetSum() 方法用于Grpc客户端调用,代码如下
```
using Grpc.Core;
using Microsoft.Extensions.DependencyInjection;
using Snai.GrpcClient.LoadBalance;
using Snai.GrpcService.Protocol;
using System;
using System.Collections.Generic;
using System.Text;
namespace Snai.GrpcClient.RpcClient
{
public class MsgClient: IMsgClient
{
ILoadBalance LoadBalance;
Channel GrpcChannel;
MsgService.MsgServiceClient GrpcClient;
public MsgClient(ILoadBalance loadBalance)
{
LoadBalance = loadBalance;
var grpcUrl = LoadBalance.GetGrpcService("GrpcService");
if (!grpcUrl.Equals(""))
{
Console.WriteLine($"Grpc Service:{grpcUrl}");
GrpcChannel = new Channel(grpcUrl, ChannelCredentials.Insecure);
GrpcClient = new MsgService.MsgServiceClient(GrpcChannel);
}
}
public void GetSum(int num1, int num2)
{
if (GrpcClient != null)
{
GetMsgSumReply msgSum = GrpcClient.GetSum(new GetMsgNumRequest
{
Num1 = num1,
Num2 = num2
});
Console.WriteLine("Grpc Client Call GetSum():" + msgSum.Sum);
}
else
{
Console.WriteLine("所有负载都挂掉了!");
}
}
}
}
```
在项目根目录下新建 Framework 目录,在 Framework 目录下新建 Entity 目录,在 Entity 目录下新建 ConsulService.cs 和 GrpcServiceSettings.cs 类,分别对应配置appsettings.json的 ConsulService,GrpcServiceSettings 两个配置项,代码如下
ConsulService.cs
```
using System;
using System.Collections.Generic;
using System.Text;
namespace Snai.GrpcClient.Framework.Entity
{
public class ConsulService
{
public string IP { get; set; }
public int Port { get; set; }
}
}
```
GrpcServiceSettings.cs
```
using System;
using System.Collections.Generic;
using System.Text;
namespace Snai.GrpcClient.Framework.Entity
{
public class GrpcServiceSettings
{
public List<GrpcService> GrpcServices { get; set; }
}
public class GrpcService
{
public string ServiceName { get; set; }
public string ServiceID { get; set; }
public string IP { get; set; }
public int Port { get; set; }
public int Weight { get; set; }
}
}
```
在 Framework 目录下新建 DependencyInitialize.cs 类,定义 AddImplement() 方法用于注册全局配置和类到容器,实现依赖注入,代码如下
```
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Snai.GrpcClient.Consul;
using Snai.GrpcClient.Framework.Entity;
using Snai.GrpcClient.LoadBalance;
using Snai.GrpcClient.RpcClient;
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
namespace Snai.GrpcClient.Framework
{
/*
* IServiceCollection 依赖注入生命周期
* AddTransient 每次都是全新的
* AddScoped 在一个范围之内只有同一个实例(同一个线程,同一个浏览器请求只有一个实例)
* AddSingleton 单例
*/
public static class DependencyInitialize
{
/// <summary>
/// 注册对象
/// </summary>
/// <param name="services">The services.</param>
/*
* IAppFind AppFind;
* 构造函数注入使用 IAppFind appFind
* AppFind = appFind;
*/
public static void AddImplement(this IServiceCollection services)
{
//添加 json 文件路径
var builder = new ConfigurationBuilder().SetBasePath(Directory.GetCurrentDirectory()).AddJsonFile("appsettings.json");
//创建配置根对象
var configurationRoot = builder.Build();
//注册全局配置
services.AddConfigImplement(configurationRoot);
//注册服务发现
services.AddScoped<IAppFind, AppFind>();
//注册负载均衡
if (configurationRoot["LoadBalancer"].Equals("WeightRound", StringComparison.CurrentCultureIgnoreCase))
{
services.AddSingleton<ILoadBalance, WeightRoundBalance>();
}
//注册Rpc客户端
services.AddTransient<IMsgClient, MsgClient>();
}
/// <summary>
/// 注册全局配置
/// </summary>
/// <param name="services">The services.</param>
/// <param name="configurationRoot">The configurationRoot.</param>
/*
* IOptions<GrpcServiceSettings> GrpcSettings;
* 构造函数注入使用 IOptions<GrpcServiceSettings> grpcSettings
* GrpcSettings = grpcSettings;
*/
public static void AddConfigImplement(this IServiceCollection services, IConfigurationRoot configurationRoot)
{
//注册配置对象
services.AddOptions();
services.Configure<GrpcServiceSettings>(configurationRoot.GetSection(nameof(GrpcServiceSettings)));
services.Configure<ConsulService>(configurationRoot.GetSection(nameof(ConsulService)));
}
}
}
```
在根目录下新建 appsettings.json 配置文件,配置 GrpcServiceSettings 的 GrpcServices 为服务端发布的两个服务5021和5022,LoadBalancer 负载均衡为 WeightRound 权重轮询(如实现其他负载方法可做相应配置,注册负载均衡时也做相应修改),ConsulService Consul的IP和端口,代码如下
```
{
"GrpcServiceSettings": {
"GrpcServices": [
{
"ServiceName": "GrpcService",
"ServiceID": "GrpcService_5021",
"IP": "localhost",
"Port": "5031",
"Weight": "2"
},
{
"ServiceName": "GrpcService",
"ServiceID": "GrpcService_5022",
"IP": "localhost",
"Port": "5032",
"Weight": "1"
}
]
},
"LoadBalancer": "WeightRound",
"ConsulService": {
"IP": "localhost",
"Port": "8500"
}
}
```
GrpcServices Grpc服务列表
ServiceName:服务名称,负载同一服务名称相同
ServiceID:服务ID,保持唯一
IP:服务IP
Port:端口
Weight:服务权重
修改 Program.cs 的 Main() 方法,调用 AddImplement(),注册全局配置和类到容器,注入使用 MsgClient 类的 GetSum() 方法,实现 Grpc 调用,代码如下
```
using Microsoft.Extensions.DependencyInjection;
using Snai.GrpcClient.Framework;
using Snai.GrpcClient.RpcClient;
using System;
namespace Snai.GrpcClient
{
class Program
{
static void Main(string[] args)
{
IServiceCollection service = new ServiceCollection();
//注册对象
service.AddImplement();
//注入使用对象
var provider = service.BuildServiceProvider();
string exeArg = string.Empty;
Console.WriteLine("Grpc调用!");
Console.WriteLine("-c\t调用Grpc服务;");
Console.WriteLine("-q\t退出服务;");
while (true)
{
exeArg = Console.ReadKey().KeyChar.ToString();
Console.WriteLine();
if (exeArg.ToLower().Equals("c", StringComparison.CurrentCultureIgnoreCase))
{
//调用服务
var rpcClient = provider.GetService<IMsgClient>();
rpcClient.GetSum(10, 2);
}
else if (exeArg.ToLower().Equals("q", StringComparison.CurrentCultureIgnoreCase))
{
break;
}
else
{
Console.WriteLine("参数异常!");
}
}
}
}
}
```
右击项目生成,最终项目结构如下:
![Image](/sitedata/image/dotnet_2_18.png)
到此客户端的代码实现已完成,下面运行测试 Grpc+Consul 服务注册、服务发现和负载均衡。
**四、运行测试 Grpc+Consul 服务注册、服务发现和负载均衡**
双击 startup.bat 启动 Consul,再启动服务5021和5022,启动成功打开 http://localhost:8500/ui/#/grpc-consul/services/GrpcService 查看服务情况
![Image](/sitedata/image/dotnet_2_19.png)
![Image](/sitedata/image/dotnet_2_20.png)
![Image](/sitedata/image/dotnet_2_21.png)
启动 Snai.GrpcClient 客户端
![Image](/sitedata/image/dotnet_2_22.png)
输入 c 调用Grpc服务,调用3次,5031调用2次,5032调用1次,成功实现负载均衡
![Image](/sitedata/image/dotnet_2_23.png)
关掉服务5022,等10秒左右(因为设置健康检查时间间隔10秒),再输入 c 调用Grpc服务,只调用5031
![Image](/sitedata/image/dotnet_2_24.png)
打开 http://localhost:8500/ui/#/grpc-consul/services/GrpcService 查看,5022 状态失败,或消失
![Image](/sitedata/image/dotnet_2_25.png)
Grpc+Consul实现服务注册、服务发现、健康检查和负载均衡已完成
Github源码地址:[https://github.com/Liu-Alan/Grpc-Consul](https://github.com/Liu-Alan/Grpc-Consul)