辉夜的博客

繁花似锦,辉夜如昼

0%

grpc-go源码分析(1)

这篇文章主要分析了grpc-go服务端的启动过程,重点考察了服务端的建立、注册、监听
等关键的生命周期对应的代码实现。
目前阶段主要考察普通的rpc调用,暂时没有研究流式传输。

首先来看服务端examples/helloworld/greeter_server/main.go的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func main() {
// 解析命令行参数,主要是port
flag.Parse()

// 新建一个对本地端口的监听
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
if err != nil {
log.Fatalf("failed to listen: %v", err)
}

// 新建grpc服务器
s := grpc.NewServer()

// 注册protobuf中的服务
pb.RegisterGreeterServer(s, &server{})
log.Printf("server listening at %v", lis.Addr())

// 启动监听
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}

可以看到, main函数中和server相关的操作主要有三步:
(1)创建 server
(2)server 的注册
(3)调用 Serve 监听端口并处理请求

Server的创建

这里调用的函数就是\server.go#NewServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
func NewServer(opt ...ServerOption) *Server {
//用option模式指定各个服务器选项
opts := defaultServerOptions
for _, o := range extraServerOptions {
o.apply(&opts)
}
for _, o := range opt {
o.apply(&opts)
}

//构建服务器
s := &Server{
lis: make(map[net.Listener]bool),
opts: opts,
conns: make(map[string]map[transport.ServerTransport]bool),
services: make(map[string]*serviceInfo),
quit: grpcsync.NewEvent(),
done: grpcsync.NewEvent(),
czData: new(channelzData),
}

// 配置拦截器
chainUnaryServerInterceptors(s)
chainStreamServerInterceptors(s)

// 同步相关(优雅退出时发送信号)
s.cv = sync.NewCond(&s.mu)

// 调试相关 打印调用信息
if EnableTracing {
_, file, line, _ := runtime.Caller(1)
s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
}

// 并发相关
if s.opts.numServerWorkers > 0 {
s.initServerWorkers()
}

// channelz(一个调试工具)相关
s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
channelz.Info(logger, s.channelzID, "Server created")

return s
}

一个Server结构体包括了一系列网络通讯和同步相关的内容,通常是使用了sync包中的功能或利用通道完成各种同步操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// Server is a gRPC server to serve RPC requests.
type Server struct {
opts serverOptions

mu sync.Mutex // guards following

lis map[net.Listener]bool

// 监听地址到 transports的映射
conns map[string]map[transport.ServerTransport]bool
// 是否在服务
serve bool
// ...
drain bool
// 优雅退出时进行广播
cv *sync.Cond
// 核心:服务名到服务信息的映射
services map[string]*serviceInfo
// 日志
events trace.EventLog

// 同步相关
quit *grpcsync.Event
done *grpcsync.Event
channelzRemoveOnce sync.Once
serveWG sync.WaitGroup // counts active Serve goroutines for GracefulStop

channelzID *channelz.Identifier
czData *channelzData

serverWorkerChannels []chan *serverWorkerData
}

用于同步的grpc.Event

对于Event类型的quit和done,这里简单的分析一下

1
2
3
4
5
6
// Event represents a one-time event that may occur in the future.
type Event struct {
fired int32
c chan struct{}
o sync.Once
}

Event可以被并发地多次触发。一旦被触发,e.c将被关闭,从而所有试图从e.c中接受一个值的协程将从阻塞中恢复,这起到了一对多通知的效果。o是为了防止e.c被多次关闭而引发panic。

核心部分:map[string]

而这一部分代码中的核心还是

1
services map[string]*serviceInfo

通过服务名,我们可以直接获取服务相关的信息,主要也是两个map,通过名称可以分别获取stream和method的描述(Desc)。每个描述都包含了name和handler

1
2
3
4
5
6
7
8
9
// serviceInfo wraps information about a service. It is very similar to
// ServiceDesc and is constructed from it for internal purposes.
type serviceInfo struct {
// Contains the implementation for the methods in this service.
serviceImpl interface{}
methods map[string]*MethodDesc
streams map[string]*StreamDesc
mdata interface{}
}

总体来说,在调用过程中Server的关键结构是这样的:

grpc-go源码分析

Server注册

main函数中的代码如下:

1
2
// 注册protobuf中的服务
pb.RegisterGreeterServer(s, &server{})

需要先考察一下server是一个什么样的结构

自定义的服务器实现:Server类型

examples/helloworld/greeter_server/main.go中:

1
2
3
4
5
6
7
8
9
10
// server is used to implement helloworld.GreeterServer.
type server struct {
pb.UnimplementedGreeterServer
}

// SayHello implements helloworld.GreeterServer
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
log.Printf("Received: %v", in.GetName())
return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil
}

它实际上就是对proto中定义的服务端的一个实现, 注意接口中要求的方法mustEmbedUnimplementedGreeterServer()是在protobuf的生成文件pb中的pb.UnimplementedGreeterServer中实现的,这是为了从语法上要求server的实现中必须包含pb.UnimplementedGreeterServer

1
2
3
4
5
6
7
8
// GreeterServer is the server API for Greeter service.
// All implementations must embed UnimplementedGreeterServer
// for forward compatibility
type GreeterServer interface {
// Sends a greeting
SayHello(context.Context, *HelloRequest) (*HelloReply, error)
mustEmbedUnimplementedGreeterServer()
}

至于为什么必须包含这个奇怪的结构体,可能是因为这个结构体中包含了一个默认的SayHello方法,这样即使我们忘了实现SayHello方法,也能让server实现GreeterServer接口。

1
2
3
4
5
func (UnimplementedGreeterServer) SayHello(context.Context, *HelloRequest) (*HelloReply, error) {
return nil, status.Errorf(codes.Unimplemented, "method SayHello not implemented")
}
func (UnimplementedGreeterServer) mustEmbedUnimplementedGreeterServer() {}

总而言之,Server就是一个我们自己实现的服务器。包含了我们在proto中声明的方法。

Register调用分析

main函数中直接调用的方法是:

1
2
// 注册protobuf中的服务
pb.RegisterGreeterServer(s, &server{})

server 的注册调用了 RegisterGreeterServer 方法,这个方法是examples/helloworld/helloworld/helloworld_grpc.pb.go中的:

1
2
3
func RegisterGreeterServer(s *grpc.Server, srv GreeterServer) {
s.RegisterService(&Greeter_serviceDesc, srv)
}

这个方法调用了 server 的 RegisterService 方法,然后传入了一个 ServiceDesc 的数据结构,如下 :

1
2
3
4
5
6
7
8
9
10
11
12
var Greeter_ServiceDesc = grpc.ServiceDesc{
ServiceName: "helloworld.Greeter",
HandlerType: (*GreeterServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "SayHello",
Handler: _Greeter_SayHello_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "examples/helloworld/helloworld/helloworld.proto",
}

可以看到,这个结构体的结构serviceInfo的结构是吻合的:
grpc

下面来看RegisterService函数的实现,核心的内容在就是检查完类型之后调用register将sd中的信息注入到seviceinfo结构体中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// RegisterService registers a service and its implementation to the gRPC
// server. It is called from the IDL generated code. This must be called before
// invoking Serve. If ss is non-nil (for legacy code), its type is checked to
// ensure it implements sd.HandlerType.
func (s *Server) RegisterService(sd*ServiceDesc, ss interface{}) {
// 检查类型
if ss != nil {
ht := reflect.TypeOf(sd.HandlerType).Elem()
st := reflect.TypeOf(ss)
if !st.Implements(ht) {
logger.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
}
}
s.register(sd, ss)
}

func (s *Server) register(sd *ServiceDesc, ss interface{}) {
// 加锁
s.mu.Lock()
defer s.mu.Unlock()
// 打印日志
s.printf("RegisterService(%q)", sd.ServiceName)
// 检查异常
if s.serve {
logger.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
}
if _, ok := s.services[sd.ServiceName]; ok {
logger.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
}

// 将sd中的内容注入到serviceinfo中,并将ss类型保存为serviceImpl
info := &serviceInfo{
serviceImpl: ss,
methods: make(map[string]*MethodDesc),
streams: make(map[string]*StreamDesc),
mdata: sd.Metadata,
}
for i := range sd.Methods {
d := &sd.Methods[i]
info.methods[d.MethodName] = d
}
for i := range sd.Streams {
d := &sd.Streams[i]
info.streams[d.StreamName] = d
}
s.services[sd.ServiceName] = info
}

server 对不同 rpc 请求的处理,也是根据 service 中不同的 serviceName 去 service map 中取出不同的 handler 进行处理,这样相当于完成了grpc的代理操作,把字符串传递给代理,代理就能调用对应的实际方法去处理。

Sever服务过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
func (s *Server) Serve(lis net.Listener) error {
//加锁
s.mu.Lock()
//打印日志
s.printf("serving")
//更新状态
s.serve = true

//检查是否已经关闭
if s.lis == nil {
// Serve called after Stop or GracefulStop.
s.mu.Unlock()
lis.Close()
return ErrServerStopped
}

// 并发相关
s.serveWG.Add(1)
defer func() {
s.serveWG.Done()
if s.quit.HasFired() {
// Stop or GracefulStop called; block until done and return nil.
<-s.done.Done()
}
}()

/* 注册端口监听
type listenSocket struct {
net.Listener
channelzID *channelz.Identifier
}
*/
ls := &listenSocket{Listener: lis}
s.lis[ls] = true

//(在退出时)注销端口监听
defer func() {
s.mu.Lock()
if s.lis != nil && s.lis[ls] {
ls.Close()
delete(s.lis, ls)
}
s.mu.Unlock()
}()

//channelz相关
var err error
ls.channelzID, err = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
if err != nil {
s.mu.Unlock()
return err
}

//解锁,并发操作完成
s.mu.Unlock()
channelz.Info(logger, ls.channelzID, "ListenSocket created")

var tempDelay time.Duration // how long to sleep on accept failure

// 死循环,用accept监听
for {
rawConn, err := lis.Accept()
//错误检查
//https://openskill.cn/article/1792
if err != nil {
//判断是否是临时错误
if ne, ok := err.(interface {
Temporary() bool
});
/*类型断言,利用短逻辑避免调用不存在的方法*/
ok && ne.Temporary() {
// 试图恢复,等待一段时间
if tempDelay == 0 {
tempDelay = 5 * time.Millisecond
} else {
tempDelay *= 2
}
if max := 1 * time.Second; tempDelay > max {
tempDelay = max
}
// 打印日志
s.mu.Lock()
s.printf("Accept error: %v; retrying in %v", err, tempDelay)
s.mu.Unlock()

// 等待,如果此时服务退出就不再等待直接返回
timer := time.NewTimer(tempDelay)
select {
case <-timer.C:
case <-s.quit.Done():
timer.Stop()
return nil
}
continue
}

//等待多次之后仍有错误,打印日志
s.mu.Lock()
s.printf("done serving; Accept = %v", err)
s.mu.Unlock()

// 检查是否退出
if s.quit.HasFired() {
return nil
}

// 返回错误信息
return err
}
tempDelay = 0
// Start a new goroutine to deal with rawConn so we don't stall this Accept
// loop goroutine.
//
// Make sure we account for the goroutine so GracefulStop doesn't nil out
// s.conns before this conn can be added.
s.serveWG.Add(1)
go func() {
s.handleRawConn(lis.Addr().String(), rawConn)
s.serveWG.Done()
}()
}
}

最终这个函数会把端口地址和通过Accept得到的连接传递给handle函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// handleRawConn forks a goroutine to handle a just-accepted connection that
// has not had any I/O performed on it yet.
func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {
// 检查是否退出
if s.quit.HasFired() {
rawConn.Close()
return
}
// 设置一次IO操作的最大时间,如果超过直接失败
// 这里是用来限制连接时间的
rawConn.SetDeadline(time.Now().Add(s.opts. connectionTimeout))

// Finish handshaking (HTTP2)
st := s.newHTTP2Transport(rawConn)

// 完成连接之后取消时长限制
rawConn.SetDeadline(time.Time{})
if st == nil {
return
}

if !s.addConn(lisAddr, st) {
return
}
go func() {
s.serveStreams(st)
s.removeConn(lisAddr, st)
}()
}

Http2握手

handle函数处理连接的第一步就是完成HTTP2的握手

1
2
3
4
5
6
7
8
9
10
11
12
13
// newHTTP2Transport sets up a http/2 transport (using the
// gRPC http2 server transport in transport/http2_server.go).
func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport {
config := &transport.ServerConfig{
...
}
st, err := transport.NewServerTransport(c, config)
if err != nil {
...
}

return st
}

核心代码如下:internal/transport/http2_server.go