diff --git a/fs/runtime/grpc/grpc_func.go b/fs/runtime/grpc/grpc_func.go index 9c2995a9..a206ddf0 100644 --- a/fs/runtime/grpc/grpc_func.go +++ b/fs/runtime/grpc/grpc_func.go @@ -25,7 +25,6 @@ import ( "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/metadata" - "io" "log/slog" "net" "sync" @@ -33,14 +32,16 @@ import ( ) type GRPCFuncRuntime struct { - Name string - ctx context.Context - status *proto.FunctionStatus - readyCh chan error - input chan string - output chan string - stopFunc func() - log *slog.Logger + api.FunctionRuntime + Name string + ctx context.Context + status *proto.FunctionStatus + readyOnce sync.Once + readyCh chan error + input chan string + output chan string + stopFunc func() + log *slog.Logger } type Status int32 @@ -77,7 +78,7 @@ func (s *FSSReconcileServer) WaitForReady() <-chan struct{} { return s.readyCh } -func (s *FSSReconcileServer) Reconcile(stream proto.FSReconcile_ReconcileServer) error { +func (s *FSSReconcileServer) Reconcile(req *proto.ConnectRequest, stream proto.FSReconcile_ReconcileServer) error { s.connected.Do(func() { close(s.readyCh) }) @@ -89,29 +90,20 @@ func (s *FSSReconcileServer) Reconcile(stream proto.FSReconcile_ReconcileServer) atomic.StoreInt32(&s.status, int32(NotReady)) s.log.InfoContext(s.ctx, "grpc reconcile stream closed") }() - errCh := make(chan error) - go func() { - for { - newStatus, err := stream.Recv() - s.log.DebugContext(s.ctx, "received status update", slog.Any("status", newStatus)) - if err == io.EOF { - return - } - if err != nil { - errCh <- err - return - } - s.functionsMu.Lock() - instance, ok := s.functions[newStatus.Name] - if !ok { - s.functionsMu.Unlock() - s.log.ErrorContext(s.ctx, "receive non-exist function status update", slog.Any("name", newStatus.Name)) - continue - } - s.functionsMu.Unlock() - instance.Update(newStatus) + // Sync all exiting function status to the newly connected reconcile client + s.functionsMu.Lock() + statusList := make([]*proto.FunctionStatus, 0, len(s.functions)) + for _, v := range s.functions { + statusList = append(statusList, v.status) + } + s.functionsMu.Unlock() + for _, v := range statusList { + err := stream.Send(v) + if err != nil { + s.log.ErrorContext(stream.Context(), "failed to send status update", slog.Any("status", v)) + // Continue to send the next status update. } - }() + } for { select { case status := <-s.reconcile: @@ -125,18 +117,39 @@ func (s *FSSReconcileServer) Reconcile(stream proto.FSReconcile_ReconcileServer) return nil case <-s.ctx.Done(): return nil - case e := <-errCh: - return e } } } +func (s *FSSReconcileServer) UpdateStatus(ctx context.Context, newStatus *proto.FunctionStatus) (*proto.Response, error) { + s.log.DebugContext(s.ctx, "received status update", slog.Any("status", newStatus)) + s.functionsMu.Lock() + instance, ok := s.functions[newStatus.Name] + if !ok { + s.functionsMu.Unlock() + s.log.ErrorContext(s.ctx, "receive non-exist function status update", slog.Any("name", newStatus.Name)) + return &proto.Response{ + Status: proto.Response_ERROR, + Message: common.OptionalStr("function not found"), + }, nil + } + s.functionsMu.Unlock() + instance.Update(newStatus) + return &proto.Response{ + Status: proto.Response_OK, + }, nil +} + func (s *FSSReconcileServer) NewFunctionRuntime(instance api.FunctionInstance) (api.FunctionRuntime, error) { name := instance.Definition().Name log := instance.Logger().With( slog.String("component", "grpc-runtime"), ) + go func() { + <-instance.Context().Done() + s.removeFunction(name) + }() runtime := &GRPCFuncRuntime{ Name: name, readyCh: make(chan error), @@ -147,7 +160,7 @@ func (s *FSSReconcileServer) NewFunctionRuntime(instance api.FunctionInstance) ( Status: proto.FunctionStatus_CREATING, }, ctx: instance.Context(), - stopFunc: func() { + stopFunc: func() { // TODO: remove it, we should use instance.ctx to control the lifecycle s.removeFunction(name) }, log: log, @@ -228,43 +241,22 @@ func NewFunctionServerImpl(s *FSSReconcileServer) *FunctionServerImpl { } } -func (f *FunctionServerImpl) Process(stream proto.Function_ProcessServer) error { - md, ok := metadata.FromIncomingContext(stream.Context()) - if !ok { - return fmt.Errorf("failed to get metadata") - } - instance, err := f.reconcileSvr.getFunc(md["name"][0]) +func (f *FunctionServerImpl) Process(req *proto.FunctionProcessRequest, stream proto.Function_ProcessServer) error { + runtime, err := f.reconcileSvr.getFunc(req.Name) if err != nil { return err } - log := instance.log + log := runtime.log log.InfoContext(stream.Context(), "Start processing events using GRPC") - instance.readyCh <- err - defer func() { - instance.Stop() - }() + runtime.readyOnce.Do(func() { + runtime.readyCh <- err + }) errCh := make(chan error) - go func() { - defer log.InfoContext(stream.Context(), "Stop processing events using GRPC") - logCounter := common.LogCounter() - for { - event, err := stream.Recv() - log.DebugContext(stream.Context(), "received event", slog.Any("count", logCounter)) - if err == io.EOF { - return - } - if err != nil { - errCh <- err - return - } - instance.output <- event.Payload - } - }() logCounter := common.LogCounter() for { select { - case payload := <-instance.input: + case payload := <-runtime.input: log.DebugContext(stream.Context(), "sending event", slog.Any("count", logCounter)) err := stream.Send(&proto.Event{Payload: payload}) if err != nil { @@ -273,15 +265,33 @@ func (f *FunctionServerImpl) Process(stream proto.Function_ProcessServer) error } case <-stream.Context().Done(): return nil - case <-instance.ctx.Done(): + case <-runtime.ctx.Done(): return nil case e := <-errCh: - log.ErrorContext(stream.Context(), "error processing event", slog.Any("error", e)) return e } } } +func (f *FunctionServerImpl) Output(ctx context.Context, e *proto.Event) (*proto.Response, error) { + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + return nil, fmt.Errorf("failed to get metadata") + } + if _, ok := md["name"]; !ok || len(md["name"]) == 0 { + return nil, fmt.Errorf("the metadata doesn't contain the function name") + } + runtime, err := f.reconcileSvr.getFunc(md["name"][0]) + if err != nil { + return nil, err + } + runtime.log.DebugContext(ctx, "received event") + runtime.output <- e.Payload + return &proto.Response{ + Status: proto.Response_OK, + }, nil +} + func StartGRPCServer(f *FSSReconcileServer, addr string) (*grpc.Server, error) { lis, err := net.Listen("tcp", addr) if err != nil { diff --git a/fs/runtime/grpc/mock_grpc_func_test.go b/fs/runtime/grpc/mock_grpc_func_test.go index 40dbdf84..7194c5c1 100644 --- a/fs/runtime/grpc/mock_grpc_func_test.go +++ b/fs/runtime/grpc/mock_grpc_func_test.go @@ -36,7 +36,7 @@ func StartMockGRPCFunc(t *testing.T, addr string) { } client := proto.NewFSReconcileClient(conn) - stream, err := client.Reconcile(context.Background()) + stream, err := client.Reconcile(context.Background(), &proto.ConnectRequest{}) if err != nil { t.Errorf("failed to get process stream: %v", err) return @@ -70,14 +70,19 @@ func StartMockGRPCFunc(t *testing.T, addr string) { } t.Logf("client received status: %v", s) s.Status = proto.FunctionStatus_RUNNING - err = stream.Send(s) + res, err := client.UpdateStatus(context.Background(), s) if err != nil { t.Errorf("failed to send: %v", err) return } + if res.GetStatus() != proto.Response_OK { + t.Errorf("failed to update status: %s", res.GetMessage()) + } go func() { ctx := metadata.AppendToOutgoingContext(context.Background(), "name", s.Name) - processStream, err := funcCli.Process(ctx) + processStream, err := funcCli.Process(ctx, &proto.FunctionProcessRequest{ + Name: s.Name, + }) if err != nil { t.Errorf("failed to get process stream: %v", err) return @@ -90,11 +95,15 @@ func StartMockGRPCFunc(t *testing.T, addr string) { } t.Logf("client received event: %v", event) event.Payload += "!" - err = processStream.Send(event) + res, err := funcCli.Output(ctx, event) if err != nil { t.Errorf("failed to send event: %v", err) return } + if res.GetStatus() != proto.Response_OK { + t.Errorf("failed to send event: %s", res.GetMessage()) + return + } } }() } diff --git a/fs/runtime/grpc/proto/grpc_func.pb.go b/fs/runtime/grpc/proto/grpc_func.pb.go index 9558586d..f7b41768 100644 --- a/fs/runtime/grpc/proto/grpc_func.pb.go +++ b/fs/runtime/grpc/proto/grpc_func.pb.go @@ -133,7 +133,7 @@ func (x FunctionStatus_Status) Number() protoreflect.EnumNumber { // Deprecated: Use FunctionStatus_Status.Descriptor instead. func (FunctionStatus_Status) EnumDescriptor() ([]byte, []int) { - return file_fs_runtime_grpc_proto_grpc_func_proto_rawDescGZIP(), []int{3, 0} + return file_fs_runtime_grpc_proto_grpc_func_proto_rawDescGZIP(), []int{4, 0} } // The request message for the Process method. @@ -295,6 +295,44 @@ func (x *SetStateRequest) GetValue() string { return "" } +type ConnectRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *ConnectRequest) Reset() { + *x = ConnectRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_fs_runtime_grpc_proto_grpc_func_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ConnectRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ConnectRequest) ProtoMessage() {} + +func (x *ConnectRequest) ProtoReflect() protoreflect.Message { + mi := &file_fs_runtime_grpc_proto_grpc_func_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ConnectRequest.ProtoReflect.Descriptor instead. +func (*ConnectRequest) Descriptor() ([]byte, []int) { + return file_fs_runtime_grpc_proto_grpc_func_proto_rawDescGZIP(), []int{3} +} + type FunctionStatus struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -308,7 +346,7 @@ type FunctionStatus struct { func (x *FunctionStatus) Reset() { *x = FunctionStatus{} if protoimpl.UnsafeEnabled { - mi := &file_fs_runtime_grpc_proto_grpc_func_proto_msgTypes[3] + mi := &file_fs_runtime_grpc_proto_grpc_func_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -321,7 +359,7 @@ func (x *FunctionStatus) String() string { func (*FunctionStatus) ProtoMessage() {} func (x *FunctionStatus) ProtoReflect() protoreflect.Message { - mi := &file_fs_runtime_grpc_proto_grpc_func_proto_msgTypes[3] + mi := &file_fs_runtime_grpc_proto_grpc_func_proto_msgTypes[4] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -334,7 +372,7 @@ func (x *FunctionStatus) ProtoReflect() protoreflect.Message { // Deprecated: Use FunctionStatus.ProtoReflect.Descriptor instead. func (*FunctionStatus) Descriptor() ([]byte, []int) { - return file_fs_runtime_grpc_proto_grpc_func_proto_rawDescGZIP(), []int{3} + return file_fs_runtime_grpc_proto_grpc_func_proto_rawDescGZIP(), []int{4} } func (x *FunctionStatus) GetName() string { @@ -358,6 +396,53 @@ func (x *FunctionStatus) GetDetails() string { return "" } +type FunctionProcessRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` +} + +func (x *FunctionProcessRequest) Reset() { + *x = FunctionProcessRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_fs_runtime_grpc_proto_grpc_func_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *FunctionProcessRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*FunctionProcessRequest) ProtoMessage() {} + +func (x *FunctionProcessRequest) ProtoReflect() protoreflect.Message { + mi := &file_fs_runtime_grpc_proto_grpc_func_proto_msgTypes[5] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use FunctionProcessRequest.ProtoReflect.Descriptor instead. +func (*FunctionProcessRequest) Descriptor() ([]byte, []int) { + return file_fs_runtime_grpc_proto_grpc_func_proto_rawDescGZIP(), []int{5} +} + +func (x *FunctionProcessRequest) GetName() string { + if x != nil { + return x.Name + } + return "" +} + var File_fs_runtime_grpc_proto_grpc_func_proto protoreflect.FileDescriptor var file_fs_runtime_grpc_proto_grpc_func_proto_rawDesc = []byte{ @@ -378,32 +463,40 @@ var file_fs_runtime_grpc_proto_grpc_func_proto_rawDesc = []byte{ 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, - 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, 0xd3, 0x01, 0x0a, 0x0e, 0x46, 0x75, 0x6e, 0x63, 0x74, 0x69, - 0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x36, 0x0a, 0x06, - 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x1e, 0x2e, 0x66, - 0x73, 0x5f, 0x66, 0x75, 0x6e, 0x63, 0x2e, 0x46, 0x75, 0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x53, - 0x74, 0x61, 0x74, 0x75, 0x73, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x06, 0x73, 0x74, - 0x61, 0x74, 0x75, 0x73, 0x12, 0x1d, 0x0a, 0x07, 0x64, 0x65, 0x74, 0x61, 0x69, 0x6c, 0x73, 0x18, - 0x03, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x07, 0x64, 0x65, 0x74, 0x61, 0x69, 0x6c, 0x73, - 0x88, 0x01, 0x01, 0x22, 0x4a, 0x0a, 0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x0c, 0x0a, - 0x08, 0x43, 0x52, 0x45, 0x41, 0x54, 0x49, 0x4e, 0x47, 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x52, - 0x55, 0x4e, 0x4e, 0x49, 0x4e, 0x47, 0x10, 0x01, 0x12, 0x0c, 0x0a, 0x08, 0x44, 0x45, 0x4c, 0x45, - 0x54, 0x49, 0x4e, 0x47, 0x10, 0x02, 0x12, 0x0b, 0x0a, 0x07, 0x44, 0x45, 0x4c, 0x45, 0x54, 0x45, - 0x44, 0x10, 0x03, 0x12, 0x0a, 0x0a, 0x06, 0x46, 0x41, 0x49, 0x4c, 0x45, 0x44, 0x10, 0x04, 0x42, - 0x0a, 0x0a, 0x08, 0x5f, 0x64, 0x65, 0x74, 0x61, 0x69, 0x6c, 0x73, 0x32, 0x52, 0x0a, 0x0b, 0x46, - 0x53, 0x52, 0x65, 0x63, 0x6f, 0x6e, 0x63, 0x69, 0x6c, 0x65, 0x12, 0x43, 0x0a, 0x09, 0x52, 0x65, - 0x63, 0x6f, 0x6e, 0x63, 0x69, 0x6c, 0x65, 0x12, 0x17, 0x2e, 0x66, 0x73, 0x5f, 0x66, 0x75, 0x6e, - 0x63, 0x2e, 0x46, 0x75, 0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, - 0x1a, 0x17, 0x2e, 0x66, 0x73, 0x5f, 0x66, 0x75, 0x6e, 0x63, 0x2e, 0x46, 0x75, 0x6e, 0x63, 0x74, - 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x32, - 0x76, 0x0a, 0x08, 0x46, 0x75, 0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x2f, 0x0a, 0x07, 0x50, - 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x12, 0x0e, 0x2e, 0x66, 0x73, 0x5f, 0x66, 0x75, 0x6e, 0x63, - 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x1a, 0x0e, 0x2e, 0x66, 0x73, 0x5f, 0x66, 0x75, 0x6e, 0x63, - 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x12, 0x39, 0x0a, 0x08, - 0x53, 0x65, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x18, 0x2e, 0x66, 0x73, 0x5f, 0x66, 0x75, - 0x6e, 0x63, 0x2e, 0x53, 0x65, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x1a, 0x11, 0x2e, 0x66, 0x73, 0x5f, 0x66, 0x75, 0x6e, 0x63, 0x2e, 0x52, 0x65, 0x73, + 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x10, 0x0a, 0x0e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0xd3, 0x01, 0x0a, 0x0e, 0x46, 0x75, 0x6e, 0x63, + 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, + 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x36, + 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x1e, + 0x2e, 0x66, 0x73, 0x5f, 0x66, 0x75, 0x6e, 0x63, 0x2e, 0x46, 0x75, 0x6e, 0x63, 0x74, 0x69, 0x6f, + 0x6e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x06, + 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x1d, 0x0a, 0x07, 0x64, 0x65, 0x74, 0x61, 0x69, 0x6c, + 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x07, 0x64, 0x65, 0x74, 0x61, 0x69, + 0x6c, 0x73, 0x88, 0x01, 0x01, 0x22, 0x4a, 0x0a, 0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, + 0x0c, 0x0a, 0x08, 0x43, 0x52, 0x45, 0x41, 0x54, 0x49, 0x4e, 0x47, 0x10, 0x00, 0x12, 0x0b, 0x0a, + 0x07, 0x52, 0x55, 0x4e, 0x4e, 0x49, 0x4e, 0x47, 0x10, 0x01, 0x12, 0x0c, 0x0a, 0x08, 0x44, 0x45, + 0x4c, 0x45, 0x54, 0x49, 0x4e, 0x47, 0x10, 0x02, 0x12, 0x0b, 0x0a, 0x07, 0x44, 0x45, 0x4c, 0x45, + 0x54, 0x45, 0x44, 0x10, 0x03, 0x12, 0x0a, 0x0a, 0x06, 0x46, 0x41, 0x49, 0x4c, 0x45, 0x44, 0x10, + 0x04, 0x42, 0x0a, 0x0a, 0x08, 0x5f, 0x64, 0x65, 0x74, 0x61, 0x69, 0x6c, 0x73, 0x22, 0x2c, 0x0a, + 0x16, 0x46, 0x75, 0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x32, 0x8e, 0x01, 0x0a, 0x0b, + 0x46, 0x53, 0x52, 0x65, 0x63, 0x6f, 0x6e, 0x63, 0x69, 0x6c, 0x65, 0x12, 0x41, 0x0a, 0x09, 0x52, + 0x65, 0x63, 0x6f, 0x6e, 0x63, 0x69, 0x6c, 0x65, 0x12, 0x17, 0x2e, 0x66, 0x73, 0x5f, 0x66, 0x75, + 0x6e, 0x63, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x17, 0x2e, 0x66, 0x73, 0x5f, 0x66, 0x75, 0x6e, 0x63, 0x2e, 0x46, 0x75, 0x6e, 0x63, + 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x22, 0x00, 0x30, 0x01, 0x12, 0x3c, + 0x0a, 0x0c, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x17, + 0x2e, 0x66, 0x73, 0x5f, 0x66, 0x75, 0x6e, 0x63, 0x2e, 0x46, 0x75, 0x6e, 0x63, 0x74, 0x69, 0x6f, + 0x6e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x1a, 0x11, 0x2e, 0x66, 0x73, 0x5f, 0x66, 0x75, 0x6e, + 0x63, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x32, 0x79, 0x0a, 0x08, + 0x46, 0x75, 0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x3e, 0x0a, 0x07, 0x50, 0x72, 0x6f, 0x63, + 0x65, 0x73, 0x73, 0x12, 0x1f, 0x2e, 0x66, 0x73, 0x5f, 0x66, 0x75, 0x6e, 0x63, 0x2e, 0x46, 0x75, + 0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0e, 0x2e, 0x66, 0x73, 0x5f, 0x66, 0x75, 0x6e, 0x63, 0x2e, 0x45, + 0x76, 0x65, 0x6e, 0x74, 0x22, 0x00, 0x30, 0x01, 0x12, 0x2d, 0x0a, 0x06, 0x4f, 0x75, 0x74, 0x70, + 0x75, 0x74, 0x12, 0x0e, 0x2e, 0x66, 0x73, 0x5f, 0x66, 0x75, 0x6e, 0x63, 0x2e, 0x45, 0x76, 0x65, + 0x6e, 0x74, 0x1a, 0x11, 0x2e, 0x66, 0x73, 0x5f, 0x66, 0x75, 0x6e, 0x63, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x19, 0x5a, 0x17, 0x66, 0x73, 0x2f, 0x66, 0x75, 0x6e, 0x63, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x5f, 0x66, 0x75, 0x6e, 0x63, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, @@ -422,26 +515,30 @@ func file_fs_runtime_grpc_proto_grpc_func_proto_rawDescGZIP() []byte { } var file_fs_runtime_grpc_proto_grpc_func_proto_enumTypes = make([]protoimpl.EnumInfo, 2) -var file_fs_runtime_grpc_proto_grpc_func_proto_msgTypes = make([]protoimpl.MessageInfo, 4) +var file_fs_runtime_grpc_proto_grpc_func_proto_msgTypes = make([]protoimpl.MessageInfo, 6) var file_fs_runtime_grpc_proto_grpc_func_proto_goTypes = []interface{}{ - (Response_Status)(0), // 0: fs_func.Response.Status - (FunctionStatus_Status)(0), // 1: fs_func.FunctionStatus.Status - (*Event)(nil), // 2: fs_func.Event - (*Response)(nil), // 3: fs_func.Response - (*SetStateRequest)(nil), // 4: fs_func.SetStateRequest - (*FunctionStatus)(nil), // 5: fs_func.FunctionStatus + (Response_Status)(0), // 0: fs_func.Response.Status + (FunctionStatus_Status)(0), // 1: fs_func.FunctionStatus.Status + (*Event)(nil), // 2: fs_func.Event + (*Response)(nil), // 3: fs_func.Response + (*SetStateRequest)(nil), // 4: fs_func.SetStateRequest + (*ConnectRequest)(nil), // 5: fs_func.ConnectRequest + (*FunctionStatus)(nil), // 6: fs_func.FunctionStatus + (*FunctionProcessRequest)(nil), // 7: fs_func.FunctionProcessRequest } var file_fs_runtime_grpc_proto_grpc_func_proto_depIdxs = []int32{ 0, // 0: fs_func.Response.status:type_name -> fs_func.Response.Status 1, // 1: fs_func.FunctionStatus.status:type_name -> fs_func.FunctionStatus.Status - 5, // 2: fs_func.FSReconcile.Reconcile:input_type -> fs_func.FunctionStatus - 2, // 3: fs_func.Function.Process:input_type -> fs_func.Event - 4, // 4: fs_func.Function.SetState:input_type -> fs_func.SetStateRequest - 5, // 5: fs_func.FSReconcile.Reconcile:output_type -> fs_func.FunctionStatus - 2, // 6: fs_func.Function.Process:output_type -> fs_func.Event - 3, // 7: fs_func.Function.SetState:output_type -> fs_func.Response - 5, // [5:8] is the sub-list for method output_type - 2, // [2:5] is the sub-list for method input_type + 5, // 2: fs_func.FSReconcile.Reconcile:input_type -> fs_func.ConnectRequest + 6, // 3: fs_func.FSReconcile.UpdateStatus:input_type -> fs_func.FunctionStatus + 7, // 4: fs_func.Function.Process:input_type -> fs_func.FunctionProcessRequest + 2, // 5: fs_func.Function.Output:input_type -> fs_func.Event + 6, // 6: fs_func.FSReconcile.Reconcile:output_type -> fs_func.FunctionStatus + 3, // 7: fs_func.FSReconcile.UpdateStatus:output_type -> fs_func.Response + 2, // 8: fs_func.Function.Process:output_type -> fs_func.Event + 3, // 9: fs_func.Function.Output:output_type -> fs_func.Response + 6, // [6:10] is the sub-list for method output_type + 2, // [2:6] is the sub-list for method input_type 2, // [2:2] is the sub-list for extension type_name 2, // [2:2] is the sub-list for extension extendee 0, // [0:2] is the sub-list for field type_name @@ -490,6 +587,18 @@ func file_fs_runtime_grpc_proto_grpc_func_proto_init() { } } file_fs_runtime_grpc_proto_grpc_func_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ConnectRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_fs_runtime_grpc_proto_grpc_func_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*FunctionStatus); i { case 0: return &v.state @@ -501,16 +610,28 @@ func file_fs_runtime_grpc_proto_grpc_func_proto_init() { return nil } } + file_fs_runtime_grpc_proto_grpc_func_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*FunctionProcessRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } file_fs_runtime_grpc_proto_grpc_func_proto_msgTypes[1].OneofWrappers = []interface{}{} - file_fs_runtime_grpc_proto_grpc_func_proto_msgTypes[3].OneofWrappers = []interface{}{} + file_fs_runtime_grpc_proto_grpc_func_proto_msgTypes[4].OneofWrappers = []interface{}{} type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_fs_runtime_grpc_proto_grpc_func_proto_rawDesc, NumEnums: 2, - NumMessages: 4, + NumMessages: 6, NumExtensions: 0, NumServices: 2, }, diff --git a/fs/runtime/grpc/proto/grpc_func.proto b/fs/runtime/grpc/proto/grpc_func.proto index ef6ea379..41c10778 100644 --- a/fs/runtime/grpc/proto/grpc_func.proto +++ b/fs/runtime/grpc/proto/grpc_func.proto @@ -38,6 +38,10 @@ message SetStateRequest { string value = 2; } +message ConnectRequest { + +} + message FunctionStatus { string name = 1; enum Status { @@ -52,10 +56,15 @@ message FunctionStatus { } service FSReconcile { - rpc Reconcile(stream FunctionStatus) returns (stream FunctionStatus) {} + rpc Reconcile(ConnectRequest) returns (stream FunctionStatus) {} + rpc UpdateStatus(FunctionStatus) returns (Response) {} +} + +message FunctionProcessRequest { + string name = 1; } service Function { - rpc Process(stream Event) returns (stream Event) {} - rpc SetState(SetStateRequest) returns (Response) {} + rpc Process(FunctionProcessRequest) returns (stream Event) {} + rpc Output(Event) returns (Response) {} } diff --git a/fs/runtime/grpc/proto/grpc_func_grpc.pb.go b/fs/runtime/grpc/proto/grpc_func_grpc.pb.go index b6fcc726..5fd62b97 100644 --- a/fs/runtime/grpc/proto/grpc_func_grpc.pb.go +++ b/fs/runtime/grpc/proto/grpc_func_grpc.pb.go @@ -18,7 +18,8 @@ const _ = grpc.SupportPackageIsVersion7 // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type FSReconcileClient interface { - Reconcile(ctx context.Context, opts ...grpc.CallOption) (FSReconcile_ReconcileClient, error) + Reconcile(ctx context.Context, in *ConnectRequest, opts ...grpc.CallOption) (FSReconcile_ReconcileClient, error) + UpdateStatus(ctx context.Context, in *FunctionStatus, opts ...grpc.CallOption) (*Response, error) } type fSReconcileClient struct { @@ -29,17 +30,22 @@ func NewFSReconcileClient(cc grpc.ClientConnInterface) FSReconcileClient { return &fSReconcileClient{cc} } -func (c *fSReconcileClient) Reconcile(ctx context.Context, opts ...grpc.CallOption) (FSReconcile_ReconcileClient, error) { +func (c *fSReconcileClient) Reconcile(ctx context.Context, in *ConnectRequest, opts ...grpc.CallOption) (FSReconcile_ReconcileClient, error) { stream, err := c.cc.NewStream(ctx, &FSReconcile_ServiceDesc.Streams[0], "/fs_func.FSReconcile/Reconcile", opts...) if err != nil { return nil, err } x := &fSReconcileReconcileClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } return x, nil } type FSReconcile_ReconcileClient interface { - Send(*FunctionStatus) error Recv() (*FunctionStatus, error) grpc.ClientStream } @@ -48,10 +54,6 @@ type fSReconcileReconcileClient struct { grpc.ClientStream } -func (x *fSReconcileReconcileClient) Send(m *FunctionStatus) error { - return x.ClientStream.SendMsg(m) -} - func (x *fSReconcileReconcileClient) Recv() (*FunctionStatus, error) { m := new(FunctionStatus) if err := x.ClientStream.RecvMsg(m); err != nil { @@ -60,11 +62,21 @@ func (x *fSReconcileReconcileClient) Recv() (*FunctionStatus, error) { return m, nil } +func (c *fSReconcileClient) UpdateStatus(ctx context.Context, in *FunctionStatus, opts ...grpc.CallOption) (*Response, error) { + out := new(Response) + err := c.cc.Invoke(ctx, "/fs_func.FSReconcile/UpdateStatus", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // FSReconcileServer is the server API for FSReconcile service. // All implementations must embed UnimplementedFSReconcileServer // for forward compatibility type FSReconcileServer interface { - Reconcile(FSReconcile_ReconcileServer) error + Reconcile(*ConnectRequest, FSReconcile_ReconcileServer) error + UpdateStatus(context.Context, *FunctionStatus) (*Response, error) mustEmbedUnimplementedFSReconcileServer() } @@ -72,9 +84,12 @@ type FSReconcileServer interface { type UnimplementedFSReconcileServer struct { } -func (UnimplementedFSReconcileServer) Reconcile(FSReconcile_ReconcileServer) error { +func (UnimplementedFSReconcileServer) Reconcile(*ConnectRequest, FSReconcile_ReconcileServer) error { return status.Errorf(codes.Unimplemented, "method Reconcile not implemented") } +func (UnimplementedFSReconcileServer) UpdateStatus(context.Context, *FunctionStatus) (*Response, error) { + return nil, status.Errorf(codes.Unimplemented, "method UpdateStatus not implemented") +} func (UnimplementedFSReconcileServer) mustEmbedUnimplementedFSReconcileServer() {} // UnsafeFSReconcileServer may be embedded to opt out of forward compatibility for this service. @@ -89,12 +104,15 @@ func RegisterFSReconcileServer(s grpc.ServiceRegistrar, srv FSReconcileServer) { } func _FSReconcile_Reconcile_Handler(srv interface{}, stream grpc.ServerStream) error { - return srv.(FSReconcileServer).Reconcile(&fSReconcileReconcileServer{stream}) + m := new(ConnectRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(FSReconcileServer).Reconcile(m, &fSReconcileReconcileServer{stream}) } type FSReconcile_ReconcileServer interface { Send(*FunctionStatus) error - Recv() (*FunctionStatus, error) grpc.ServerStream } @@ -106,12 +124,22 @@ func (x *fSReconcileReconcileServer) Send(m *FunctionStatus) error { return x.ServerStream.SendMsg(m) } -func (x *fSReconcileReconcileServer) Recv() (*FunctionStatus, error) { - m := new(FunctionStatus) - if err := x.ServerStream.RecvMsg(m); err != nil { +func _FSReconcile_UpdateStatus_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(FunctionStatus) + if err := dec(in); err != nil { return nil, err } - return m, nil + if interceptor == nil { + return srv.(FSReconcileServer).UpdateStatus(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/fs_func.FSReconcile/UpdateStatus", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(FSReconcileServer).UpdateStatus(ctx, req.(*FunctionStatus)) + } + return interceptor(ctx, in, info, handler) } // FSReconcile_ServiceDesc is the grpc.ServiceDesc for FSReconcile service. @@ -120,13 +148,17 @@ func (x *fSReconcileReconcileServer) Recv() (*FunctionStatus, error) { var FSReconcile_ServiceDesc = grpc.ServiceDesc{ ServiceName: "fs_func.FSReconcile", HandlerType: (*FSReconcileServer)(nil), - Methods: []grpc.MethodDesc{}, + Methods: []grpc.MethodDesc{ + { + MethodName: "UpdateStatus", + Handler: _FSReconcile_UpdateStatus_Handler, + }, + }, Streams: []grpc.StreamDesc{ { StreamName: "Reconcile", Handler: _FSReconcile_Reconcile_Handler, ServerStreams: true, - ClientStreams: true, }, }, Metadata: "fs/runtime/grpc/proto/grpc_func.proto", @@ -136,8 +168,8 @@ var FSReconcile_ServiceDesc = grpc.ServiceDesc{ // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type FunctionClient interface { - Process(ctx context.Context, opts ...grpc.CallOption) (Function_ProcessClient, error) - SetState(ctx context.Context, in *SetStateRequest, opts ...grpc.CallOption) (*Response, error) + Process(ctx context.Context, in *FunctionProcessRequest, opts ...grpc.CallOption) (Function_ProcessClient, error) + Output(ctx context.Context, in *Event, opts ...grpc.CallOption) (*Response, error) } type functionClient struct { @@ -148,17 +180,22 @@ func NewFunctionClient(cc grpc.ClientConnInterface) FunctionClient { return &functionClient{cc} } -func (c *functionClient) Process(ctx context.Context, opts ...grpc.CallOption) (Function_ProcessClient, error) { +func (c *functionClient) Process(ctx context.Context, in *FunctionProcessRequest, opts ...grpc.CallOption) (Function_ProcessClient, error) { stream, err := c.cc.NewStream(ctx, &Function_ServiceDesc.Streams[0], "/fs_func.Function/Process", opts...) if err != nil { return nil, err } x := &functionProcessClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } return x, nil } type Function_ProcessClient interface { - Send(*Event) error Recv() (*Event, error) grpc.ClientStream } @@ -167,10 +204,6 @@ type functionProcessClient struct { grpc.ClientStream } -func (x *functionProcessClient) Send(m *Event) error { - return x.ClientStream.SendMsg(m) -} - func (x *functionProcessClient) Recv() (*Event, error) { m := new(Event) if err := x.ClientStream.RecvMsg(m); err != nil { @@ -179,9 +212,9 @@ func (x *functionProcessClient) Recv() (*Event, error) { return m, nil } -func (c *functionClient) SetState(ctx context.Context, in *SetStateRequest, opts ...grpc.CallOption) (*Response, error) { +func (c *functionClient) Output(ctx context.Context, in *Event, opts ...grpc.CallOption) (*Response, error) { out := new(Response) - err := c.cc.Invoke(ctx, "/fs_func.Function/SetState", in, out, opts...) + err := c.cc.Invoke(ctx, "/fs_func.Function/Output", in, out, opts...) if err != nil { return nil, err } @@ -192,8 +225,8 @@ func (c *functionClient) SetState(ctx context.Context, in *SetStateRequest, opts // All implementations must embed UnimplementedFunctionServer // for forward compatibility type FunctionServer interface { - Process(Function_ProcessServer) error - SetState(context.Context, *SetStateRequest) (*Response, error) + Process(*FunctionProcessRequest, Function_ProcessServer) error + Output(context.Context, *Event) (*Response, error) mustEmbedUnimplementedFunctionServer() } @@ -201,11 +234,11 @@ type FunctionServer interface { type UnimplementedFunctionServer struct { } -func (UnimplementedFunctionServer) Process(Function_ProcessServer) error { +func (UnimplementedFunctionServer) Process(*FunctionProcessRequest, Function_ProcessServer) error { return status.Errorf(codes.Unimplemented, "method Process not implemented") } -func (UnimplementedFunctionServer) SetState(context.Context, *SetStateRequest) (*Response, error) { - return nil, status.Errorf(codes.Unimplemented, "method SetState not implemented") +func (UnimplementedFunctionServer) Output(context.Context, *Event) (*Response, error) { + return nil, status.Errorf(codes.Unimplemented, "method Output not implemented") } func (UnimplementedFunctionServer) mustEmbedUnimplementedFunctionServer() {} @@ -221,12 +254,15 @@ func RegisterFunctionServer(s grpc.ServiceRegistrar, srv FunctionServer) { } func _Function_Process_Handler(srv interface{}, stream grpc.ServerStream) error { - return srv.(FunctionServer).Process(&functionProcessServer{stream}) + m := new(FunctionProcessRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(FunctionServer).Process(m, &functionProcessServer{stream}) } type Function_ProcessServer interface { Send(*Event) error - Recv() (*Event, error) grpc.ServerStream } @@ -238,28 +274,20 @@ func (x *functionProcessServer) Send(m *Event) error { return x.ServerStream.SendMsg(m) } -func (x *functionProcessServer) Recv() (*Event, error) { - m := new(Event) - if err := x.ServerStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} - -func _Function_SetState_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(SetStateRequest) +func _Function_Output_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(Event) if err := dec(in); err != nil { return nil, err } if interceptor == nil { - return srv.(FunctionServer).SetState(ctx, in) + return srv.(FunctionServer).Output(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/fs_func.Function/SetState", + FullMethod: "/fs_func.Function/Output", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(FunctionServer).SetState(ctx, req.(*SetStateRequest)) + return srv.(FunctionServer).Output(ctx, req.(*Event)) } return interceptor(ctx, in, info, handler) } @@ -272,8 +300,8 @@ var Function_ServiceDesc = grpc.ServiceDesc{ HandlerType: (*FunctionServer)(nil), Methods: []grpc.MethodDesc{ { - MethodName: "SetState", - Handler: _Function_SetState_Handler, + MethodName: "Output", + Handler: _Function_Output_Handler, }, }, Streams: []grpc.StreamDesc{ @@ -281,7 +309,6 @@ var Function_ServiceDesc = grpc.ServiceDesc{ StreamName: "Process", Handler: _Function_Process_Handler, ServerStreams: true, - ClientStreams: true, }, }, Metadata: "fs/runtime/grpc/proto/grpc_func.proto",