/* * * Copyright 2014 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ package grpc import ( "bytes" "errors" "fmt" "io" "math" "net" "net/http" "reflect" "runtime" "strings" "sync" "time" "golang.org/x/net/context" "golang.org/x/net/http2" "golang.org/x/net/trace" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/internal" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/metadata" "google.golang.org/grpc/stats" "google.golang.org/grpc/status" "google.golang.org/grpc/tap" "google.golang.org/grpc/transport" ) const ( defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4 defaultServerMaxSendMessageSize = math.MaxInt32 ) type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error) // MethodDesc represents an RPC service's method specification. type MethodDesc struct { MethodName string Handler methodHandler } // ServiceDesc represents an RPC service's specification. type ServiceDesc struct { ServiceName string // The pointer to the service interface. Used to check whether the user // provided implementation satisfies the interface requirements. HandlerType interface{} Methods []MethodDesc Streams []StreamDesc Metadata interface{} } // service consists of the information of the server serving this service and // the methods in this service. type service struct { server interface{} // the server for service methods md map[string]*MethodDesc sd map[string]*StreamDesc mdata interface{} } // Server is a gRPC server to serve RPC requests. type Server struct { opts options mu sync.Mutex // guards following lis map[net.Listener]bool conns map[io.Closer]bool serve bool drain bool ctx context.Context cancel context.CancelFunc // A CondVar to let GracefulStop() blocks until all the pending RPCs are finished // and all the transport goes away. cv *sync.Cond m map[string]*service // service name -> service info events trace.EventLog } type options struct { creds credentials.TransportCredentials codec Codec cp Compressor dc Decompressor unaryInt UnaryServerInterceptor streamInt StreamServerInterceptor inTapHandle tap.ServerInHandle statsHandler stats.Handler maxConcurrentStreams uint32 maxReceiveMessageSize int maxSendMessageSize int useHandlerImpl bool // use http.Handler-based server unknownStreamDesc *StreamDesc keepaliveParams keepalive.ServerParameters keepalivePolicy keepalive.EnforcementPolicy initialWindowSize int32 initialConnWindowSize int32 } var defaultServerOptions = options{ maxReceiveMessageSize: defaultServerMaxReceiveMessageSize, maxSendMessageSize: defaultServerMaxSendMessageSize, } // A ServerOption sets options such as credentials, codec and keepalive parameters, etc. type ServerOption func(*options) // InitialWindowSize returns a ServerOption that sets window size for stream. // The lower bound for window size is 64K and any value smaller than that will be ignored. func InitialWindowSize(s int32) ServerOption { return func(o *options) { o.initialWindowSize = s } } // InitialConnWindowSize returns a ServerOption that sets window size for a connection. // The lower bound for window size is 64K and any value smaller than that will be ignored. func InitialConnWindowSize(s int32) ServerOption { return func(o *options) { o.initialConnWindowSize = s } } // KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server. func KeepaliveParams(kp keepalive.ServerParameters) ServerOption { return func(o *options) { o.keepaliveParams = kp } } // KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server. func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption { return func(o *options) { o.keepalivePolicy = kep } } // CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling. func CustomCodec(codec Codec) ServerOption { return func(o *options) { o.codec = codec } } // RPCCompressor returns a ServerOption that sets a compressor for outbound messages. func RPCCompressor(cp Compressor) ServerOption { return func(o *options) { o.cp = cp } } // RPCDecompressor returns a ServerOption that sets a decompressor for inbound messages. func RPCDecompressor(dc Decompressor) ServerOption { return func(o *options) { o.dc = dc } } // MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive. // If this is not set, gRPC uses the default limit. Deprecated: use MaxRecvMsgSize instead. func MaxMsgSize(m int) ServerOption { return MaxRecvMsgSize(m) } // MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive. // If this is not set, gRPC uses the default 4MB. func MaxRecvMsgSize(m int) ServerOption { return func(o *options) { o.maxReceiveMessageSize = m } } // MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send. // If this is not set, gRPC uses the default 4MB. func MaxSendMsgSize(m int) ServerOption { return func(o *options) { o.maxSendMessageSize = m } } // MaxConcurrentStreams returns a ServerOption that will apply a limit on the number // of concurrent streams to each ServerTransport. func MaxConcurrentStreams(n uint32) ServerOption { return func(o *options) { o.maxConcurrentStreams = n } } // Creds returns a ServerOption that sets credentials for server connections. func Creds(c credentials.TransportCredentials) ServerOption { return func(o *options) { o.creds = c } } // UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the // server. Only one unary interceptor can be installed. The construction of multiple // interceptors (e.g., chaining) can be implemented at the caller. func UnaryInterceptor(i UnaryServerInterceptor) ServerOption { return func(o *options) { if o.unaryInt != nil { panic("The unary server interceptor was already set and may not be reset.") } o.unaryInt = i } } // StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the // server. Only one stream interceptor can be installed. func StreamInterceptor(i StreamServerInterceptor) ServerOption { return func(o *options) { if o.streamInt != nil { panic("The stream server interceptor was already set and may not be reset.") } o.streamInt = i } } // InTapHandle returns a ServerOption that sets the tap handle for all the server // transport to be created. Only one can be installed. func InTapHandle(h tap.ServerInHandle) ServerOption { return func(o *options) { if o.inTapHandle != nil { panic("The tap handle was already set and may not be reset.") } o.inTapHandle = h } } // StatsHandler returns a ServerOption that sets the stats handler for the server. func StatsHandler(h stats.Handler) ServerOption { return func(o *options) { o.statsHandler = h } } // UnknownServiceHandler returns a ServerOption that allows for adding a custom // unknown service handler. The provided method is a bidi-streaming RPC service // handler that will be invoked instead of returning the "unimplemented" gRPC // error whenever a request is received for an unregistered service or method. // The handling function has full access to the Context of the request and the // stream, and the invocation passes through interceptors. func UnknownServiceHandler(streamHandler StreamHandler) ServerOption { return func(o *options) { o.unknownStreamDesc = &StreamDesc{ StreamName: "unknown_service_handler", Handler: streamHandler, // We need to assume that the users of the streamHandler will want to use both. ClientStreams: true, ServerStreams: true, } } } // NewServer creates a gRPC server which has no service registered and has not // started to accept requests yet. func NewServer(opt ...ServerOption) *Server { opts := defaultServerOptions for _, o := range opt { o(&opts) } if opts.codec == nil { // Set the default codec. opts.codec = protoCodec{} } s := &Server{ lis: make(map[net.Listener]bool), opts: opts, conns: make(map[io.Closer]bool), m: make(map[string]*service), } s.cv = sync.NewCond(&s.mu) s.ctx, s.cancel = context.WithCancel(context.Background()) if EnableTracing { _, file, line, _ := runtime.Caller(1) s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line)) } return s } // printf records an event in s's event log, unless s has been stopped. // REQUIRES s.mu is held. func (s *Server) printf(format string, a ...interface{}) { if s.events != nil { s.events.Printf(format, a...) } } // errorf records an error in s's event log, unless s has been stopped. // REQUIRES s.mu is held. func (s *Server) errorf(format string, a ...interface{}) { if s.events != nil { s.events.Errorf(format, a...) } } // RegisterService registers a service and its implementation to the gRPC // server. It is called from the IDL generated code. This must be called before // invoking Serve. func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) { ht := reflect.TypeOf(sd.HandlerType).Elem() st := reflect.TypeOf(ss) if !st.Implements(ht) { grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht) } s.register(sd, ss) } func (s *Server) register(sd *ServiceDesc, ss interface{}) { s.mu.Lock() defer s.mu.Unlock() s.printf("RegisterService(%q)", sd.ServiceName) if s.serve { grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName) } if _, ok := s.m[sd.ServiceName]; ok { grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName) } srv := &service{ server: ss, md: make(map[string]*MethodDesc), sd: make(map[string]*StreamDesc), mdata: sd.Metadata, } for i := range sd.Methods { d := &sd.Methods[i] srv.md[d.MethodName] = d } for i := range sd.Streams { d := &sd.Streams[i] srv.sd[d.StreamName] = d } s.m[sd.ServiceName] = srv } // MethodInfo contains the information of an RPC including its method name and type. type MethodInfo struct { // Name is the method name only, without the service name or package name. Name string // IsClientStream indicates whether the RPC is a client streaming RPC. IsClientStream bool // IsServerStream indicates whether the RPC is a server streaming RPC. IsServerStream bool } // ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service. type ServiceInfo struct { Methods []MethodInfo // Metadata is the metadata specified in ServiceDesc when registering service. Metadata interface{} } // GetServiceInfo returns a map from service names to ServiceInfo. // Service names include the package names, in the form of .. func (s *Server) GetServiceInfo() map[string]ServiceInfo { ret := make(map[string]ServiceInfo) for n, srv := range s.m { methods := make([]MethodInfo, 0, len(srv.md)+len(srv.sd)) for m := range srv.md { methods = append(methods, MethodInfo{ Name: m, IsClientStream: false, IsServerStream: false, }) } for m, d := range srv.sd { methods = append(methods, MethodInfo{ Name: m, IsClientStream: d.ClientStreams, IsServerStream: d.ServerStreams, }) } ret[n] = ServiceInfo{ Methods: methods, Metadata: srv.mdata, } } return ret } var ( // ErrServerStopped indicates that the operation is now illegal because of // the server being stopped. ErrServerStopped = errors.New("grpc: the server has been stopped") ) func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) { if s.opts.creds == nil { return rawConn, nil, nil } return s.opts.creds.ServerHandshake(rawConn) } // Serve accepts incoming connections on the listener lis, creating a new // ServerTransport and service goroutine for each. The service goroutines // read gRPC requests and then call the registered handlers to reply to them. // Serve returns when lis.Accept fails with fatal errors. lis will be closed when // this method returns. // Serve always returns non-nil error. func (s *Server) Serve(lis net.Listener) error { s.mu.Lock() s.printf("serving") s.serve = true if s.lis == nil { s.mu.Unlock() lis.Close() return ErrServerStopped } s.lis[lis] = true s.mu.Unlock() defer func() { s.mu.Lock() if s.lis != nil && s.lis[lis] { lis.Close() delete(s.lis, lis) } s.mu.Unlock() }() var tempDelay time.Duration // how long to sleep on accept failure for { rawConn, err := lis.Accept() if err != nil { if ne, ok := err.(interface { Temporary() bool }); ok && ne.Temporary() { if tempDelay == 0 { tempDelay = 5 * time.Millisecond } else { tempDelay *= 2 } if max := 1 * time.Second; tempDelay > max { tempDelay = max } s.mu.Lock() s.printf("Accept error: %v; retrying in %v", err, tempDelay) s.mu.Unlock() timer := time.NewTimer(tempDelay) select { case <-timer.C: case <-s.ctx.Done(): } timer.Stop() continue } s.mu.Lock() s.printf("done serving; Accept = %v", err) s.mu.Unlock() return err } tempDelay = 0 // Start a new goroutine to deal with rawConn // so we don't stall this Accept loop goroutine. go s.handleRawConn(rawConn) } } // handleRawConn is run in its own goroutine and handles a just-accepted // connection that has not had any I/O performed on it yet. func (s *Server) handleRawConn(rawConn net.Conn) { conn, authInfo, err := s.useTransportAuthenticator(rawConn) if err != nil { s.mu.Lock() s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err) s.mu.Unlock() grpclog.Warningf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err) // If serverHandShake returns ErrConnDispatched, keep rawConn open. if err != credentials.ErrConnDispatched { rawConn.Close() } return } s.mu.Lock() if s.conns == nil { s.mu.Unlock() conn.Close() return } s.mu.Unlock() if s.opts.useHandlerImpl { s.serveUsingHandler(conn) } else { s.serveHTTP2Transport(conn, authInfo) } } // serveHTTP2Transport sets up a http/2 transport (using the // gRPC http2 server transport in transport/http2_server.go) and // serves streams on it. // This is run in its own goroutine (it does network I/O in // transport.NewServerTransport). func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) { config := &transport.ServerConfig{ MaxStreams: s.opts.maxConcurrentStreams, AuthInfo: authInfo, InTapHandle: s.opts.inTapHandle, StatsHandler: s.opts.statsHandler, KeepaliveParams: s.opts.keepaliveParams, KeepalivePolicy: s.opts.keepalivePolicy, InitialWindowSize: s.opts.initialWindowSize, InitialConnWindowSize: s.opts.initialConnWindowSize, } st, err := transport.NewServerTransport("http2", c, config) if err != nil { s.mu.Lock() s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err) s.mu.Unlock() c.Close() grpclog.Warningln("grpc: Server.Serve failed to create ServerTransport: ", err) return } if !s.addConn(st) { st.Close() return } s.serveStreams(st) } func (s *Server) serveStreams(st transport.ServerTransport) { defer s.removeConn(st) defer st.Close() var wg sync.WaitGroup st.HandleStreams(func(stream *transport.Stream) { wg.Add(1) go func() { defer wg.Done() s.handleStream(st, stream, s.traceInfo(st, stream)) }() }, func(ctx context.Context, method string) context.Context { if !EnableTracing { return ctx } tr := trace.New("grpc.Recv."+methodFamily(method), method) return trace.NewContext(ctx, tr) }) wg.Wait() } var _ http.Handler = (*Server)(nil) // serveUsingHandler is called from handleRawConn when s is configured // to handle requests via the http.Handler interface. It sets up a // net/http.Server to handle the just-accepted conn. The http.Server // is configured to route all incoming requests (all HTTP/2 streams) // to ServeHTTP, which creates a new ServerTransport for each stream. // serveUsingHandler blocks until conn closes. // // This codepath is only used when Server.TestingUseHandlerImpl has // been configured. This lets the end2end tests exercise the ServeHTTP // method as one of the environment types. // // conn is the *tls.Conn that's already been authenticated. func (s *Server) serveUsingHandler(conn net.Conn) { if !s.addConn(conn) { conn.Close() return } defer s.removeConn(conn) h2s := &http2.Server{ MaxConcurrentStreams: s.opts.maxConcurrentStreams, } h2s.ServeConn(conn, &http2.ServeConnOpts{ Handler: s, }) } // ServeHTTP implements the Go standard library's http.Handler // interface by responding to the gRPC request r, by looking up // the requested gRPC method in the gRPC server s. // // The provided HTTP request must have arrived on an HTTP/2 // connection. When using the Go standard library's server, // practically this means that the Request must also have arrived // over TLS. // // To share one port (such as 443 for https) between gRPC and an // existing http.Handler, use a root http.Handler such as: // // if r.ProtoMajor == 2 && strings.HasPrefix( // r.Header.Get("Content-Type"), "application/grpc") { // grpcServer.ServeHTTP(w, r) // } else { // yourMux.ServeHTTP(w, r) // } // // Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally // separate from grpc-go's HTTP/2 server. Performance and features may vary // between the two paths. ServeHTTP does not support some gRPC features // available through grpc-go's HTTP/2 server, and it is currently EXPERIMENTAL // and subject to change. func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { st, err := transport.NewServerHandlerTransport(w, r) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } if !s.addConn(st) { st.Close() return } defer s.removeConn(st) s.serveStreams(st) } // traceInfo returns a traceInfo and associates it with stream, if tracing is enabled. // If tracing is not enabled, it returns nil. func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) { tr, ok := trace.FromContext(stream.Context()) if !ok { return nil } trInfo = &traceInfo{ tr: tr, } trInfo.firstLine.client = false trInfo.firstLine.remoteAddr = st.RemoteAddr() if dl, ok := stream.Context().Deadline(); ok { trInfo.firstLine.deadline = dl.Sub(time.Now()) } return trInfo } func (s *Server) addConn(c io.Closer) bool { s.mu.Lock() defer s.mu.Unlock() if s.conns == nil || s.drain { return false } s.conns[c] = true return true } func (s *Server) removeConn(c io.Closer) { s.mu.Lock() defer s.mu.Unlock() if s.conns != nil { delete(s.conns, c) s.cv.Broadcast() } } func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options) error { var ( cbuf *bytes.Buffer outPayload *stats.OutPayload ) if cp != nil { cbuf = new(bytes.Buffer) } if s.opts.statsHandler != nil { outPayload = &stats.OutPayload{} } hdr, data, err := encode(s.opts.codec, msg, cp, cbuf, outPayload) if err != nil { grpclog.Errorln("grpc: server failed to encode response: ", err) return err } if len(data) > s.opts.maxSendMessageSize { return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(data), s.opts.maxSendMessageSize) } err = t.Write(stream, hdr, data, opts) if err == nil && outPayload != nil { outPayload.SentTime = time.Now() s.opts.statsHandler.HandleRPC(stream.Context(), outPayload) } return err } func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) { sh := s.opts.statsHandler if sh != nil { begin := &stats.Begin{ BeginTime: time.Now(), } sh.HandleRPC(stream.Context(), begin) defer func() { end := &stats.End{ EndTime: time.Now(), } if err != nil && err != io.EOF { end.Error = toRPCErr(err) } sh.HandleRPC(stream.Context(), end) }() } if trInfo != nil { defer trInfo.tr.Finish() trInfo.firstLine.client = false trInfo.tr.LazyLog(&trInfo.firstLine, false) defer func() { if err != nil && err != io.EOF { trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) trInfo.tr.SetError() } }() } if s.opts.cp != nil { // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686. stream.SetSendCompress(s.opts.cp.Type()) } p := &parser{r: stream} pf, req, err := p.recvMsg(s.opts.maxReceiveMessageSize) if err == io.EOF { // The entire stream is done (for unary RPC only). return err } if err == io.ErrUnexpectedEOF { err = Errorf(codes.Internal, io.ErrUnexpectedEOF.Error()) } if err != nil { if st, ok := status.FromError(err); ok { if e := t.WriteStatus(stream, st); e != nil { grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e) } } else { switch st := err.(type) { case transport.ConnectionError: // Nothing to do here. case transport.StreamError: if e := t.WriteStatus(stream, status.New(st.Code, st.Desc)); e != nil { grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e) } default: panic(fmt.Sprintf("grpc: Unexpected error (%T) from recvMsg: %v", st, st)) } } return err } if err := checkRecvPayload(pf, stream.RecvCompress(), s.opts.dc); err != nil { if st, ok := status.FromError(err); ok { if e := t.WriteStatus(stream, st); e != nil { grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e) } return err } if e := t.WriteStatus(stream, status.New(codes.Internal, err.Error())); e != nil { grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e) } // TODO checkRecvPayload always return RPC error. Add a return here if necessary. } var inPayload *stats.InPayload if sh != nil { inPayload = &stats.InPayload{ RecvTime: time.Now(), } } df := func(v interface{}) error { if inPayload != nil { inPayload.WireLength = len(req) } if pf == compressionMade { var err error req, err = s.opts.dc.Do(bytes.NewReader(req)) if err != nil { return Errorf(codes.Internal, err.Error()) } } if len(req) > s.opts.maxReceiveMessageSize { // TODO: Revisit the error code. Currently keep it consistent with // java implementation. return status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", len(req), s.opts.maxReceiveMessageSize) } if err := s.opts.codec.Unmarshal(req, v); err != nil { return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err) } if inPayload != nil { inPayload.Payload = v inPayload.Data = req inPayload.Length = len(req) sh.HandleRPC(stream.Context(), inPayload) } if trInfo != nil { trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true) } return nil } reply, appErr := md.Handler(srv.server, stream.Context(), df, s.opts.unaryInt) if appErr != nil { appStatus, ok := status.FromError(appErr) if !ok { // Convert appErr if it is not a grpc status error. appErr = status.Error(convertCode(appErr), appErr.Error()) appStatus, _ = status.FromError(appErr) } if trInfo != nil { trInfo.tr.LazyLog(stringer(appStatus.Message()), true) trInfo.tr.SetError() } if e := t.WriteStatus(stream, appStatus); e != nil { grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e) } return appErr } if trInfo != nil { trInfo.tr.LazyLog(stringer("OK"), false) } opts := &transport.Options{ Last: true, Delay: false, } if err := s.sendResponse(t, stream, reply, s.opts.cp, opts); err != nil { if err == io.EOF { // The entire stream is done (for unary RPC only). return err } if s, ok := status.FromError(err); ok { if e := t.WriteStatus(stream, s); e != nil { grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e) } } else { switch st := err.(type) { case transport.ConnectionError: // Nothing to do here. case transport.StreamError: if e := t.WriteStatus(stream, status.New(st.Code, st.Desc)); e != nil { grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e) } default: panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st)) } } return err } if trInfo != nil { trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true) } // TODO: Should we be logging if writing status failed here, like above? // Should the logging be in WriteStatus? Should we ignore the WriteStatus // error or allow the stats handler to see it? return t.WriteStatus(stream, status.New(codes.OK, "")) } func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) { sh := s.opts.statsHandler if sh != nil { begin := &stats.Begin{ BeginTime: time.Now(), } sh.HandleRPC(stream.Context(), begin) defer func() { end := &stats.End{ EndTime: time.Now(), } if err != nil && err != io.EOF { end.Error = toRPCErr(err) } sh.HandleRPC(stream.Context(), end) }() } if s.opts.cp != nil { stream.SetSendCompress(s.opts.cp.Type()) } ss := &serverStream{ t: t, s: stream, p: &parser{r: stream}, codec: s.opts.codec, cp: s.opts.cp, dc: s.opts.dc, maxReceiveMessageSize: s.opts.maxReceiveMessageSize, maxSendMessageSize: s.opts.maxSendMessageSize, trInfo: trInfo, statsHandler: sh, } if ss.cp != nil { ss.cbuf = new(bytes.Buffer) } if trInfo != nil { trInfo.tr.LazyLog(&trInfo.firstLine, false) defer func() { ss.mu.Lock() if err != nil && err != io.EOF { ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) ss.trInfo.tr.SetError() } ss.trInfo.tr.Finish() ss.trInfo.tr = nil ss.mu.Unlock() }() } var appErr error var server interface{} if srv != nil { server = srv.server } if s.opts.streamInt == nil { appErr = sd.Handler(server, ss) } else { info := &StreamServerInfo{ FullMethod: stream.Method(), IsClientStream: sd.ClientStreams, IsServerStream: sd.ServerStreams, } appErr = s.opts.streamInt(server, ss, info, sd.Handler) } if appErr != nil { appStatus, ok := status.FromError(appErr) if !ok { switch err := appErr.(type) { case transport.StreamError: appStatus = status.New(err.Code, err.Desc) default: appStatus = status.New(convertCode(appErr), appErr.Error()) } appErr = appStatus.Err() } if trInfo != nil { ss.mu.Lock() ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true) ss.trInfo.tr.SetError() ss.mu.Unlock() } t.WriteStatus(ss.s, appStatus) // TODO: Should we log an error from WriteStatus here and below? return appErr } if trInfo != nil { ss.mu.Lock() ss.trInfo.tr.LazyLog(stringer("OK"), false) ss.mu.Unlock() } return t.WriteStatus(ss.s, status.New(codes.OK, "")) } func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) { sm := stream.Method() if sm != "" && sm[0] == '/' { sm = sm[1:] } pos := strings.LastIndex(sm, "/") if pos == -1 { if trInfo != nil { trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true) trInfo.tr.SetError() } errDesc := fmt.Sprintf("malformed method name: %q", stream.Method()) if err := t.WriteStatus(stream, status.New(codes.ResourceExhausted, errDesc)); err != nil { if trInfo != nil { trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) trInfo.tr.SetError() } grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err) } if trInfo != nil { trInfo.tr.Finish() } return } service := sm[:pos] method := sm[pos+1:] srv, ok := s.m[service] if !ok { if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil { s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo) return } if trInfo != nil { trInfo.tr.LazyLog(&fmtStringer{"Unknown service %v", []interface{}{service}}, true) trInfo.tr.SetError() } errDesc := fmt.Sprintf("unknown service %v", service) if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil { if trInfo != nil { trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) trInfo.tr.SetError() } grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err) } if trInfo != nil { trInfo.tr.Finish() } return } // Unary RPC or Streaming RPC? if md, ok := srv.md[method]; ok { s.processUnaryRPC(t, stream, srv, md, trInfo) return } if sd, ok := srv.sd[method]; ok { s.processStreamingRPC(t, stream, srv, sd, trInfo) return } if trInfo != nil { trInfo.tr.LazyLog(&fmtStringer{"Unknown method %v", []interface{}{method}}, true) trInfo.tr.SetError() } if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil { s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo) return } errDesc := fmt.Sprintf("unknown method %v", method) if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil { if trInfo != nil { trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) trInfo.tr.SetError() } grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err) } if trInfo != nil { trInfo.tr.Finish() } } // Stop stops the gRPC server. It immediately closes all open // connections and listeners. // It cancels all active RPCs on the server side and the corresponding // pending RPCs on the client side will get notified by connection // errors. func (s *Server) Stop() { s.mu.Lock() listeners := s.lis s.lis = nil st := s.conns s.conns = nil // interrupt GracefulStop if Stop and GracefulStop are called concurrently. s.cv.Broadcast() s.mu.Unlock() for lis := range listeners { lis.Close() } for c := range st { c.Close() } s.mu.Lock() s.cancel() if s.events != nil { s.events.Finish() s.events = nil } s.mu.Unlock() } // GracefulStop stops the gRPC server gracefully. It stops the server from // accepting new connections and RPCs and blocks until all the pending RPCs are // finished. func (s *Server) GracefulStop() { s.mu.Lock() defer s.mu.Unlock() if s.conns == nil { return } for lis := range s.lis { lis.Close() } s.lis = nil s.cancel() if !s.drain { for c := range s.conns { c.(transport.ServerTransport).Drain() } s.drain = true } for len(s.conns) != 0 { s.cv.Wait() } s.conns = nil if s.events != nil { s.events.Finish() s.events = nil } } func init() { internal.TestingCloseConns = func(arg interface{}) { arg.(*Server).testingCloseConns() } internal.TestingUseHandlerImpl = func(arg interface{}) { arg.(*Server).opts.useHandlerImpl = true } } // testingCloseConns closes all existing transports but keeps s.lis // accepting new connections. func (s *Server) testingCloseConns() { s.mu.Lock() for c := range s.conns { c.Close() delete(s.conns, c) } s.mu.Unlock() } // SetHeader sets the header metadata. // When called multiple times, all the provided metadata will be merged. // All the metadata will be sent out when one of the following happens: // - grpc.SendHeader() is called; // - The first response is sent out; // - An RPC status is sent out (error or success). func SetHeader(ctx context.Context, md metadata.MD) error { if md.Len() == 0 { return nil } stream, ok := transport.StreamFromContext(ctx) if !ok { return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx) } return stream.SetHeader(md) } // SendHeader sends header metadata. It may be called at most once. // The provided md and headers set by SetHeader() will be sent. func SendHeader(ctx context.Context, md metadata.MD) error { stream, ok := transport.StreamFromContext(ctx) if !ok { return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx) } t := stream.ServerTransport() if t == nil { grpclog.Fatalf("grpc: SendHeader: %v has no ServerTransport to send header metadata.", stream) } if err := t.WriteHeader(stream, md); err != nil { return toRPCErr(err) } return nil } // SetTrailer sets the trailer metadata that will be sent when an RPC returns. // When called more than once, all the provided metadata will be merged. func SetTrailer(ctx context.Context, md metadata.MD) error { if md.Len() == 0 { return nil } stream, ok := transport.StreamFromContext(ctx) if !ok { return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx) } return stream.SetTrailer(md) }