diff --git a/.github/workflows/bench.yaml b/.github/workflows/bench.yaml index a00accf1..5d8cd96a 100644 --- a/.github/workflows/bench.yaml +++ b/.github/workflows/bench.yaml @@ -12,17 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. name: Benchmark on: diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 8d2994c9..a4152c04 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -12,17 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. name: CI on: diff --git a/.github/workflows/license.yaml b/.github/workflows/license.yaml index 3f3539c2..865f8e25 100644 --- a/.github/workflows/license.yaml +++ b/.github/workflows/license.yaml @@ -12,17 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. name: License Check on: diff --git a/.golangci.yml b/.golangci.yml new file mode 100644 index 00000000..f91de055 --- /dev/null +++ b/.golangci.yml @@ -0,0 +1,54 @@ +# Copyright 2024 Function Stream Org. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +run: + deadline: 5m + allow-parallel-runners: true + +issues: + # don't skip warning about doc comments + # don't exclude the default set of lint + exclude-use-default: false + # restore some of the defaults + # (fill in the rest as needed) + exclude-rules: + - path: "api/*" + linters: + - lll + - path: "internal/*" + linters: + - dupl + - lll +linters: + disable-all: true + enable: + - dupl + - errcheck + - exportloopref + - goconst + - gocyclo + - gofmt + - goimports + - gosimple + - govet + - ineffassign + - lll + - misspell + - nakedret + - prealloc + - staticcheck + - typecheck + - unconvert + - unparam + - unused diff --git a/benchmark/bench_test.go b/benchmark/bench_test.go index cdf89b97..d4dc104c 100644 --- a/benchmark/bench_test.go +++ b/benchmark/bench_test.go @@ -18,8 +18,6 @@ package benchmark import ( "context" - "github.com/functionstream/function-stream/fs/api" - "github.com/functionstream/function-stream/fs/runtime/wazero" "math/rand" "os" "runtime/pprof" @@ -27,6 +25,9 @@ import ( "testing" "time" + "github.com/functionstream/function-stream/fs/api" + "github.com/functionstream/function-stream/fs/runtime/wazero" + "github.com/apache/pulsar-client-go/pulsaradmin" "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" adminclient "github.com/functionstream/function-stream/admin/client" @@ -115,12 +116,14 @@ func BenchmarkStressForBasicFuncWithMemoryQueue(b *testing.B) { memoryQueueFactory := contube.NewMemoryQueueFactory(context.Background()) s, err := server.NewServer( - server.WithRuntimeFactoryBuilder(common.WASMRuntime, func(configMap common.ConfigMap) (api.FunctionRuntimeFactory, error) { - return wazero.NewWazeroFunctionRuntimeFactory(), nil - }), - server.WithTubeFactoryBuilder(common.MemoryTubeType, func(configMap common.ConfigMap) (contube.TubeFactory, error) { - return memoryQueueFactory, nil - }), + server.WithRuntimeFactoryBuilder(common.WASMRuntime, + func(configMap common.ConfigMap) (api.FunctionRuntimeFactory, error) { + return wazero.NewWazeroFunctionRuntimeFactory(), nil + }), + server.WithTubeFactoryBuilder(common.MemoryTubeType, + func(configMap common.ConfigMap) (contube.TubeFactory, error) { + return memoryQueueFactory, nil + }), ) if err != nil { b.Fatal(err) @@ -134,7 +137,7 @@ func BenchmarkStressForBasicFuncWithMemoryQueue(b *testing.B) { inputTopic := "test-input-" + strconv.Itoa(rand.Int()) outputTopic := "test-output-" + strconv.Itoa(rand.Int()) - replicas := int32(1) + replicas := int32(5) pConfig := &perf.Config{ RequestRate: 200000.0, diff --git a/clients/gofs/gofs.go b/clients/gofs/gofs.go new file mode 100644 index 00000000..2a34fed6 --- /dev/null +++ b/clients/gofs/gofs.go @@ -0,0 +1,69 @@ +/* + * Copyright 2024 Function Stream Org. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package gofs + +import "C" +import ( + "encoding/json" + "fmt" + "io" + "os" + + . "github.com/functionstream/function-stream/common/wasm_utils" + "github.com/wirelessr/avroschema" +) + +var processFile *os.File + +func init() { + processFile, _ = os.Open("/process") +} + +var processFunc func([]byte) []byte + +//go:wasmimport fs registerSchema +func registerSchema(inputSchemaPtrSize, outputSchemaPtrSize uint64) + +func Register[I any, O any](process func(*I) *O) error { + inputSchema, err := avroschema.Reflect(new(I)) + if err != nil { + return err + } + outputSchema, err := avroschema.Reflect(new(O)) + if err != nil { + return err + } + processFunc = func(payload []byte) []byte { + input := new(I) + err = json.Unmarshal(payload, input) + if err != nil { + fmt.Fprintf(os.Stderr, "Failed to parse JSON: %s %s", err, payload) + } + output := process(input) + outputPayload, _ := json.Marshal(output) + return outputPayload + } + registerSchema(PtrSize(StringToPtr(inputSchema)), PtrSize(StringToPtr(outputSchema))) + return nil +} + +//export process +func process() { + payload, _ := io.ReadAll(processFile) + outputPayload := processFunc(payload) + _, _ = processFile.Write(outputPayload) +} diff --git a/cmd/client/reload/cmd.go b/cmd/client/reload/cmd.go index 3f7ab833..1762ab67 100644 --- a/cmd/client/reload/cmd.go +++ b/cmd/client/reload/cmd.go @@ -19,10 +19,11 @@ package reload import ( "context" "fmt" + "os" + adminclient "github.com/functionstream/function-stream/admin/client" "github.com/functionstream/function-stream/cmd/client/common" "github.com/spf13/cobra" - "os" ) var Cmd = &cobra.Command{ diff --git a/common/wasm_utils/wasm_utils.go b/common/wasm_utils/wasm_utils.go new file mode 100644 index 00000000..60f7d004 --- /dev/null +++ b/common/wasm_utils/wasm_utils.go @@ -0,0 +1,46 @@ +/* + * Copyright 2024 Function Stream Org. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package wasm_utils + +import "unsafe" + +// StringToPtr returns a pointer and size pair for the given string in a way +// compatible with WebAssembly numeric types. +// The returned pointer aliases the string hence the string must be kept alive +// until ptr is no longer needed. +func StringToPtr(s string) (uint32, uint32) { + ptr := unsafe.Pointer(unsafe.StringData(s)) + return uint32(uintptr(ptr)), uint32(len(s)) +} + +//// StringToLeakedPtr returns a pointer and size pair for the given string in a way +//// compatible with WebAssembly numeric types. +//// The pointer is not automatically managed by TinyGo hence it must be freed by the host. +//func StringToLeakedPtr(s string) (uint32, uint32) { +// size := C.ulong(len(s)) +// ptr := unsafe.Pointer(C.malloc(size)) +// copy(unsafe.Slice((*byte)(ptr), size), s) +// return uint32(uintptr(ptr)), uint32(size) +//} + +func PtrSize(ptr, size uint32) uint64 { + return (uint64(ptr) << 32) | uint64(size) +} + +func ExtractPtrSize(ptrSize uint64) (uint32, uint32) { + return uint32(ptrSize >> 32), uint32(ptrSize) +} diff --git a/examples/basic/main.go b/examples/basic/main.go index d6ae3964..b2c2b7be 100644 --- a/examples/basic/main.go +++ b/examples/basic/main.go @@ -17,48 +17,26 @@ package main import ( - "encoding/json" "fmt" - "io" + "github.com/functionstream/function-stream/clients/gofs" "os" ) +func main() { + _, _ = fmt.Fprintln(os.Stderr, "Hello from Go!") +} + type Person struct { Name string `json:"name"` Money int `json:"money"` Expected int `json:"expected"` } -func main() { - _, _ = fmt.Fprintln(os.Stderr, "Hello from Go!") +func init() { + _ = gofs.Register(myProcess) } -//export process -func process() { - dataBytes, err := io.ReadAll(os.Stdin) - if err != nil { - _, _ = fmt.Fprintln(os.Stderr, "Failed to read data:", err) - os.Exit(1) - } - - var person Person - err = json.Unmarshal(dataBytes, &person) - if err != nil { - _, _ = fmt.Fprintln(os.Stderr, "Failed to parse JSON:", err) - os.Exit(1) - } - - person.Money++ - - jsonBytes, err := json.Marshal(person) - if err != nil { - _, _ = fmt.Fprintln(os.Stderr, "Failed to encode JSON:", err) - os.Exit(1) - } - - _, err = fmt.Printf("%s", jsonBytes) - if err != nil { - _, _ = fmt.Fprintln(os.Stderr, "Failed to print JSON:", err) - os.Exit(1) - } +func myProcess(person *Person) *Person { + person.Money += 1 + return person } diff --git a/fs/contube/contube.go b/fs/contube/contube.go index 8c1e3a4d..08e34982 100644 --- a/fs/contube/contube.go +++ b/fs/contube/contube.go @@ -19,6 +19,7 @@ package contube import ( "context" "encoding/json" + "github.com/go-playground/validator/v10" "github.com/pkg/errors" ) @@ -31,6 +32,7 @@ var ( type Record interface { GetPayload() []byte + GetSchema() string Commit() } @@ -110,6 +112,7 @@ type TubeFactory interface { type RecordImpl struct { payload []byte + schema string commitFunc func() } @@ -120,10 +123,22 @@ func NewRecordImpl(payload []byte, ackFunc func()) *RecordImpl { } } +func NewSchemaRecordImpl(payload []byte, schema string, ackFunc func()) *RecordImpl { + return &RecordImpl{ + payload: payload, + schema: schema, + commitFunc: ackFunc, + } +} + func (e *RecordImpl) GetPayload() []byte { return e.payload } +func (e *RecordImpl) GetSchema() string { + return e.schema +} + func (e *RecordImpl) Commit() { if e.commitFunc != nil { e.commitFunc() diff --git a/fs/contube/http.go b/fs/contube/http.go index c2ced876..0816f950 100644 --- a/fs/contube/http.go +++ b/fs/contube/http.go @@ -17,12 +17,13 @@ package contube import ( - "github.com/functionstream/function-stream/common" "io" "net/http" "sync" "sync/atomic" + "github.com/functionstream/function-stream/common" + "github.com/pkg/errors" "golang.org/x/net/context" ) diff --git a/fs/contube/pulsar.go b/fs/contube/pulsar.go index 68545d85..8470d161 100644 --- a/fs/contube/pulsar.go +++ b/fs/contube/pulsar.go @@ -155,8 +155,14 @@ func NewPulsarEventQueueFactory(ctx context.Context, configMap ConfigMap) (TubeF flush() return } + schemaDef := e.GetSchema() + var schema pulsar.Schema + if schemaDef != "" { + schema = pulsar.NewJSONSchema(schemaDef, nil) + } producer.SendAsync(ctx, &pulsar.ProducerMessage{ Payload: e.GetPayload(), + Schema: schema, }, func(id pulsar.MessageID, message *pulsar.ProducerMessage, err error) { if err != nil { handleErr(ctx, err, "Error sending message", "error", err, "messageId", id) diff --git a/fs/instance_impl_test.go b/fs/instance_impl_test.go index dd70c980..81362f43 100644 --- a/fs/instance_impl_test.go +++ b/fs/instance_impl_test.go @@ -17,9 +17,10 @@ package fs import ( - "github.com/functionstream/function-stream/common" "testing" + "github.com/functionstream/function-stream/common" + "github.com/functionstream/function-stream/common/model" ) diff --git a/fs/manager.go b/fs/manager.go index fab9ac7b..7de06004 100644 --- a/fs/manager.go +++ b/fs/manager.go @@ -16,11 +16,9 @@ package fs -import "fmt" - import ( "context" - "github.com/go-logr/logr" + "fmt" "math/rand" "strconv" "sync" @@ -30,6 +28,7 @@ import ( "github.com/functionstream/function-stream/fs/api" "github.com/functionstream/function-stream/fs/contube" "github.com/functionstream/function-stream/fs/statestore" + "github.com/go-logr/logr" "github.com/pkg/errors" ) diff --git a/fs/runtime/wazero/fs.go b/fs/runtime/wazero/fs.go new file mode 100644 index 00000000..4b2cd604 --- /dev/null +++ b/fs/runtime/wazero/fs.go @@ -0,0 +1,77 @@ +/* + * Copyright 2024 Function Stream Org. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package wazero + +import ( + "bytes" + "io/fs" + + . "github.com/tetratelabs/wazero/experimental/sys" + "github.com/tetratelabs/wazero/sys" +) + +type memoryFS struct { + FS + m map[string]File +} + +func (f *memoryFS) OpenFile(path string, _ Oflag, _ fs.FileMode) (File, Errno) { + if path == "." { + return &memoryFile{isDir: true}, 0 + } + if file, ok := f.m[path]; ok { + return file, 0 + } + return nil, ENOENT +} + +type memoryFile struct { + File + isDir bool + readBuf bytes.Buffer + writeBuf bytes.Buffer +} + +func newMemoryFS(m map[string]File) FS { + return &memoryFS{ + m: m, + } +} + +func (f *memoryFile) Read(p []byte) (n int, errno Errno) { + n, _ = f.readBuf.Read(p) + errno = 0 + return +} + +func (f *memoryFile) Write(buf []byte) (n int, errno Errno) { + n, _ = f.writeBuf.Write(buf) + errno = 0 + return +} + +func (f *memoryFile) IsDir() (bool, Errno) { + return f.isDir, 0 +} + +func (f *memoryFile) Close() Errno { + return 0 +} + +func (f *memoryFile) Stat() (sys.Stat_t, Errno) { + return sys.Stat_t{}, 0 +} diff --git a/fs/runtime/wazero/wazero_runtime.go b/fs/runtime/wazero/wazero_runtime.go index 39e0c08e..3d120614 100644 --- a/fs/runtime/wazero/wazero_runtime.go +++ b/fs/runtime/wazero/wazero_runtime.go @@ -17,16 +17,20 @@ package wazero import ( + "context" "fmt" + "os" + "github.com/functionstream/function-stream/common" + "github.com/functionstream/function-stream/common/wasm_utils" "github.com/functionstream/function-stream/fs/api" "github.com/functionstream/function-stream/fs/contube" "github.com/tetratelabs/wazero" wazero_api "github.com/tetratelabs/wazero/api" + exp_sys "github.com/tetratelabs/wazero/experimental/sys" + "github.com/tetratelabs/wazero/experimental/sysfs" "github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1" "github.com/tetratelabs/wazero/sys" - "golang.org/x/net/context" - "os" ) type WazeroFunctionRuntimeFactory struct { @@ -44,13 +48,18 @@ func (f *WazeroFunctionRuntimeFactory) NewFunctionRuntime(instance api.FunctionI log.Error(fmt.Errorf("abort(%d, %d, %d, %d)", a, b, c, d), "the function is calling abort") }).Export("abort").Instantiate(instance.Context()) if err != nil { - return nil, fmt.Errorf("error instantiating function module: %w", err) + return nil, fmt.Errorf("error instantiating env module: %w", err) } stdin := common.NewChanReader() stdout := common.NewChanWriter() + processFile := &memoryFile{} + fileMap := map[string]exp_sys.File{ + "process": processFile, + } + fsConfig := wazero.NewFSConfig().(sysfs.FSConfig).WithSysFSMount(newMemoryFS(fileMap), "") config := wazero.NewModuleConfig(). - WithStdout(stdout).WithStdin(stdin).WithStderr(os.Stderr) + WithStdout(stdout).WithStdin(stdin).WithStderr(os.Stderr).WithFSConfig(fsConfig) wasi_snapshot_preview1.MustInstantiate(instance.Context(), r) @@ -69,6 +78,32 @@ func (f *WazeroFunctionRuntimeFactory) NewFunctionRuntime(instance api.FunctionI if err != nil { return nil, fmt.Errorf("error reading wasm file: %w", err) } + var outputSchemaDef string + _, err = r.NewHostModuleBuilder("fs"). + NewFunctionBuilder(). + WithFunc(func(ctx context.Context, m wazero_api.Module, inputSchema uint64, outputSchema uint64) { + inputBuf, ok := m.Memory().Read(wasm_utils.ExtractPtrSize(inputSchema)) + if !ok { + log.Error(fmt.Errorf("failed to read memory"), "failed to read memory") + return + } + if log.DebugEnabled() { + log.Info("Register the input schema", "schema", string(inputBuf)) + } + outputBuf, ok := m.Memory().Read(wasm_utils.ExtractPtrSize(outputSchema)) + if !ok { + log.Error(fmt.Errorf("failed to read memory"), "failed to read memory") + return + } + if log.DebugEnabled() { + log.Info("Register the output schema", "schema", string(outputBuf)) + } + outputSchemaDef = string(outputBuf) + }).Export("registerSchema"). + Instantiate(instance.Context()) + if err != nil { + return nil, fmt.Errorf("error creating fs module: %w", err) + } // Trigger the "_start" function, WASI's "main". mod, err := r.InstantiateWithConfig(instance.Context(), wasmBytes, config) if err != nil { @@ -78,19 +113,26 @@ func (f *WazeroFunctionRuntimeFactory) NewFunctionRuntime(instance api.FunctionI return nil, fmt.Errorf("failed to instantiate function: %w", err) } } + if err != nil { + return nil, fmt.Errorf("error instantiating runtime: %w", err) + } process := mod.ExportedFunction("process") if process == nil { return nil, fmt.Errorf("no process function found") } return &FunctionRuntime{ callFunc: func(e contube.Record) (contube.Record, error) { - stdin.ResetBuffer(e.GetPayload()) + payload := e.GetPayload() + processFile.writeBuf.Reset() + _, _ = processFile.readBuf.Write(payload) _, err := process.Call(instance.Context()) if err != nil { - return nil, fmt.Errorf("error calling wasm function: %w", err) + return nil, err } - output := stdout.GetAndReset() - return contube.NewRecordImpl(output, e.Commit), nil + outBuf := processFile.writeBuf.Bytes() + outputPayload := make([]byte, len(outBuf)) + copy(outputPayload, outBuf) + return contube.NewSchemaRecordImpl(outputPayload, outputSchemaDef, e.Commit), nil }, stopFunc: func() { err := r.Close(instance.Context()) diff --git a/go.mod b/go.mod index a782ea84..e2b0c2a2 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/functionstream/function-stream -go 1.21 +go 1.21.4 require ( github.com/apache/pulsar-client-go v0.12.0 @@ -17,6 +17,7 @@ require ( github.com/spf13/viper v1.18.2 github.com/stretchr/testify v1.9.0 github.com/tetratelabs/wazero v1.6.0 + github.com/wirelessr/avroschema v0.0.0-20240111032105-ef4f4560e2a7 go.uber.org/zap v1.26.0 golang.org/x/net v0.23.0 golang.org/x/time v0.5.0 diff --git a/go.sum b/go.sum index 00cb5d59..3e3f9247 100644 --- a/go.sum +++ b/go.sum @@ -223,6 +223,8 @@ github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8 github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= github.com/tetratelabs/wazero v1.6.0 h1:z0H1iikCdP8t+q341xqepY4EWvHEw8Es7tlqiVzlP3g= github.com/tetratelabs/wazero v1.6.0/go.mod h1:0U0G41+ochRKoPKCJlh0jMg1CHkyfK8kDqiirMmKY8A= +github.com/wirelessr/avroschema v0.0.0-20240111032105-ef4f4560e2a7 h1:8W0F/PiIV6W/yl2JNfPIVey3mYrdAz/h77RUMIcBIUs= +github.com/wirelessr/avroschema v0.0.0-20240111032105-ef4f4560e2a7/go.mod h1:ivMyAKRe5TqRXC665a1Lv9cWLbkua2K1dnJYslCYi00= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= diff --git a/perf/perf.go b/perf/perf.go index 88fdb77a..8c8c27ef 100644 --- a/perf/perf.go +++ b/perf/perf.go @@ -256,6 +256,7 @@ func (p *perf) generateTraffic(ctx context.Context, latencyCh chan int64, failur slog.Error( "Failed to unmarshal Person", slog.Any("error", err), + slog.Any("payload", payload), ) os.Exit(1) } diff --git a/server/config_test.go b/server/config_test.go index 488981fc..821ece59 100644 --- a/server/config_test.go +++ b/server/config_test.go @@ -17,11 +17,12 @@ package server import ( + "os" + "testing" + "github.com/spf13/viper" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "os" - "testing" ) func TestLoadConfigFromYaml(t *testing.T) { diff --git a/server/function_store.go b/server/function_store.go index ec9892ae..ebba8ab1 100644 --- a/server/function_store.go +++ b/server/function_store.go @@ -17,12 +17,6 @@ package server import ( - restfulspec "github.com/emicklei/go-restful-openapi/v2" - "github.com/emicklei/go-restful/v3" - "github.com/functionstream/function-stream/common/model" - "github.com/functionstream/function-stream/fs" - "github.com/pkg/errors" - "gopkg.in/yaml.v3" "io" "log/slog" "net/http" @@ -30,6 +24,13 @@ import ( "path/filepath" "strings" "sync" + + restfulspec "github.com/emicklei/go-restful-openapi/v2" + "github.com/emicklei/go-restful/v3" + "github.com/functionstream/function-stream/common/model" + "github.com/functionstream/function-stream/fs" + "github.com/pkg/errors" + "gopkg.in/yaml.v3" ) type FunctionStore interface { diff --git a/server/function_store_test.go b/server/function_store_test.go index 4f67c2a1..9a8c68f5 100644 --- a/server/function_store_test.go +++ b/server/function_store_test.go @@ -17,6 +17,9 @@ package server import ( + "os" + "testing" + "github.com/functionstream/function-stream/common" "github.com/functionstream/function-stream/common/model" "github.com/functionstream/function-stream/fs" @@ -24,8 +27,6 @@ import ( "github.com/functionstream/function-stream/fs/contube" "github.com/stretchr/testify/assert" "gopkg.in/yaml.v3" - "os" - "testing" ) type testFunctionManagerImpl struct { diff --git a/server/server.go b/server/server.go index 3b758686..d12161e6 100644 --- a/server/server.go +++ b/server/server.go @@ -19,7 +19,6 @@ package server import ( "context" "fmt" - "github.com/go-logr/logr" "net" "net/http" "net/url" @@ -27,6 +26,8 @@ import ( "sync/atomic" "time" + "github.com/go-logr/logr" + restfulspec "github.com/emicklei/go-restful-openapi/v2" "github.com/emicklei/go-restful/v3" "github.com/functionstream/function-stream/common" @@ -108,14 +109,19 @@ func WithQueueConfig(config QueueConfig) ServerOption { }) } -func WithTubeFactoryBuilder(name string, builder func(configMap common.ConfigMap) (contube.TubeFactory, error)) ServerOption { +func WithTubeFactoryBuilder( + name string, + builder func(configMap common.ConfigMap) (contube.TubeFactory, error), +) ServerOption { return serverOptionFunc(func(o *serverOptions) (*serverOptions, error) { o.tubeFactoryBuilders[name] = builder return o, nil }) } -func WithTubeFactoryBuilders(builder map[string]func(configMap common.ConfigMap) (contube.TubeFactory, error)) ServerOption { +func WithTubeFactoryBuilders( + builder map[string]func(configMap common.ConfigMap, + ) (contube.TubeFactory, error)) ServerOption { return serverOptionFunc(func(o *serverOptions) (*serverOptions, error) { for n, b := range builder { o.tubeFactoryBuilders[n] = b @@ -124,14 +130,19 @@ func WithTubeFactoryBuilders(builder map[string]func(configMap common.ConfigMap) }) } -func WithRuntimeFactoryBuilder(name string, builder func(configMap common.ConfigMap) (api.FunctionRuntimeFactory, error)) ServerOption { +func WithRuntimeFactoryBuilder( + name string, + builder func(configMap common.ConfigMap) (api.FunctionRuntimeFactory, error), +) ServerOption { return serverOptionFunc(func(o *serverOptions) (*serverOptions, error) { o.runtimeFactoryBuilders[name] = builder return o, nil }) } -func WithRuntimeFactoryBuilders(builder map[string]func(configMap common.ConfigMap) (api.FunctionRuntimeFactory, error)) ServerOption { +func WithRuntimeFactoryBuilders( + builder map[string]func(configMap common.ConfigMap) (api.FunctionRuntimeFactory, error), +) ServerOption { return serverOptionFunc(func(o *serverOptions) (*serverOptions, error) { for n, b := range builder { o.runtimeFactoryBuilders[n] = b @@ -159,7 +170,8 @@ func GetBuiltinTubeFactoryBuilder() map[string]func(configMap common.ConfigMap) common.PulsarTubeType: func(configMap common.ConfigMap) (contube.TubeFactory, error) { return contube.NewPulsarEventQueueFactory(context.Background(), contube.ConfigMap(configMap)) }, - common.MemoryTubeType: func(configMap common.ConfigMap) (contube.TubeFactory, error) { + //nolint:unparam + common.MemoryTubeType: func(_ common.ConfigMap) (contube.TubeFactory, error) { return contube.NewMemoryQueueFactory(context.Background()), nil }, } @@ -167,6 +179,7 @@ func GetBuiltinTubeFactoryBuilder() map[string]func(configMap common.ConfigMap) func GetBuiltinRuntimeFactoryBuilder() map[string]func(configMap common.ConfigMap) (api.FunctionRuntimeFactory, error) { return map[string]func(configMap common.ConfigMap) (api.FunctionRuntimeFactory, error){ + //nolint:unparam common.WASMRuntime: func(configMap common.ConfigMap) (api.FunctionRuntimeFactory, error) { return wazero.NewWazeroFunctionRuntimeFactory(), nil }, @@ -333,7 +346,10 @@ func NewDefaultServer() (*Server, error) { }, RuntimeConfig: map[string]common.ConfigMap{}, } - return NewServer(WithTubeFactoryBuilders(GetBuiltinTubeFactoryBuilder()), WithRuntimeFactoryBuilders(GetBuiltinRuntimeFactoryBuilder()), WithConfig(defaultConfig)) + return NewServer( + WithTubeFactoryBuilders(GetBuiltinTubeFactoryBuilder()), + WithRuntimeFactoryBuilders(GetBuiltinRuntimeFactoryBuilder()), + WithConfig(defaultConfig)) } func (s *Server) Run(context context.Context) { diff --git a/server/server_test.go b/server/server_test.go index c380f43b..4e34085f 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -237,9 +237,10 @@ func (r *MockRuntime) Stop() { func TestStatefulFunction(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - s, httpAddr := startStandaloneSvr(t, ctx, WithRuntimeFactoryBuilder("mock", func(configMap common.ConfigMap) (api.FunctionRuntimeFactory, error) { - return &MockRuntimeFactory{}, nil - })) + s, httpAddr := startStandaloneSvr(t, ctx, + WithRuntimeFactoryBuilder("mock", func(configMap common.ConfigMap) (api.FunctionRuntimeFactory, error) { + return &MockRuntimeFactory{}, nil + })) input := "input" output := "output"