From 4bbab6d2a923805d7cf93519bf1ae8cb162c13ed Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Mon, 26 Feb 2024 22:43:38 +0800 Subject: [PATCH 1/5] feat: add http source tube Signed-off-by: Zike Yang --- fs/contube/contube.go | 4 +- fs/contube/http.go | 116 ++++++++++++++++++++++++++++++++++++++++ fs/contube/http_test.go | 35 ++++++++++++ fs/instance_impl.go | 2 +- go.mod | 3 ++ 5 files changed, 157 insertions(+), 3 deletions(-) create mode 100644 fs/contube/http.go create mode 100644 fs/contube/http_test.go diff --git a/fs/contube/contube.go b/fs/contube/contube.go index 8119b392..9dc9fe5a 100644 --- a/fs/contube/contube.go +++ b/fs/contube/contube.go @@ -74,8 +74,8 @@ func (c *SinkQueueConfig) ToConfigMap() ConfigMap { type ConfigMap map[string]interface{} -// Merge merges multiple ConfigMap into one -func Merge(configs ...ConfigMap) ConfigMap { +// MergeConfig merges multiple ConfigMap into one +func MergeConfig(configs ...ConfigMap) ConfigMap { result := ConfigMap{} for _, config := range configs { for k, v := range config { diff --git a/fs/contube/http.go b/fs/contube/http.go new file mode 100644 index 00000000..02ca6414 --- /dev/null +++ b/fs/contube/http.go @@ -0,0 +1,116 @@ +package contube + +import ( + "github.com/pkg/errors" + "golang.org/x/net/context" + "sync" + "sync/atomic" +) + +type state int + +const ( + EndpointKey = "endpoint" + + stateReady state = iota + stateClosed state = iota +) + +var ( + ErrEndpointNotFound = errors.New("endpoint not found") + ErrEndpointClosed = errors.New("endpoint closed") + ErrorEndpointAlreadyExists = errors.New("endpoint already exists") +) + +type EndpointHandler func(ctx context.Context, endpoint string, payload []byte) error + +type endpointHandler struct { + ctx context.Context + s atomic.Value + handler EndpointHandler + c chan Record +} + +type HttpTubeFactory struct { + TubeFactory + ctx context.Context + mu sync.RWMutex + endpoints map[string]*endpointHandler +} + +func NewHttpTubeFactory(ctx context.Context) TubeFactory { + return &HttpTubeFactory{ + ctx: ctx, + endpoints: make(map[string]*endpointHandler), + } +} + +type httpSourceTubeConfig struct { + endpoint string +} + +func (c ConfigMap) toHttpSourceTubeConfig() (*httpSourceTubeConfig, error) { + endpoint, ok := c[EndpointKey].(string) + if !ok { + return nil, ErrEndpointNotFound + } + return &httpSourceTubeConfig{ + endpoint: endpoint, + }, nil +} + +func (f *HttpTubeFactory) Handle(ctx context.Context, endpoint string, payload []byte) error { + f.mu.RLock() + e, ok := f.endpoints[endpoint] + if !ok { + f.mu.RUnlock() + return ErrEndpointNotFound + } + f.mu.RUnlock() + if e.s.Load() == stateClosed { + return ErrEndpointClosed + } + select { + case e.c <- NewRecordImpl(payload, func() {}): + return nil + case <-ctx.Done(): + return ctx.Err() + case <-e.ctx.Done(): + return ErrEndpointClosed + } +} + +func (f *HttpTubeFactory) NewSourceTube(ctx context.Context, config ConfigMap) (<-chan Record, error) { + c, err := config.toHttpSourceTubeConfig() + if err != nil { + return nil, err + } + result := make(chan Record, 10) + f.mu.Lock() + defer f.mu.Unlock() + if _, ok := f.endpoints[c.endpoint]; ok { + return nil, ErrorEndpointAlreadyExists + } + var s atomic.Value + s.Store(stateReady) + handlerCtx, cancel := context.WithCancel(f.ctx) + e := &endpointHandler{ + c: result, + s: s, + ctx: handlerCtx, + } + f.endpoints[c.endpoint] = e + go func() { + <-ctx.Done() + cancel() + close(result) + f.mu.Lock() + defer f.mu.Unlock() + delete(f.endpoints, c.endpoint) + }() + return result, nil +} + +func (f *HttpTubeFactory) NewSinkTube(ctx context.Context, config ConfigMap) (chan<- Record, error) { + return nil, errors.New("http tube factory does not support sink tube") +} diff --git a/fs/contube/http_test.go b/fs/contube/http_test.go new file mode 100644 index 00000000..81a5cb5f --- /dev/null +++ b/fs/contube/http_test.go @@ -0,0 +1,35 @@ +package contube + +import ( + "github.com/stretchr/testify/assert" + "golang.org/x/net/context" + "testing" +) + +func TestHttpTubeHandleRecord(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + f := NewHttpTubeFactory(ctx).(*HttpTubeFactory) + + endpoint := "test" + err := f.Handle(ctx, "test", []byte("test")) + assert.ErrorIs(t, err, ErrEndpointNotFound) + + config := make(ConfigMap) + config[EndpointKey] = endpoint + source, err := f.NewSourceTube(ctx, config) + assert.NoError(t, err) + _, err = f.NewSourceTube(ctx, config) + assert.ErrorIs(t, err, ErrorEndpointAlreadyExists) + + err = f.Handle(ctx, endpoint, []byte("test")) + assert.Nil(t, err) + + record := <-source + assert.Equal(t, "test", string(record.GetPayload())) + + cancel() + + assert.Nil(t, <-source) + err = f.Handle(ctx, endpoint, []byte("test")) + assert.Error(t, ErrEndpointNotFound) +} diff --git a/fs/instance_impl.go b/fs/instance_impl.go index 92f26303..d5e1f648 100644 --- a/fs/instance_impl.go +++ b/fs/instance_impl.go @@ -78,7 +78,7 @@ func (instance *FunctionInstanceImpl) Run(runtimeFactory api.FunctionRuntimeFact } getTubeConfig := func(config contube.ConfigMap, tubeConfig *model.TubeConfig) contube.ConfigMap { if tubeConfig != nil && tubeConfig.Config != nil { - return contube.Merge(config, tubeConfig.Config) + return contube.MergeConfig(config, tubeConfig.Config) } return config } diff --git a/go.mod b/go.mod index 542c8cdc..f4cb8d06 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/gorilla/mux v1.8.1 github.com/pkg/errors v0.9.1 github.com/spf13/cobra v1.8.0 + github.com/stretchr/testify v1.8.4 github.com/tetratelabs/wazero v1.6.0 golang.org/x/net v0.21.0 golang.org/x/time v0.5.0 @@ -26,6 +27,7 @@ require ( github.com/bits-and-blooms/bitset v1.13.0 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/danieljoos/wincred v1.2.1 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/dvsekhvalnov/jose2go v1.6.0 // indirect github.com/frankban/quicktest v1.14.6 // indirect github.com/go-logr/logr v1.4.1 // indirect @@ -47,6 +49,7 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/mtibben/percent v0.2.1 // indirect github.com/pierrec/lz4 v2.6.1+incompatible // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_golang v1.18.0 // indirect github.com/prometheus/client_model v0.5.0 // indirect github.com/prometheus/common v0.46.0 // indirect From 8ded538635bd1e243b0f437634df521cc92502fd Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Mon, 26 Feb 2024 23:35:39 +0800 Subject: [PATCH 2/5] feat: add server endpoint for http tube Signed-off-by: Zike Yang --- common/constants.go | 1 + fs/contube/http.go | 29 ++++++++++++++ server/server.go | 35 ++++++++++++++--- server/server_test.go | 89 ++++++++++++++++++++++++++++++++++++++++--- 4 files changed, 143 insertions(+), 11 deletions(-) diff --git a/common/constants.go b/common/constants.go index 2003e0a4..97e12c52 100644 --- a/common/constants.go +++ b/common/constants.go @@ -19,6 +19,7 @@ package common const ( PulsarTubeType = "pulsar" MemoryTubeType = "memory" + HttpTubeType = "http" DefaultAddr = "localhost:7300" DefaultPulsarURL = "pulsar://localhost:6650" diff --git a/fs/contube/http.go b/fs/contube/http.go index 02ca6414..6fe4b46a 100644 --- a/fs/contube/http.go +++ b/fs/contube/http.go @@ -3,6 +3,9 @@ package contube import ( "github.com/pkg/errors" "golang.org/x/net/context" + "io" + "log/slog" + "net/http" "sync" "sync/atomic" ) @@ -114,3 +117,29 @@ func (f *HttpTubeFactory) NewSourceTube(ctx context.Context, config ConfigMap) ( func (f *HttpTubeFactory) NewSinkTube(ctx context.Context, config ConfigMap) (chan<- Record, error) { return nil, errors.New("http tube factory does not support sink tube") } + +func (f *HttpTubeFactory) GetHandleFunc(getEndpoint func(r *http.Request) (string, error), logger *slog.Logger) func(http.ResponseWriter, *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { + endpoint, err := getEndpoint(r) + if err != nil { + logger.Error("Failed to get endpoint", "error", err) + http.Error(w, errors.Wrap(err, "Failed to get endpoint").Error(), http.StatusBadRequest) + return + } + log := logger.With(slog.String("endpoint", endpoint), slog.String("component", "http-tube")) + log.Info("Handle records from http request") + content, err := io.ReadAll(r.Body) + if err != nil { + log.Error("Failed to read body", "error", err) + http.Error(w, errors.Wrap(err, "Failed to read body").Error(), http.StatusBadRequest) + return + } + err = f.Handle(r.Context(), endpoint, content) + if err != nil { + log.Error("Failed to handle record", "error", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + log.Info("Handled records from http request") + } +} diff --git a/server/server.go b/server/server.go index 28cb1cd8..a997909a 100644 --- a/server/server.go +++ b/server/server.go @@ -29,6 +29,7 @@ import ( "github.com/pkg/errors" "io" "log/slog" + "net" "net/http" "net/url" "sync/atomic" @@ -43,7 +44,9 @@ type Server struct { } type serverOptions struct { - manager *fs.FunctionManager + httpListener net.Listener + manager *fs.FunctionManager + httpTube *contube.HttpTubeFactory } type ServerOption interface { @@ -63,7 +66,16 @@ func WithFunctionManager(manager *fs.FunctionManager) ServerOption { }) } -func newDefaultFunctionManager(config *common.Config) (*fs.FunctionManager, error) { +// WithHttpListener sets the listener for the HTTP server. +// If not set, the server will listen on the Config.ListenAddr. +func WithHttpListener(listener net.Listener) ServerOption { + return serverOptionFunc(func(o *serverOptions) (*serverOptions, error) { + o.httpListener = listener + return o, nil + }) +} + +func (s *serverOptions) newDefaultFunctionManager(config *common.Config) (*fs.FunctionManager, error) { var tubeFactory contube.TubeFactory var err error switch config.TubeType { @@ -77,9 +89,10 @@ func newDefaultFunctionManager(config *common.Config) (*fs.FunctionManager, erro case common.MemoryTubeType: tubeFactory = contube.NewMemoryQueueFactory(context.Background()) } - + s.httpTube = contube.NewHttpTubeFactory(context.Background()).(*contube.HttpTubeFactory) manager, err := fs.NewFunctionManager( fs.WithDefaultTubeFactory(tubeFactory), + fs.WithTubeFactory("http", s.httpTube), ) if err != nil { return nil, errors.Wrap(err, "failed to create default function manager") @@ -96,7 +109,7 @@ func NewServer(config *common.Config, opts ...ServerOption) (*Server, error) { } } if options.manager == nil { - manager, err := newDefaultFunctionManager(config) + manager, err := options.newDefaultFunctionManager(config) if err != nil { return nil, err } @@ -244,13 +257,25 @@ func (s *Server) startRESTHandlers() error { log.Info("Consumed event from queue") }).Methods("GET") + r.HandleFunc("/api/v1/http-tube/{endpoint}", s.options.httpTube.GetHandleFunc(func(r *http.Request) (string, error) { + e, ok := mux.Vars(r)["endpoint"] + if !ok { + return "", errors.New("endpoint not found") + } + return e, nil + }, s.log)).Methods("POST") + httpSvr := &http.Server{ Addr: s.config.ListenAddr, Handler: r, } s.httpSvr.Store(httpSvr) - return httpSvr.ListenAndServe() + if s.options.httpListener != nil { + return httpSvr.Serve(s.options.httpListener) + } else { + return httpSvr.ListenAndServe() + } } func (s *Server) WaitForReady(ctx context.Context) <-chan struct{} { diff --git a/server/server_test.go b/server/server_test.go index e2e01a9a..d5ec4602 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -17,32 +17,53 @@ package server import ( + "bytes" "context" "encoding/json" + "fmt" "github.com/functionstream/function-stream/common" "github.com/functionstream/function-stream/common/model" "github.com/functionstream/function-stream/fs/contube" "github.com/functionstream/function-stream/tests" + "github.com/stretchr/testify/assert" "math/rand" + "net" + "net/http" "strconv" "testing" ) -func TestStandaloneBasicFunction(t *testing.T) { +func getListener(t *testing.T) net.Listener { + ln, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("Failed to listen: %v", err) + } + t.Logf("Listening on %s\n", ln.Addr().String()) + return ln +} +func startStandaloneSvr(t *testing.T, ctx context.Context) (*Server, string) { conf := &common.Config{ - ListenAddr: "localhost:7301", - TubeType: common.MemoryTubeType, + TubeType: common.MemoryTubeType, } - s, err := NewServer(conf) + ln := getListener(t) + s, err := NewServer(conf, WithHttpListener(ln)) if err != nil { t.Fatal(err) } svrCtx, svrCancel := context.WithCancel(context.Background()) go s.Run(svrCtx) - defer func() { + go func() { + <-ctx.Done() svrCancel() }() + return s, fmt.Sprintf("http://%s", ln.Addr()) +} + +func TestStandaloneBasicFunction(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + s, _ := startStandaloneSvr(t, ctx) inputTopic := "test-input-" + strconv.Itoa(rand.Int()) outputTopic := "test-output-" + strconv.Itoa(rand.Int()) @@ -58,7 +79,7 @@ func TestStandaloneBasicFunction(t *testing.T) { Name: "test-func", Replicas: 1, } - err = s.options.manager.StartFunction(funcConf) + err := s.options.manager.StartFunction(funcConf) if err != nil { t.Fatal(err) } @@ -93,3 +114,59 @@ func TestStandaloneBasicFunction(t *testing.T) { return } } + +func TestHttpTube(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + s, httpAddr := startStandaloneSvr(t, ctx) + + endpoint := "test-endpoint" + funcConf := &model.Function{ + Runtime: &model.RuntimeConfig{ + Config: map[string]interface{}{ + common.RuntimeArchiveConfigKey: "../bin/example_basic.wasm", + }, + }, + Source: &model.TubeConfig{ + Type: common.OptionalStr(common.HttpTubeType), + Config: map[string]interface{}{ + contube.EndpointKey: endpoint, + }, + }, + Inputs: []string{}, + Output: "output", + Name: "test-func", + Replicas: 1, + } + + err := s.options.manager.StartFunction(funcConf) + assert.Nil(t, err) + + p := &tests.Person{ + Name: "rbt", + Money: 0, + } + jsonBytes, err := json.Marshal(p) + if err != nil { + t.Fatal(err) + } + + _, err = http.Post(httpAddr+"/api/v1/http-tube/"+endpoint, "application/json", bytes.NewBuffer(jsonBytes)) + assert.Nil(t, err) + + event, err := s.options.manager.ConsumeEvent(funcConf.Output) + if err != nil { + t.Error(err) + return + } + var out tests.Person + err = json.Unmarshal(event.GetPayload(), &out) + if err != nil { + t.Error(err) + return + } + if out.Money != 1 { + t.Errorf("expected 1, got %d", out.Money) + return + } +} From 38b4df0f0db5e0078c57f602dcc3763f4edff8f6 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Mon, 26 Feb 2024 23:37:37 +0800 Subject: [PATCH 3/5] style: add license and fix lint Signed-off-by: Zike Yang --- fs/contube/http.go | 23 +++++++++++++++++++---- fs/contube/http_test.go | 17 +++++++++++++++++ 2 files changed, 36 insertions(+), 4 deletions(-) diff --git a/fs/contube/http.go b/fs/contube/http.go index 6fe4b46a..2d86f679 100644 --- a/fs/contube/http.go +++ b/fs/contube/http.go @@ -1,3 +1,19 @@ +/* + * Copyright 2024 Function Stream Org. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package contube import ( @@ -28,10 +44,9 @@ var ( type EndpointHandler func(ctx context.Context, endpoint string, payload []byte) error type endpointHandler struct { - ctx context.Context - s atomic.Value - handler EndpointHandler - c chan Record + ctx context.Context + s atomic.Value + c chan Record } type HttpTubeFactory struct { diff --git a/fs/contube/http_test.go b/fs/contube/http_test.go index 81a5cb5f..b55636e7 100644 --- a/fs/contube/http_test.go +++ b/fs/contube/http_test.go @@ -1,3 +1,19 @@ +/* + * Copyright 2024 Function Stream Org. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package contube import ( @@ -31,5 +47,6 @@ func TestHttpTubeHandleRecord(t *testing.T) { assert.Nil(t, <-source) err = f.Handle(ctx, endpoint, []byte("test")) + assert.Nil(t, err) assert.Error(t, ErrEndpointNotFound) } From e54852449e6dcebd8fc85165d53c7437545a46c3 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Mon, 26 Feb 2024 23:41:01 +0800 Subject: [PATCH 4/5] tests: add test `TestHttpTubeSinkTubeNotImplement` Signed-off-by: Zike Yang --- fs/contube/contube.go | 5 +++++ fs/contube/http.go | 4 ++-- fs/contube/http_test.go | 6 ++++++ 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/fs/contube/contube.go b/fs/contube/contube.go index 9dc9fe5a..50377055 100644 --- a/fs/contube/contube.go +++ b/fs/contube/contube.go @@ -18,6 +18,11 @@ package contube import ( "context" + "github.com/pkg/errors" +) + +var ( + ErrSinkTubeNotImplemented = errors.New("sink tube not implemented") ) type Record interface { diff --git a/fs/contube/http.go b/fs/contube/http.go index 2d86f679..a7ad2efe 100644 --- a/fs/contube/http.go +++ b/fs/contube/http.go @@ -129,8 +129,8 @@ func (f *HttpTubeFactory) NewSourceTube(ctx context.Context, config ConfigMap) ( return result, nil } -func (f *HttpTubeFactory) NewSinkTube(ctx context.Context, config ConfigMap) (chan<- Record, error) { - return nil, errors.New("http tube factory does not support sink tube") +func (f *HttpTubeFactory) NewSinkTube(_ context.Context, _ ConfigMap) (chan<- Record, error) { + return nil, ErrSinkTubeNotImplemented } func (f *HttpTubeFactory) GetHandleFunc(getEndpoint func(r *http.Request) (string, error), logger *slog.Logger) func(http.ResponseWriter, *http.Request) { diff --git a/fs/contube/http_test.go b/fs/contube/http_test.go index b55636e7..cdd44bd9 100644 --- a/fs/contube/http_test.go +++ b/fs/contube/http_test.go @@ -50,3 +50,9 @@ func TestHttpTubeHandleRecord(t *testing.T) { assert.Nil(t, err) assert.Error(t, ErrEndpointNotFound) } + +func TestHttpTubeSinkTubeNotImplement(t *testing.T) { + f := NewHttpTubeFactory(context.Background()).(*HttpTubeFactory) + _, err := f.NewSinkTube(context.Background(), make(ConfigMap)) + assert.ErrorIs(t, err, ErrSinkTubeNotImplemented) +} From 814aaeef10675b1f04a2e741621d962968293d03 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Mon, 26 Feb 2024 23:42:16 +0800 Subject: [PATCH 5/5] tests: fix test `TestHttpTubeHandleRecord` Signed-off-by: Zike Yang --- fs/contube/http_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/fs/contube/http_test.go b/fs/contube/http_test.go index cdd44bd9..7c28e024 100644 --- a/fs/contube/http_test.go +++ b/fs/contube/http_test.go @@ -47,8 +47,7 @@ func TestHttpTubeHandleRecord(t *testing.T) { assert.Nil(t, <-source) err = f.Handle(ctx, endpoint, []byte("test")) - assert.Nil(t, err) - assert.Error(t, ErrEndpointNotFound) + assert.Error(t, err, ErrEndpointNotFound) } func TestHttpTubeSinkTubeNotImplement(t *testing.T) {