funcNewServer(opt ...ServerOption) *Server { //用option模式指定各个服务器选项 opts := defaultServerOptions for _, o := range extraServerOptions { o.apply(&opts) } for _, o := range opt { o.apply(&opts) }
// serviceInfo wraps information about a service. It is very similar to // ServiceDesc and is constructed from it for internal purposes. type serviceInfo struct { // Contains the implementation for the methods in this service. serviceImpl interface{} methods map[string]*MethodDesc streams map[string]*StreamDesc mdata interface{} }
// GreeterServer is the server API for Greeter service. // All implementations must embed UnimplementedGreeterServer // for forward compatibility type GreeterServer interface { // Sends a greeting SayHello(context.Context, *HelloRequest) (*HelloReply, error) mustEmbedUnimplementedGreeterServer() }
// RegisterService registers a service and its implementation to the gRPC // server. It is called from the IDL generated code. This must be called before // invoking Serve. If ss is non-nil (for legacy code), its type is checked to // ensure it implements sd.HandlerType. func(s *Server)RegisterService(sd*ServiceDesc, ss interface{}) { // 检查类型 if ss != nil { ht := reflect.TypeOf(sd.HandlerType).Elem() st := reflect.TypeOf(ss) if !st.Implements(ht) { logger.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht) } } s.register(sd, ss) }
func(s *Server)register(sd *ServiceDesc, ss interface{}) { // 加锁 s.mu.Lock() defer s.mu.Unlock() // 打印日志 s.printf("RegisterService(%q)", sd.ServiceName) // 检查异常 if s.serve { logger.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName) } if _, ok := s.services[sd.ServiceName]; ok { logger.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName) } // 将sd中的内容注入到serviceinfo中,并将ss类型保存为serviceImpl info := &serviceInfo{ serviceImpl: ss, methods: make(map[string]*MethodDesc), streams: make(map[string]*StreamDesc), mdata: sd.Metadata, } for i := range sd.Methods { d := &sd.Methods[i] info.methods[d.MethodName] = d } for i := range sd.Streams { d := &sd.Streams[i] info.streams[d.StreamName] = d } s.services[sd.ServiceName] = info }
server 对不同 rpc 请求的处理,也是根据 service 中不同的 serviceName 去 service map 中取出不同的 handler 进行处理,这样相当于完成了grpc的代理操作,把字符串传递给代理,代理就能调用对应的实际方法去处理。
// 返回错误信息 return err } tempDelay = 0 // Start a new goroutine to deal with rawConn so we don't stall this Accept // loop goroutine. // // Make sure we account for the goroutine so GracefulStop doesn't nil out // s.conns before this conn can be added. s.serveWG.Add(1) gofunc() { s.handleRawConn(lis.Addr().String(), rawConn) s.serveWG.Done() }() } }
// handleRawConn forks a goroutine to handle a just-accepted connection that // has not had any I/O performed on it yet. func(s *Server)handleRawConn(lisAddr string, rawConn net.Conn) { // 检查是否退出 if s.quit.HasFired() { rawConn.Close() return } // 设置一次IO操作的最大时间,如果超过直接失败 // 这里是用来限制连接时间的 rawConn.SetDeadline(time.Now().Add(s.opts. connectionTimeout))
// Finish handshaking (HTTP2) st := s.newHTTP2Transport(rawConn)
// 完成连接之后取消时长限制 rawConn.SetDeadline(time.Time{}) if st == nil { return }
// newHTTP2Transport sets up a http/2 transport (using the // gRPC http2 server transport in transport/http2_server.go). func(s *Server)newHTTP2Transport(c net.Conn)transport.ServerTransport { config := &transport.ServerConfig{ ... } st, err := transport.NewServerTransport(c, config) if err != nil { ... }