gRPC 基本使用

标签:微服务首次发布:2023-11-14最近修改:2026-05-24

gRPC 基本示例

  1. 消息定义:
text
syntax = "proto3";option go_package = "./student;student";package student;message StudentRequest {  string id = 1;}message StudentResponse {  string name = 1;  int32 age = 2;}service StudentService {  rpc GetStudent(StudentRequest) returns (StudentResponse) {}}
  1. 使用 protoc 编译器进行编译。编译成功后,在 student 目录下面会多出来两个文件(student.pb.go 和 student_grpc.pb.go)。编译命令如下:
text
# 生成消息类型protoc --go_out=./ student.proto# 生成服务接口和服务注册函数protoc --go-grpc_out=./ student.proto
  1. 服务端代码:
go
type server struct {	pb.UnimplementedStudentServiceServer}// GetStudent 接口实现func (s *server) GetStudent(ctx context.Context, in *pb.StudentRequest) (*pb.StudentResponse, error) {	ctx, cancel := context.WithTimeout(context.Background(), time.Second)	defer cancel()	if in.Id == "1" {		return &pb.StudentResponse{Name: "张三", Age: 20}, nil	} else {		return &pb.StudentResponse{Name: "李四", Age: 21}, nil	}}func main() {	// 监听端口	lis, err := net.Listen("tcp", ":1234")	if err != nil {		log.Fatalf("failed to listen: %v", err)	}	// 创建一个新的gRPC服务器实例	s := grpc.NewServer()	// 注册学生服务到gRPC服务器	pb.RegisterStudentServiceServer(s, &server{})	// 监听gRPC请求	if err := s.Serve(lis); err != nil {		log.Fatalf("failed to serve: %v", err)	}}
  1. 客户端代码:
go
func main() {	// 与本地主机上的gRPC服务建立连接	conn, err := grpc.Dial("localhost:1234", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())	if err != nil {		log.Fatalf("did not connect: %v", err)	}	defer func(conn *grpc.ClientConn) {		_ = conn.Close()	}(conn)	// 使用已经建立好的连接(conn)创建一个新的StudentServiceClient实例	client := pb.NewStudentServiceClient(conn)	// 超时控制	ctx, cancel := context.WithTimeout(context.Background(), time.Second)	defer cancel()	// 调用服务	r, err := client.GetStudent(ctx, &pb.StudentRequest{Id: "2"})	if err != nil {		log.Fatalf("could not get student: %v", err)	}	fmt.Printf("Name: %s Age: %d", r.GetName(), r.GetAge())}
  1. 运行结果:
text
Name: 李四 Age: 21

gRPC 安全传输

生成自签证书

  1. 安装 OpenSSL

    中文文档:OpenSSL 中文手册

    下载地址:Win32/Win64 OpenSSL Installer for Windows。下载到 d 盘解压、安装和配置环境变量,这样 OpenSSL 就安装完成了。

  2. 在我们的项目下面新建一个目录:certificate 用来存放私钥和公钥证书。

  3. 生成私钥文件(ca.key):

    使用不同的类型的密钥可以生成不同类型的私钥文件,常见的密钥种类有 RSA、ECC(椭圆曲线加密)和 DSA(数字签名算法)。以 RSA 为例:

    text
    # 使用RSA密钥生成ca.key私钥openssl genpkey -algorithm RSA -out ca.key

    如果需要一个密码去保护密钥,可以使用 -aes-256-cbc(或其他加密算法)参数。

    text
    openssl genpkey -algorithm RSA -out ca.key -aes-256-cbc# 输入完成后会出现下面这样,要求你输入密码来保护你的密钥Enter PEM pass phrase:Verifying - Enter PEM pass phrase:
  4. 生成证书签名请求 (CSR)

    使用私钥生成一个证书签名请求 (CSR)。在生成 CSR 时,系统会提示你输入一些信息,例如国家代码、组织名称等。这些信息用于证书的 “主题” 字段。

    bash
    D:\GolandProjects\RPC\gRPC\certificate>openssl req -new -key ca.key -out ca.csrEnter pass phrase for ca.key: //这里输入前面那个保护密钥的密码You are about to be asked to enter information that will be incorporatedinto your certificate request.What you are about to enter is what is called a Distinguished Name or a DN.There are quite a few fields but you can leave some blankFor some fields there will be a default value,If you enter '.', the field will be left blank.-----Country Name (2 letter code) [AU]:CN //(表示中国)State or Province Name (full name) [Some-State]:JiangXi // (表示江西省)Locality Name (eg, city) []:NanChang //(表示南昌市)Organization Name (eg, company) [Internet Widgits Pty Ltd]:school //(填一个组织名)Organizational Unit Name (eg, section) []:software //(填一个部门名称)Common Name (e.g. server FQDN or YOUR name) []:abc.com //(域名)Email Address []:abc@qq.com //(邮箱)Please enter the following 'extra' attributesto be sent with your certificate requestA challenge password []: //(可选)An optional company name []: //(可选)

    这样就生成了一个 ca.csr 证书签名请求。

  5. 生成自签名证书

    可以使用 CSR 生成一个自签名的证书,有效期为例如 365 天:

    text
    openssl x509 -req -days 365 -in ca.csr -signkey ca.key -out ca.crt

    使用私钥文件 ca.key 和证书签名请求 ca.csr 就可以生成公钥文件 ca.crt。以上就完成了证书的创建。

SAN 使用

SAN 是 “主题备用名称”(Subject Alternative Name)的缩写,它是 SSL/TLS 证书的一个扩展,允许在单个证书上指定多个域名和 IP 地址。

SAN 不仅限于绑定多个域名,它还可以包括以下类型的实体标识:

  • DNS 名称:可以是完全限定的域名(FQDN)或子域名。
  • IP 地址:允许证书直接绑定到 IP 地址。
  • 电子邮件地址:可以用于证书为电子邮件客户端或服务器提供身份验证。
  • URI:统一资源标识符(URI)也可以包含在 SAN 中。
  • 目录名称:包含特定的目录名称条目。
  • 其他名称:可以包括其他类型的标识符,如 UPN(用户主体名称)。

在创建具有 SAN 的证书时,需要在创建证书签名请求(CSR)时指定这些值。在 OpenSSL 中,通常通过在证书的配置文件中添加 SAN 条目来完成;也可以在命令行上使用 -addext 选项直接添加。下面是操作步骤:

  1. 生成密钥:

    text
    openssl genpkey -algorithm RSA -out server.key
  2. 编辑 openssl.cnf 文件:在 OpenSSL 的安装目录下有一个 bin 目录,在 bin\cnf 目录下面有一个 openssl.cnf 文件,将这个文件拷贝到项目的 certificate 目录下面,并对 openssl.cnf 文件做如下修改:

    • 去掉 copy_extensions = copy 这个配置前面的注释符号

    • 去掉 req_extensions = v3_req 这个配置前面的注释符号

    • 找到[ v3_req ]模块,在该模块下添加如下内容:subjectAltName = @alt_names

    • 添加[ alt_names ]模块,并在模块中添加如下内容:

      text
      DNS.1 = abc.comDNS.2 = xyz.com
  3. 生成证书签名请求。使用命令:

    text
    openssl req -new -nodes -key server.key -out server.csr -config ./openssl.cnf -extensions v3_req

    这个命令的解释如下:

    • req: 这是 OpenSSL 的子命令,用于处理 CSR 相关的任务。
    • -new: 表示要创建一个新的 CSR。
    • -nodes: 这个选项是“no DES”的缩写,它指示 OpenSSL 不要对生成的私钥进行密码保护。如果省略此选项,则在生成私钥时会要求输入一个密码。
    • -key server.key: 指定私钥文件的路径和文件名。这个私钥会被用来生成 CSR。如果指定的私钥文件不存在,OpenSSL 会提示你创建一个新的私钥。
    • -out server.csr: 指定生成的 CSR 的文件名和路径。这个文件包含了证书的签名请求,可以提交给 CA 进行签名。
    • -config ./openssl.cnf: 指定 OpenSSL 配置文件的路径。这个文件包含了 CSR 生成所需的配置信息。
    • -extensions v3_req: 指定在生成的 CSR 中应用哪个部分的扩展。在这里,v3_req 是配置文件中定义的一个部分,它通常包含了 SAN 信息以及其他证书 v3 扩展。
  4. 生成自签名证书

    由于我们前面已经生成过 ca.key 和 ca.crt,那么可以利用这两个证书来生成 SAN 自签名证书。命令:

    text
    openssl x509 -req -days 365 -in server.csr -out server.pem -CA ca.crt -CAkey ca.key -CAcreateserial -extfile ./openssl.cnf -extensions v3_req

    这个命令的解释如下:

    • x509: 这是 OpenSSL 的一个工具,用于显示和处理 X.509 证书。
    • -req: 表示命令将读取一个 CSR 文件。
    • -days 365: 指定证书将有效的天数。在这里,设置为 365 天,即一年。
    • -in server.csr: 指定输入文件,即您的证书签名请求文件。
    • -out server.pem: 指定证书输出文件。
    • -CA ca.crt: 指定 CA 的证书。这是签发新证书的信任根。
    • -CAkey ca.key: 这里应该是一个错误,因为它应该是 CA 的私钥,而不是服务器的私钥。你应该有一个专门的 CA 私钥文件。
    • -CAcreateserial: 如果这是 CA 第一次签名证书,这个选项将创建一个 CA 序列号文件。如果序列号文件已存在,它会被用来为新证书生成序列号。
    • -extfile ./openssl.cnf: 指定包含证书扩展的配置文件。
    • -extensions v3_req: 指出要使用配置文件中哪个部分的扩展。

经过以上步骤,我们就成功创建了关于服务端的验证证书。

服务端与客户端应用证书

原来使用的是客户端和服务器之间的通信是无认证的模式[ grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock() ],现在让客户端去验证服务端。服务端的修改如下:

text
##### 原来服务端的代码 #####s := grpc.NewServer()##### 修改后的服务端代码 #####// 从文件加载服务端证书和密钥cred, err := credentials.NewServerTLSFromFile("../certificate/server.pem", "../certificate/server.key")if err != nil {	log.Fatalf("Failed to setup TLS: %v", err)}// 创建一个新的gRPC服务器实例s := grpc.NewServer(grpc.Creds(cred))

客户端修改如下:

go
#### 原来的客户端代码 #####conn, err := grpc.Dial("localhost:1234", grpc.WithTransportCredentials(insecure.NewCredentials()),grpc.WithBlock())if err != nil {	log.Fatalf("did not connect: %v", err)}##### 修改后的客户端代码,由于前面配置了多个DNS解析,所以也可以将下面的 xyz.com 换成 abc.com #####cred, err := credentials.NewClientTLSFromFile("../certificate/server.pem", "xyz.com")if err != nil {	log.Fatalf("Failed to create TLS credentials %v", err)}// 与本地主机上的gRPC服务建立连接conn, err := grpc.Dial("localhost:1234", grpc.WithTransportCredentials(cred))if err != nil {	log.Fatalf("did not connect: %v", err)}

运行结果:

text
Name: 李四 Age: 21

单向认证

image-20231113170637819

  1. 客户端向服务端发送 SSL 协议版本号、加密算法种类、随机数等信息。

  2. 服务端给客户端返回 SSL 协议版本号、加密算法种类、随机数等信息,同时也返回服务器端的证书,即公钥证书

  3. 客户端使用服务端返回的信息验证服务器的合法性,包括:

    • 证书是否过期

    • 发型服务器证书的 CA 是否可靠

    • 返回的公钥是否能正确解开返回证书中的数字签名

    • 服务器证书上的域名是否和服务器的实际域名相匹配

    验证通过后,将继续进行通信,否则,终止通信

  4. 客户端向服务端发送自己所能支持的对称加密方案,供服务器端进行选择

  5. 服务器端在客户端提供的加密方案中选择加密程度最高的加密方式。

  6. 服务器将选择好的加密方案通过明文方式返回给客户端

  7. 客户端接收到服务端返回的加密方式后,使用该加密方式生成产生随机码,用作通信过程中对称加密的密钥,使用服务端返回的公钥进行加密,将加密后的随机码发送至服务器

  8. 服务器收到客户端返回的加密信息后,使用自己的私钥进行解密,获取对称加密密钥。 在接下来的会话中,服务器和客户端将会使用该密码进行对称加密,保证通信过程中信息的安全。

在上面的 gRPC 通信过程中,就是使用了单向认证。在客户端拿到服务端的公钥后,对公钥进行验证,验证通过后就直接使用服务端的公钥对数据进行加密,然后将加密后的消息发送给服务端。

双向认证

双向认证和单向认证原理基本差不多,只是除了客户端需要认证服务端以外,增加了服务端对客户端的认证,具体过程如下:

image-20231113172900053

  1. 客户端向服务端发送 SSL 协议版本号、加密算法种类、随机数等信息。

  2. 服务端给客户端返回 SSL 协议版本号、加密算法种类、随机数等信息,同时也返回服务器端的证书,即公钥证书

  3. 客户端使用服务端返回的信息验证服务器的合法性,包括:

    • 证书是否过期

    • 发型服务器证书的 CA 是否可靠

    • 返回的公钥是否能正确解开返回证书中的数字签名

    • 服务器证书上的域名是否和服务器的实际域名相匹配

    验证通过后,将继续进行通信,否则,终止通信

  4. 服务端要求客户端发送客户端的证书,客户端会将自己的证书发送至服务端

  5. 验证客户端的证书,通过验证后,会获得客户端的公钥

  6. 客户端向服务端发送自己所能支持的对称加密方案,供服务器端进行选择

  7. 服务器端在客户端提供的加密方案中选择加密程度最高的加密方式

  8. 将加密方案通过使用之前获取到的公钥进行加密,返回给客户端

  9. 客户端收到服务端返回的加密方案密文后,使用自己的私钥进行解密,获取具体加密方式,而后,产生该加密方式的随机码,用作加密过程中的密钥,使用之前从服务端证书中获取到的公钥进行加密后,发送给服务端

  10. 服务端收到客户端发送的消息后,使用自己的私钥进行解密,获取对称加密的密钥,在接下来的会话中,服务器和客户端将会使用该密码进行对称加密,保证通信过程中信息的安全。

下面使用 gRPC 的双向认证实现客户端和服务端之间的通信:

  1. 首先就是生成客户端的私钥和公钥,生成方式和上面服务端生成私钥和公钥的方式几乎一样,这里不在赘述。

  2. 修改客户端和服务端的代码,主要就是为了增加客户端认证这么一个过程,所以首先客户端需要将自己的公钥发送给服务端,服务端则需要验证客户端(通过 ca.crt 证书)

    客户端要修改的代码如下:

    go
    // 加载CA证书caCert, err := os.ReadFile("../certificate/ca.crt")if err != nil {  log.Fatalf("could not read ca certificate: %s", err)}// 创建一个证书池,并添加CA证书caCertPool := x509.NewCertPool()caCertPool.AppendCertsFromPEM(caCert)// 加载客户端证书和密钥cert, err := tls.LoadX509KeyPair("../certificate/client.pem", "../certificate/client.key")if err != nil {  log.Fatalf("failed to load client key pair: %s", err)}// 创建一个TLS配置tlsConfig := &tls.Config{	ServerName:   "abc.com",               // 重要:这里必须匹配服务器证书中的名称	Certificates: []tls.Certificate{cert}, // 客户端证书	RootCAs:      caCertPool,              // CA证书池}

    服务端要修改的代码如下:

    go
    // 从文件加载服务端证书和密钥cert, err := tls.LoadX509KeyPair("../certificate/server.pem", "../certificate/server.key")if err != nil {  log.Fatalf("Failed to setup TLS: %v", err)}// 加载CA证书caCert, err := os.ReadFile("../certificate/ca.crt")if err != nil {  log.Fatalf("failed to read CA certificate: %s", err)}caCertPool := x509.NewCertPool()caCertPool.AppendCertsFromPEM(caCert)// 创建TLS配置,用于gRPC服务器tlsConfig := &tls.Config{  ClientAuth:   tls.RequireAndVerifyClientCert, // 启用双向认证  Certificates: []tls.Certificate{cert},        // 服务器证书  ClientCAs:    caCertPool,                     // CA证书池}// 创建gRPC服务器,并添加TLS配置s := grpc.NewServer(grpc.Creds(credentials.NewTLS(tlsConfig)))

    运行结果:

    text
    Name: 李四 Age: 21

最后在整理一下双向认证的思路:

  1. 服务端要有服务端的私钥(server.key)和服务端的公钥(server.pem);然后客户端要有客户端的私钥(client.key)和客户端的公钥(client.pem),并且它们公钥必需依靠第三方机构(ca)生成。
  2. 再说具体一点,以服务端为例,首先生成服务端的私钥(不需要任何其他证书),然后根据私钥生成证书签名请求(server.csr),最后使用 ca 的证书(ca.crt)、ca 的私钥(ca.key)、服务端的私钥(server.key)、服务端的证书签名请求(server.csr)这四个证书生成服务端的公钥。这样生成的公钥由于依托于第三方机构(ca)的认证,因此服务端的公钥无法伪造。客户端的公钥也是同理。
  3. 所以在双向认证中,服务端需要做的事情就是:加载本地的私钥(用来将客户端发来的关键信息解密)、加载本地的公钥(一个目的是为了进行身份验证,另一个目的是把即将发送给客户端的关键信息加密)、加载 ca 证书(这个 ca.crt 证书可以用来验证客户端传递过来的公钥是否是真的客户端公钥,而不是黑客伪造的客户端公钥)。这段话中有一个“关键信息”这个词语,这个关键信息指的是客户端和服务端进行数据通信使用的对称加密算法的密钥
  4. 所以,公钥/私钥的作用就是在 TLS 的握手过程中用于身份认证安全地交换对称密钥。而一旦会话密钥通过这种安全的方式交换成功后,服务端和客户端则会使用该密钥进行会话的加密数据,因为对称加密比非对称加密在处理速度上更加高效。

Token 认证

这个 token 认证就是做一个权限认证。在服务端和客户端成功建立连接后,服务端的某些服务可能需要特定的权限才能调用,那么就需要一个拦截器去对请求进行拦截,检查请求是否举要相应的权限。验证权限的方式有多种,可以是用户名密码、JWT、OAuth 等。这里写一个简单的使用用户名密码进行认证的 demo。

客户端需要实现 PerRPCCredentials 接口,这个接口的作用就是将认证信息写入到 gRPC 的上下文中,并设置是否使用安全传输。

go
type PerRPCCredentials interface {    GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error)    RequireTransportSecurity() bool}

所以,具体实现如下:

go
type Authentication struct {    User     string    Password string}func (a *Authentication) GetRequestMetadata(context.Context, ...string) (map[string]string, error) {	return map[string]string{"user": a.User, "password": a.Password}, nil}func (a *Authentication) RequireTransportSecurity() bool {	return true}

然后赋值即可:

text
user := &Authentication{    User:     "admin",    Password: "admin",}// 与本地主机上的gRPC服务建立连接conn, _ := grpc.Dial("localhost:1234",grpc.WithPerRPCCredentials(user))

服务端代码如下:

Auth 函数是一个身份验证函数,它的目的是从 gRPC 的上下文(ctx)中提取元数据(metadata),这些元数据包含了客户端传递的用户名和密码信息。该函数检查元数据是否存在,然后验证用户名和密码是否匹配。如果凭据不匹配,函数将返回一个带有错误信息的 status.Error,表明用户未通过认证。

text
func Auth(ctx context.Context) error {	md, ok := metadata.FromIncomingContext(ctx)	if !ok {		return fmt.Errorf("missing credentials")	}	var user string	var password string	if val, ok := md["user"]; ok {		user = val[0]	}	if val, ok := md["password"]; ok {		password = val[0]	}	if user != "admin" || password != "admin" {		return status.Errorf(codes.Unauthenticated, "用户名或密码错误")	}	return nil}

authInterceptor 是一个一元服务器拦截器(grpc.UnaryServerInterceptor),它在 gRPC 服务器处理请求之前拦截这些请求。它的工作流程如下:

  1. 在处理实际的 RPC 之前,拦截器调用 Auth 函数来验证客户端提供的认证信息。
  2. 如果 Auth 函数返回错误,则拦截器会中断处理过程,并将错误返回给客户端,请求不会到达实际的 RPC 方法。
  3. 如果凭据验证成功(Auth 函数返回 nil),则 handler 将被调用以继续处理 RPC 请求。

在下面的示例中,拦截器是通过grpc.NewServergrpc.UnaryInterceptor选项注册到 gRPC 服务器的。这意味着对于所有一元 RPC 调用,gRPC 服务器都会执行这个拦截器逻辑,增加了一个身份验证层。

text
var authInterceptor grpc.UnaryServerInterceptor//匿名方法authInterceptor = func(	ctx context.Context,	req interface{},	info *grpc.UnaryServerInfo,	handler grpc.UnaryHandler,) (resp interface{}, err error) {	//拦截普通方法请求,验证 Token	err = Auth(ctx)	if err != nil {		return	}// 继续处理请求return handler(ctx, req)}// 创建gRPC服务器,并添加TLS配置s := grpc.NewServer(grpc.UnaryInterceptor(authInterceptor))

stream

在 gRPC 中,流(stream)是一种允许客户端或服务器一边发送一边接受一系列消息的通信方式。在传统的 RPC 调用中,客户端发送一个请求到服务器,然后等待响应。而在流式 RPC 中,客户端和或服务器可以通过流发送多个消息,并且可以独立地响应。

stream 为什么要存在呢?传统的 RPC 有什么问题吗?通过模拟业务场景,可得知在使用传统的 RPC 时,有如下问题:

  • 数据包过大造成的瞬时压力
  • 接收数据包时,需要所有数据包都接受成功且正确后,才能够回调响应,进行业务处理(无法做到客户端边发送,服务端边处理)

所以对于大规模数据包对于实时性要求高的业务场景就非常适合用流式 RPC。

使用关键字 stream,就可以声明其为一个流方法。共有三种流模式:客户端流、服务端流和双向流。

客户端流

  1. student.proto 文件核心内容:

    protobuf
    message StudentRequest {  string id = 1;}message StudentResponse {  string msg = 1;}service StudentService {  rpc GetStudent(stream StudentRequest) returns (StudentResponse) {}}

    使用protoc --go_out=. --go-grpc_out=. student.proto命令生成 go 文件。

  2. 服务端核心代码:

    go
    type server struct {	pb.UnimplementedStudentServiceServer}func (s *server) GetStudent(stream pb.StudentService_GetStudentServer) error {	for {		req, err := stream.Recv()		if err == io.EOF {			// 当接收到客户端的结束信号时,发送一个响应			return stream.SendAndClose(&pb.StudentResponse{				Msg: "End of Stream",			})		}		if err != nil {			log.Fatalf("Failed to receive a note : %v", err)		}		log.Printf("接收到的学生ID为: %s", req.Id)	}	return nil}func main() {	lis, err := net.Listen("tcp", ":9999")	if err != nil {		log.Fatalf("failed to listen: %v", err)	}	s := grpc.NewServer()	pb.RegisterStudentServiceServer(s, &server{})	// 监听gRPC请求	if err := s.Serve(lis); err != nil {		log.Fatalf("failed to serve: %v", err)	}}
  3. 客户端核心代码:

    go
    func main() {	conn, err := grpc.Dial(":9999", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())	if err != nil {		log.Fatalf("did not connect: %v", err)	}	defer func(conn *grpc.ClientConn) {		_ = conn.Close()	}(conn)	c := pb.NewStudentServiceClient(conn)	stream, err := c.GetStudent(context.Background())	if err != nil {		log.Fatalf("Error on get students: %v", err)	}	// 模拟发送多个学生 ID 的客户端流	for i := 1; i <= 5; i++ {		id := fmt.Sprintf("%d", i)		log.Printf("发送的学生ID: %s", id)		if err := stream.Send(&pb.StudentRequest{Id: id}); err != nil {			log.Fatalf("Failed to send a note: %v", err)		}        time.Sleep(time.Second)	}	// 接收响应	res, err := stream.CloseAndRecv()	if err != nil {		log.Fatalf("Failed to receive a response : %v", err)	}	log.Printf("接收到的响应: %s", res.Msg)}

    在成功连接服务端后,开始使用 for 循环连续的发送 5 个请求(发送 5 个 id 给服务端),服务端处理完 5 个请求后发送响应给客户端,客户端打印响应的内容。如果打开多个终端,可以很清楚的看到客户端一遍发送消息,服务端一遍处理消息。

  4. 运行结果:

    image-20231114175052415

双向流

  1. student.proto 文件核心内容:

    protobuf
    message StudentRequest {  string id = 1;  string name = 2;}message StudentResponse {  string id = 1;  string name = 2;  string message = 3;}service StudentService {  rpc Chat(stream StudentRequest) returns (stream StudentResponse);}
  2. 服务端核心代码:

    go
    type server struct {	pb.UnimplementedStudentServiceServer}func (s *server) Chat(stream pb.StudentService_ChatServer) error {	for {		in, err := stream.Recv()		if err == io.EOF {			// 客户端流已关闭			return nil		}		if err != nil {			log.Fatalf("Failed to receive a note : %v", err)			return err		}		log.Printf("接收到的学生ID为: %s", in.GetId())		// 发送响应回客户端		if err := stream.Send(&pb.StudentResponse{Id: in.GetId(), Name: in.GetName(), Message: "已经收到了你的消息。"}); err != nil {			log.Fatalf("Failed to send a response: %v", err)			return err		}	}}func main() {	lis, err := net.Listen("tcp", ":9999")	if err != nil {		log.Fatalf("failed to listen: %v", err)	}	s := grpc.NewServer()	pb.RegisterStudentServiceServer(s, &server{})	// 监听gRPC请求	if err := s.Serve(lis); err != nil {		log.Fatalf("failed to serve: %v", err)	}}
  3. 客户端核心代码:

    go
    func receiveMessages(stream pb.StudentService_ChatClient, wg *sync.WaitGroup) {	defer wg.Done()	for {		in, err := stream.Recv()		if err == io.EOF {			// 服务端关闭了流			return		}		if err != nil {			log.Fatalf("Failed to receive a message : %v", err)		}		log.Printf("来自服务端的消息: %s", in.GetMessage())	}}func main() {	conn, err := grpc.Dial(":9999", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())	if err != nil {		log.Fatalf("did not connect: %v", err)	}	defer func(conn *grpc.ClientConn) {		_ = conn.Close()	}(conn)	c := pb.NewStudentServiceClient(conn)	// 开始双向流	stream, err := c.Chat(context.Background())	if err != nil {		log.Fatalf("Error creating stream: %v", err)	}	// 模拟发送多个学生 ID 的客户端流	for i := 1; i <= 5; i++ {		id := fmt.Sprintf("%d", i)		log.Printf("发送的学生ID: %s", id)		if err := stream.Send(&pb.StudentRequest{Id: id}); err != nil {			log.Fatalf("Failed to send a note: %v", err)		}		time.Sleep(time.Second)	}	// 关闭发送方向的流	if err := stream.CloseSend(); err != nil {		log.Fatal(err)	}	// 接收来自服务器的消息	var wg sync.WaitGroup	wg.Add(1)	go receiveMessages(stream, &wg)	// 等待接收消息的 goroutine 完成	wg.Wait()}

    客户端开启一个 goroutine 去接收服务端传递过来的消息。

  4. 运行结果:

image-20231114184621837

  • 在 gRPC 的双向流中,客户端和服务端都可以独立地接收和发送消息。客户端发送消息给服务端的流和服务端发送消息给客户端的流是并发的,可以同时进行。
  • 在客户端输出中,可以看到客户端按照预期的一秒间隔发送消息。然后,在客户端发送完所有消息之后,几乎在同一时间接收到了来自服务端的所有响应。这意味着服务端可能在接收到每个消息后几乎立即发送了响应,但由于客户端在发送过程中没有等待或处理这些响应,所有的响应都在发送完成后才被客户端处理和打印出来。
  • 服务器端可以在读取每个客户端发送的消息后立即写入响应,而客户端可以独立地读取这些响应,而不影响它发送消息的能力。