From b16b51d67350910783e035e92a6209d82752ec39 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Tue, 4 Jun 2024 19:09:18 +0800 Subject: [PATCH 1/9] feat: add wasm SDK --- benchmark/bench_test.go | 2 +- clients/gofs/gofs.go | 58 ++++++++++++++++++++++++ common/wasm_utils/wasm_utils.go | 56 +++++++++++++++++++++++ examples/basic/main.go | 43 +++++------------- fs/contube/contube.go | 14 ++++++ fs/contube/pulsar.go | 6 +++ fs/runtime/wazero/wazero_runtime.go | 70 ++++++++++++++++++++++++++--- go.mod | 5 ++- go.sum | 2 + 9 files changed, 215 insertions(+), 41 deletions(-) create mode 100644 clients/gofs/gofs.go create mode 100644 common/wasm_utils/wasm_utils.go diff --git a/benchmark/bench_test.go b/benchmark/bench_test.go index cdf89b97..dfffb87a 100644 --- a/benchmark/bench_test.go +++ b/benchmark/bench_test.go @@ -134,7 +134,7 @@ func BenchmarkStressForBasicFuncWithMemoryQueue(b *testing.B) { inputTopic := "test-input-" + strconv.Itoa(rand.Int()) outputTopic := "test-output-" + strconv.Itoa(rand.Int()) - replicas := int32(1) + replicas := int32(5) pConfig := &perf.Config{ RequestRate: 200000.0, diff --git a/clients/gofs/gofs.go b/clients/gofs/gofs.go new file mode 100644 index 00000000..23d415ed --- /dev/null +++ b/clients/gofs/gofs.go @@ -0,0 +1,58 @@ +/* + * Copyright 2024 Function Stream Org. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package gofs + +import "C" +import ( + "encoding/json" + . "github.com/functionstream/function-stream/common/wasm_utils" + "github.com/wirelessr/avroschema" +) + +var processFunc func(uint64) uint64 + +//go:wasmimport fs registerSchema +func registerSchema(inputSchemaPtrSize, outputSchemaPtrSize uint64) + +func Register[I any, O any](process func(*I) *O) error { + inputSchema, err := avroschema.Reflect(new(I)) + if err != nil { + return err + } + outputSchema, err := avroschema.Reflect(new(O)) + if err != nil { + return err + } + processFunc = func(ptrSize uint64) uint64 { + payload := PtrToString(ExtractPtrSize(ptrSize)) + input := new(I) + err = json.Unmarshal([]byte(payload), input) + if err != nil { + //fmt.Fprintf(os.Stderr, "Failed to parse JSON: %s %s", err, payload) + } + output := process(input) + outputPayload, _ := json.Marshal(output) + return PtrSize(StringToPtr(string(outputPayload))) + } + registerSchema(PtrSize(StringToPtr(inputSchema)), PtrSize(StringToPtr(outputSchema))) + return nil +} + +//export processRecord +func processRecord(ptrSize uint64) uint64 { + return processFunc(ptrSize) +} diff --git a/common/wasm_utils/wasm_utils.go b/common/wasm_utils/wasm_utils.go new file mode 100644 index 00000000..40e50f7c --- /dev/null +++ b/common/wasm_utils/wasm_utils.go @@ -0,0 +1,56 @@ +/* + * Copyright 2024 Function Stream Org. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package wasm_utils + +import "unsafe" + +// PtrToString returns a string from WebAssembly compatible numeric types +// representing its pointer and length. +func PtrToString(ptr uint32, size uint32) string { + return unsafe.String((*byte)(unsafe.Pointer(uintptr(ptr))), size) +} + +func PtrToBytes(ptr uint32, size uint32) string { + return unsafe.String((*byte)(unsafe.Pointer(uintptr(ptr))), size) +} + +// StringToPtr returns a pointer and size pair for the given string in a way +// compatible with WebAssembly numeric types. +// The returned pointer aliases the string hence the string must be kept alive +// until ptr is no longer needed. +func StringToPtr(s string) (uint32, uint32) { + ptr := unsafe.Pointer(unsafe.StringData(s)) + return uint32(uintptr(ptr)), uint32(len(s)) +} + +//// StringToLeakedPtr returns a pointer and size pair for the given string in a way +//// compatible with WebAssembly numeric types. +//// The pointer is not automatically managed by TinyGo hence it must be freed by the host. +//func StringToLeakedPtr(s string) (uint32, uint32) { +// size := C.ulong(len(s)) +// ptr := unsafe.Pointer(C.malloc(size)) +// copy(unsafe.Slice((*byte)(ptr), size), s) +// return uint32(uintptr(ptr)), uint32(size) +//} + +func PtrSize(ptr, size uint32) uint64 { + return (uint64(ptr) << 32) | uint64(size) +} + +func ExtractPtrSize(ptrSize uint64) (uint32, uint32) { + return uint32(ptrSize >> 32), uint32(ptrSize) +} diff --git a/examples/basic/main.go b/examples/basic/main.go index d6ae3964..7cc0f647 100644 --- a/examples/basic/main.go +++ b/examples/basic/main.go @@ -17,48 +17,27 @@ package main import ( - "encoding/json" "fmt" - "io" + "github.com/functionstream/function-stream/clients/gofs" "os" ) +func main() { + _, _ = fmt.Fprintln(os.Stderr, "Hello from Go!") +} + type Person struct { Name string `json:"name"` Money int `json:"money"` Expected int `json:"expected"` } -func main() { - _, _ = fmt.Fprintln(os.Stderr, "Hello from Go!") +//export init +func init() { + _ = gofs.Register(myProcess) } -//export process -func process() { - dataBytes, err := io.ReadAll(os.Stdin) - if err != nil { - _, _ = fmt.Fprintln(os.Stderr, "Failed to read data:", err) - os.Exit(1) - } - - var person Person - err = json.Unmarshal(dataBytes, &person) - if err != nil { - _, _ = fmt.Fprintln(os.Stderr, "Failed to parse JSON:", err) - os.Exit(1) - } - - person.Money++ - - jsonBytes, err := json.Marshal(person) - if err != nil { - _, _ = fmt.Fprintln(os.Stderr, "Failed to encode JSON:", err) - os.Exit(1) - } - - _, err = fmt.Printf("%s", jsonBytes) - if err != nil { - _, _ = fmt.Fprintln(os.Stderr, "Failed to print JSON:", err) - os.Exit(1) - } +func myProcess(person *Person) *Person { + person.Money += 1 + return person } diff --git a/fs/contube/contube.go b/fs/contube/contube.go index 8c1e3a4d..92553486 100644 --- a/fs/contube/contube.go +++ b/fs/contube/contube.go @@ -31,6 +31,7 @@ var ( type Record interface { GetPayload() []byte + GetSchema() string Commit() } @@ -110,6 +111,7 @@ type TubeFactory interface { type RecordImpl struct { payload []byte + schema string commitFunc func() } @@ -120,10 +122,22 @@ func NewRecordImpl(payload []byte, ackFunc func()) *RecordImpl { } } +func NewSchemaRecordImpl(payload []byte, schema string, ackFunc func()) *RecordImpl { + return &RecordImpl{ + payload: payload, + schema: schema, + commitFunc: ackFunc, + } +} + func (e *RecordImpl) GetPayload() []byte { return e.payload } +func (e *RecordImpl) GetSchema() string { + return e.schema +} + func (e *RecordImpl) Commit() { if e.commitFunc != nil { e.commitFunc() diff --git a/fs/contube/pulsar.go b/fs/contube/pulsar.go index 68545d85..8470d161 100644 --- a/fs/contube/pulsar.go +++ b/fs/contube/pulsar.go @@ -155,8 +155,14 @@ func NewPulsarEventQueueFactory(ctx context.Context, configMap ConfigMap) (TubeF flush() return } + schemaDef := e.GetSchema() + var schema pulsar.Schema + if schemaDef != "" { + schema = pulsar.NewJSONSchema(schemaDef, nil) + } producer.SendAsync(ctx, &pulsar.ProducerMessage{ Payload: e.GetPayload(), + Schema: schema, }, func(id pulsar.MessageID, message *pulsar.ProducerMessage, err error) { if err != nil { handleErr(ctx, err, "Error sending message", "error", err, "messageId", id) diff --git a/fs/runtime/wazero/wazero_runtime.go b/fs/runtime/wazero/wazero_runtime.go index 39e0c08e..60b19426 100644 --- a/fs/runtime/wazero/wazero_runtime.go +++ b/fs/runtime/wazero/wazero_runtime.go @@ -17,15 +17,16 @@ package wazero import ( + "context" "fmt" "github.com/functionstream/function-stream/common" + "github.com/functionstream/function-stream/common/wasm_utils" "github.com/functionstream/function-stream/fs/api" "github.com/functionstream/function-stream/fs/contube" "github.com/tetratelabs/wazero" wazero_api "github.com/tetratelabs/wazero/api" "github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1" "github.com/tetratelabs/wazero/sys" - "golang.org/x/net/context" "os" ) @@ -44,7 +45,7 @@ func (f *WazeroFunctionRuntimeFactory) NewFunctionRuntime(instance api.FunctionI log.Error(fmt.Errorf("abort(%d, %d, %d, %d)", a, b, c, d), "the function is calling abort") }).Export("abort").Instantiate(instance.Context()) if err != nil { - return nil, fmt.Errorf("error instantiating function module: %w", err) + return nil, fmt.Errorf("error instantiating env module: %w", err) } stdin := common.NewChanReader() stdout := common.NewChanWriter() @@ -69,6 +70,29 @@ func (f *WazeroFunctionRuntimeFactory) NewFunctionRuntime(instance api.FunctionI if err != nil { return nil, fmt.Errorf("error reading wasm file: %w", err) } + var outputSchemaDef string + _, err = r.NewHostModuleBuilder("fs"). + NewFunctionBuilder(). + WithFunc(func(ctx context.Context, m wazero_api.Module, inputSchema uint64, outputSchema uint64) { + inputBuf, ok := m.Memory().Read(wasm_utils.ExtractPtrSize(inputSchema)) + if !ok { + log.Error(fmt.Errorf("failed to read memory"), "failed to read memory") + return + } + if log.DebugEnabled() { + log.Info("Register the input schema", "schema", string(inputBuf)) + } + outputBuf, ok := m.Memory().Read(wasm_utils.ExtractPtrSize(outputSchema)) + if !ok { + log.Error(fmt.Errorf("failed to read memory"), "failed to read memory") + return + } + if log.DebugEnabled() { + log.Info("Register the output schema", "schema", string(outputBuf)) + } + outputSchemaDef = string(outputBuf) + }).Export("registerSchema"). + Instantiate(instance.Context()) // Trigger the "_start" function, WASI's "main". mod, err := r.InstantiateWithConfig(instance.Context(), wasmBytes, config) if err != nil { @@ -79,18 +103,50 @@ func (f *WazeroFunctionRuntimeFactory) NewFunctionRuntime(instance api.FunctionI } } process := mod.ExportedFunction("process") + malloc := mod.ExportedFunction("malloc") + free := mod.ExportedFunction("free") + init := mod.ExportedFunction("init") + processRecord := mod.ExportedFunction("processRecord") + + if err != nil { + return nil, fmt.Errorf("error instantiating fs module: %w", err) + } + _, err = init.Call(instance.Context()) + if err != nil { + return nil, err + } if process == nil { return nil, fmt.Errorf("no process function found") } return &FunctionRuntime{ callFunc: func(e contube.Record) (contube.Record, error) { - stdin.ResetBuffer(e.GetPayload()) - _, err := process.Call(instance.Context()) + payload := e.GetPayload() + payloadSize := len(payload) + mallocResults, err := malloc.Call(instance.Context(), uint64(payloadSize)) + if err != nil { + panic(err) + } + ptr := mallocResults[0] + defer func() { + _, err := free.Call(instance.Context(), ptr) + if err != nil { + panic(err) + } + }() + if !mod.Memory().Write(uint32(ptr), payload) { + log.Error(fmt.Errorf("Memory.Write(%d, %d) out of range of memory size %d", + ptr, payloadSize, mod.Memory().Size()), "failed to write memory") + } + outputPtrSize, err := processRecord.Call(instance.Context(), wasm_utils.PtrSize(uint32(ptr), uint32(payloadSize))) if err != nil { - return nil, fmt.Errorf("error calling wasm function: %w", err) + panic(err) + } + //fmt.Println(wasm_utils.ExtractPtrSize(outputPtrSize[0])) + if bytes, ok := mod.Memory().Read(wasm_utils.ExtractPtrSize(outputPtrSize[0])); !ok { + return nil, fmt.Errorf("failed to read memory") + } else { + return contube.NewSchemaRecordImpl(bytes, outputSchemaDef, e.Commit), nil } - output := stdout.GetAndReset() - return contube.NewRecordImpl(output, e.Commit), nil }, stopFunc: func() { err := r.Close(instance.Context()) diff --git a/go.mod b/go.mod index a782ea84..e99faec1 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,8 @@ module github.com/functionstream/function-stream -go 1.21 +go 1.21.4 + +toolchain go1.22.0 require ( github.com/apache/pulsar-client-go v0.12.0 @@ -90,6 +92,7 @@ require ( github.com/spf13/cast v1.6.0 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/subosito/gotenv v1.6.0 // indirect + github.com/wirelessr/avroschema v0.0.0-20240111032105-ef4f4560e2a7 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/crypto v0.21.0 // indirect diff --git a/go.sum b/go.sum index 00cb5d59..3e3f9247 100644 --- a/go.sum +++ b/go.sum @@ -223,6 +223,8 @@ github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8 github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= github.com/tetratelabs/wazero v1.6.0 h1:z0H1iikCdP8t+q341xqepY4EWvHEw8Es7tlqiVzlP3g= github.com/tetratelabs/wazero v1.6.0/go.mod h1:0U0G41+ochRKoPKCJlh0jMg1CHkyfK8kDqiirMmKY8A= +github.com/wirelessr/avroschema v0.0.0-20240111032105-ef4f4560e2a7 h1:8W0F/PiIV6W/yl2JNfPIVey3mYrdAz/h77RUMIcBIUs= +github.com/wirelessr/avroschema v0.0.0-20240111032105-ef4f4560e2a7/go.mod h1:ivMyAKRe5TqRXC665a1Lv9cWLbkua2K1dnJYslCYi00= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= From bd411a9d778633462820010f93d80e1579da3dad Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Sat, 8 Jun 2024 16:38:56 +0800 Subject: [PATCH 2/9] fix: fix wasm memory error --- examples/basic/main.go | 1 - fs/runtime/wazero/wazero_runtime.go | 8 +------- perf/perf.go | 1 + 3 files changed, 2 insertions(+), 8 deletions(-) diff --git a/examples/basic/main.go b/examples/basic/main.go index 7cc0f647..b2c2b7be 100644 --- a/examples/basic/main.go +++ b/examples/basic/main.go @@ -32,7 +32,6 @@ type Person struct { Expected int `json:"expected"` } -//export init func init() { _ = gofs.Register(myProcess) } diff --git a/fs/runtime/wazero/wazero_runtime.go b/fs/runtime/wazero/wazero_runtime.go index 60b19426..7f555ca5 100644 --- a/fs/runtime/wazero/wazero_runtime.go +++ b/fs/runtime/wazero/wazero_runtime.go @@ -102,20 +102,14 @@ func (f *WazeroFunctionRuntimeFactory) NewFunctionRuntime(instance api.FunctionI return nil, fmt.Errorf("failed to instantiate function: %w", err) } } - process := mod.ExportedFunction("process") malloc := mod.ExportedFunction("malloc") free := mod.ExportedFunction("free") - init := mod.ExportedFunction("init") processRecord := mod.ExportedFunction("processRecord") if err != nil { return nil, fmt.Errorf("error instantiating fs module: %w", err) } - _, err = init.Call(instance.Context()) - if err != nil { - return nil, err - } - if process == nil { + if processRecord == nil { return nil, fmt.Errorf("no process function found") } return &FunctionRuntime{ diff --git a/perf/perf.go b/perf/perf.go index 88fdb77a..8c8c27ef 100644 --- a/perf/perf.go +++ b/perf/perf.go @@ -256,6 +256,7 @@ func (p *perf) generateTraffic(ctx context.Context, latencyCh chan int64, failur slog.Error( "Failed to unmarshal Person", slog.Any("error", err), + slog.Any("payload", payload), ) os.Exit(1) } From db8134ac873cdb407b994fdca9fd34dc35f30027 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Sat, 8 Jun 2024 16:43:01 +0800 Subject: [PATCH 3/9] fix: fix go version --- clients/gofs/gofs.go | 4 +++- go.mod | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/clients/gofs/gofs.go b/clients/gofs/gofs.go index 23d415ed..8db285b9 100644 --- a/clients/gofs/gofs.go +++ b/clients/gofs/gofs.go @@ -19,8 +19,10 @@ package gofs import "C" import ( "encoding/json" + "fmt" . "github.com/functionstream/function-stream/common/wasm_utils" "github.com/wirelessr/avroschema" + "os" ) var processFunc func(uint64) uint64 @@ -42,7 +44,7 @@ func Register[I any, O any](process func(*I) *O) error { input := new(I) err = json.Unmarshal([]byte(payload), input) if err != nil { - //fmt.Fprintf(os.Stderr, "Failed to parse JSON: %s %s", err, payload) + fmt.Fprintf(os.Stderr, "Failed to parse JSON: %s %s", err, payload) } output := process(input) outputPayload, _ := json.Marshal(output) diff --git a/go.mod b/go.mod index e99faec1..85f98971 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,7 @@ require ( github.com/spf13/viper v1.18.2 github.com/stretchr/testify v1.9.0 github.com/tetratelabs/wazero v1.6.0 + github.com/wirelessr/avroschema v0.0.0-20240111032105-ef4f4560e2a7 go.uber.org/zap v1.26.0 golang.org/x/net v0.23.0 golang.org/x/time v0.5.0 @@ -92,7 +93,6 @@ require ( github.com/spf13/cast v1.6.0 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/subosito/gotenv v1.6.0 // indirect - github.com/wirelessr/avroschema v0.0.0-20240111032105-ef4f4560e2a7 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/crypto v0.21.0 // indirect From 51a704fa8dc24f16ba50107f5738ed2ff1dbf7d1 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Sat, 8 Jun 2024 16:55:35 +0800 Subject: [PATCH 4/9] fix: fix go version --- go.mod | 2 -- 1 file changed, 2 deletions(-) diff --git a/go.mod b/go.mod index 85f98971..e2b0c2a2 100644 --- a/go.mod +++ b/go.mod @@ -2,8 +2,6 @@ module github.com/functionstream/function-stream go 1.21.4 -toolchain go1.22.0 - require ( github.com/apache/pulsar-client-go v0.12.0 github.com/bmizerany/perks v0.0.0-20230307044200-03f9df79da1e From cf75c94d55f530dde95a0574ed89f3e8c8d56fc0 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Sat, 8 Jun 2024 16:57:02 +0800 Subject: [PATCH 5/9] fix lint --- fs/runtime/wazero/wazero_runtime.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/fs/runtime/wazero/wazero_runtime.go b/fs/runtime/wazero/wazero_runtime.go index 7f555ca5..e8db52fe 100644 --- a/fs/runtime/wazero/wazero_runtime.go +++ b/fs/runtime/wazero/wazero_runtime.go @@ -93,6 +93,9 @@ func (f *WazeroFunctionRuntimeFactory) NewFunctionRuntime(instance api.FunctionI outputSchemaDef = string(outputBuf) }).Export("registerSchema"). Instantiate(instance.Context()) + if err != nil { + return nil, fmt.Errorf("error creating fs module: %w", err) + } // Trigger the "_start" function, WASI's "main". mod, err := r.InstantiateWithConfig(instance.Context(), wasmBytes, config) if err != nil { @@ -102,13 +105,12 @@ func (f *WazeroFunctionRuntimeFactory) NewFunctionRuntime(instance api.FunctionI return nil, fmt.Errorf("failed to instantiate function: %w", err) } } + if err != nil { + return nil, fmt.Errorf("error instantiating runtime: %w", err) + } malloc := mod.ExportedFunction("malloc") free := mod.ExportedFunction("free") processRecord := mod.ExportedFunction("processRecord") - - if err != nil { - return nil, fmt.Errorf("error instantiating fs module: %w", err) - } if processRecord == nil { return nil, fmt.Errorf("no process function found") } From bfea94f5230da148f9a6d0c5336567da4379e708 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Sat, 8 Jun 2024 21:12:47 +0800 Subject: [PATCH 6/9] refactor(wasm): use virtual fs to transfer payload --- clients/gofs/gofs.go | 27 +++++++--- fs/runtime/wazero/fs.go | 79 +++++++++++++++++++++++++++++ fs/runtime/wazero/wazero_runtime.go | 47 +++++++---------- 3 files changed, 116 insertions(+), 37 deletions(-) create mode 100644 fs/runtime/wazero/fs.go diff --git a/clients/gofs/gofs.go b/clients/gofs/gofs.go index 8db285b9..5362c409 100644 --- a/clients/gofs/gofs.go +++ b/clients/gofs/gofs.go @@ -22,10 +22,17 @@ import ( "fmt" . "github.com/functionstream/function-stream/common/wasm_utils" "github.com/wirelessr/avroschema" + "io" "os" ) -var processFunc func(uint64) uint64 +var processFile *os.File + +func init() { + processFile, _ = os.Open("/process") +} + +var processFunc func([]byte) []byte //go:wasmimport fs registerSchema func registerSchema(inputSchemaPtrSize, outputSchemaPtrSize uint64) @@ -39,22 +46,26 @@ func Register[I any, O any](process func(*I) *O) error { if err != nil { return err } - processFunc = func(ptrSize uint64) uint64 { - payload := PtrToString(ExtractPtrSize(ptrSize)) + processFunc = func(payload []byte) []byte { input := new(I) - err = json.Unmarshal([]byte(payload), input) + err = json.Unmarshal(payload, input) if err != nil { fmt.Fprintf(os.Stderr, "Failed to parse JSON: %s %s", err, payload) } output := process(input) outputPayload, _ := json.Marshal(output) - return PtrSize(StringToPtr(string(outputPayload))) + return outputPayload } registerSchema(PtrSize(StringToPtr(inputSchema)), PtrSize(StringToPtr(outputSchema))) return nil } -//export processRecord -func processRecord(ptrSize uint64) uint64 { - return processFunc(ptrSize) +//export process +func process() { + payload, _ := io.ReadAll(processFile) + outputPayload := processFunc(payload) + n, _ := processFile.Write(outputPayload) + if n != len(outputPayload) { + _, _ = fmt.Fprintln(os.Stderr, "NO!") + } } diff --git a/fs/runtime/wazero/fs.go b/fs/runtime/wazero/fs.go new file mode 100644 index 00000000..ccf4806c --- /dev/null +++ b/fs/runtime/wazero/fs.go @@ -0,0 +1,79 @@ +package wazero + +import ( + "bytes" + . "github.com/tetratelabs/wazero/experimental/sys" + "github.com/tetratelabs/wazero/sys" + "io/fs" +) + +type memoryFS struct { + FS + m map[string]File +} + +func (f *memoryFS) OpenFile(path string, flags Oflag, perm fs.FileMode) (File, Errno) { + if path == "." { + return &memoryFile{isDir: true}, 0 + } + if file, ok := f.m[path]; ok { + return file, 0 + } + return nil, ENOENT +} + +type memoryFile struct { + File + isDir bool + readBuf bytes.Buffer + writeBuf bytes.Buffer +} + +func newMemoryFS(m map[string]File) FS { + return &memoryFS{ + m: m, + } +} + +func newMemoryFile() File { + return &memoryFile{ + readBuf: bytes.Buffer{}, + writeBuf: bytes.Buffer{}, + } +} + +func (f *memoryFile) Read(p []byte) (n int, errno Errno) { + n, _ = f.readBuf.Read(p) + errno = 0 + return +} + +func (f *memoryFile) Write(buf []byte) (n int, errno Errno) { + n, _ = f.writeBuf.Write(buf) + errno = 0 + return +} + +func (f *memoryFile) Dev() (uint64, Errno) { + return 0, 0 +} + +func (f *memoryFile) Ino() (sys.Inode, Errno) { + return 0, 0 +} + +func (f *memoryFile) IsDir() (bool, Errno) { + return f.isDir, 0 +} + +func (f *memoryFile) IsAppend() bool { + return false +} + +func (f *memoryFile) Close() Errno { + return 0 +} + +func (f *memoryFile) Stat() (sys.Stat_t, Errno) { + return sys.Stat_t{}, 0 +} diff --git a/fs/runtime/wazero/wazero_runtime.go b/fs/runtime/wazero/wazero_runtime.go index e8db52fe..1a15b492 100644 --- a/fs/runtime/wazero/wazero_runtime.go +++ b/fs/runtime/wazero/wazero_runtime.go @@ -25,6 +25,8 @@ import ( "github.com/functionstream/function-stream/fs/contube" "github.com/tetratelabs/wazero" wazero_api "github.com/tetratelabs/wazero/api" + exp_sys "github.com/tetratelabs/wazero/experimental/sys" + "github.com/tetratelabs/wazero/experimental/sysfs" "github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1" "github.com/tetratelabs/wazero/sys" "os" @@ -50,8 +52,13 @@ func (f *WazeroFunctionRuntimeFactory) NewFunctionRuntime(instance api.FunctionI stdin := common.NewChanReader() stdout := common.NewChanWriter() + processFile := &memoryFile{} + fileMap := map[string]exp_sys.File{ + "process": processFile, + } + fsConfig := wazero.NewFSConfig().(sysfs.FSConfig).WithSysFSMount(newMemoryFS(fileMap), "") config := wazero.NewModuleConfig(). - WithStdout(stdout).WithStdin(stdin).WithStderr(os.Stderr) + WithStdout(stdout).WithStdin(stdin).WithStderr(os.Stderr).WithFSConfig(fsConfig) wasi_snapshot_preview1.MustInstantiate(instance.Context(), r) @@ -108,41 +115,23 @@ func (f *WazeroFunctionRuntimeFactory) NewFunctionRuntime(instance api.FunctionI if err != nil { return nil, fmt.Errorf("error instantiating runtime: %w", err) } - malloc := mod.ExportedFunction("malloc") - free := mod.ExportedFunction("free") - processRecord := mod.ExportedFunction("processRecord") - if processRecord == nil { + process := mod.ExportedFunction("process") + if process == nil { return nil, fmt.Errorf("no process function found") } return &FunctionRuntime{ callFunc: func(e contube.Record) (contube.Record, error) { payload := e.GetPayload() - payloadSize := len(payload) - mallocResults, err := malloc.Call(instance.Context(), uint64(payloadSize)) + processFile.writeBuf.Reset() + _, _ = processFile.readBuf.Write(payload) + _, err := process.Call(instance.Context()) if err != nil { - panic(err) - } - ptr := mallocResults[0] - defer func() { - _, err := free.Call(instance.Context(), ptr) - if err != nil { - panic(err) - } - }() - if !mod.Memory().Write(uint32(ptr), payload) { - log.Error(fmt.Errorf("Memory.Write(%d, %d) out of range of memory size %d", - ptr, payloadSize, mod.Memory().Size()), "failed to write memory") - } - outputPtrSize, err := processRecord.Call(instance.Context(), wasm_utils.PtrSize(uint32(ptr), uint32(payloadSize))) - if err != nil { - panic(err) - } - //fmt.Println(wasm_utils.ExtractPtrSize(outputPtrSize[0])) - if bytes, ok := mod.Memory().Read(wasm_utils.ExtractPtrSize(outputPtrSize[0])); !ok { - return nil, fmt.Errorf("failed to read memory") - } else { - return contube.NewSchemaRecordImpl(bytes, outputSchemaDef, e.Commit), nil + return nil, err } + outBuf := processFile.writeBuf.Bytes() + outputPayload := make([]byte, len(outBuf)) + copy(outputPayload, outBuf) + return contube.NewSchemaRecordImpl(outputPayload, outputSchemaDef, e.Commit), nil }, stopFunc: func() { err := r.Close(instance.Context()) From 47fe68d0dfc14f1a295f8dd79bbd502614944b43 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Sat, 8 Jun 2024 22:27:10 +0800 Subject: [PATCH 7/9] refactor: refine codes --- clients/gofs/gofs.go | 7 ++----- fs/runtime/wazero/fs.go | 37 +++++++++++++++++-------------------- 2 files changed, 19 insertions(+), 25 deletions(-) diff --git a/clients/gofs/gofs.go b/clients/gofs/gofs.go index 5362c409..21d89522 100644 --- a/clients/gofs/gofs.go +++ b/clients/gofs/gofs.go @@ -61,11 +61,8 @@ func Register[I any, O any](process func(*I) *O) error { } //export process -func process() { +func internalProcess() { payload, _ := io.ReadAll(processFile) outputPayload := processFunc(payload) - n, _ := processFile.Write(outputPayload) - if n != len(outputPayload) { - _, _ = fmt.Fprintln(os.Stderr, "NO!") - } + _, _ = processFile.Write(outputPayload) } diff --git a/fs/runtime/wazero/fs.go b/fs/runtime/wazero/fs.go index ccf4806c..bce26364 100644 --- a/fs/runtime/wazero/fs.go +++ b/fs/runtime/wazero/fs.go @@ -1,3 +1,19 @@ +/* + * Copyright 2024 Function Stream Org. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package wazero import ( @@ -12,7 +28,7 @@ type memoryFS struct { m map[string]File } -func (f *memoryFS) OpenFile(path string, flags Oflag, perm fs.FileMode) (File, Errno) { +func (f *memoryFS) OpenFile(path string, _ Oflag, _ fs.FileMode) (File, Errno) { if path == "." { return &memoryFile{isDir: true}, 0 } @@ -35,13 +51,6 @@ func newMemoryFS(m map[string]File) FS { } } -func newMemoryFile() File { - return &memoryFile{ - readBuf: bytes.Buffer{}, - writeBuf: bytes.Buffer{}, - } -} - func (f *memoryFile) Read(p []byte) (n int, errno Errno) { n, _ = f.readBuf.Read(p) errno = 0 @@ -54,22 +63,10 @@ func (f *memoryFile) Write(buf []byte) (n int, errno Errno) { return } -func (f *memoryFile) Dev() (uint64, Errno) { - return 0, 0 -} - -func (f *memoryFile) Ino() (sys.Inode, Errno) { - return 0, 0 -} - func (f *memoryFile) IsDir() (bool, Errno) { return f.isDir, 0 } -func (f *memoryFile) IsAppend() bool { - return false -} - func (f *memoryFile) Close() Errno { return 0 } From da692bf4b008c09a99add10b6d8d995d56c774e7 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Sat, 8 Jun 2024 23:01:03 +0800 Subject: [PATCH 8/9] chore: fix lint --- .golangci.yml | 54 +++++++++++++++++++++++++++++ benchmark/bench_test.go | 19 +++++----- clients/gofs/gofs.go | 7 ++-- cmd/client/reload/cmd.go | 3 +- common/wasm_utils/wasm_utils.go | 10 ------ fs/contube/contube.go | 1 + fs/contube/http.go | 3 +- fs/instance_impl_test.go | 3 +- fs/manager.go | 5 ++- fs/runtime/wazero/fs.go | 3 +- fs/runtime/wazero/wazero_runtime.go | 3 +- server/config_test.go | 5 +-- server/function_store.go | 13 +++---- server/function_store_test.go | 5 +-- server/server.go | 30 ++++++++++++---- server/server_test.go | 7 ++-- 16 files changed, 122 insertions(+), 49 deletions(-) create mode 100644 .golangci.yml diff --git a/.golangci.yml b/.golangci.yml new file mode 100644 index 00000000..f91de055 --- /dev/null +++ b/.golangci.yml @@ -0,0 +1,54 @@ +# Copyright 2024 Function Stream Org. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +run: + deadline: 5m + allow-parallel-runners: true + +issues: + # don't skip warning about doc comments + # don't exclude the default set of lint + exclude-use-default: false + # restore some of the defaults + # (fill in the rest as needed) + exclude-rules: + - path: "api/*" + linters: + - lll + - path: "internal/*" + linters: + - dupl + - lll +linters: + disable-all: true + enable: + - dupl + - errcheck + - exportloopref + - goconst + - gocyclo + - gofmt + - goimports + - gosimple + - govet + - ineffassign + - lll + - misspell + - nakedret + - prealloc + - staticcheck + - typecheck + - unconvert + - unparam + - unused diff --git a/benchmark/bench_test.go b/benchmark/bench_test.go index dfffb87a..d4dc104c 100644 --- a/benchmark/bench_test.go +++ b/benchmark/bench_test.go @@ -18,8 +18,6 @@ package benchmark import ( "context" - "github.com/functionstream/function-stream/fs/api" - "github.com/functionstream/function-stream/fs/runtime/wazero" "math/rand" "os" "runtime/pprof" @@ -27,6 +25,9 @@ import ( "testing" "time" + "github.com/functionstream/function-stream/fs/api" + "github.com/functionstream/function-stream/fs/runtime/wazero" + "github.com/apache/pulsar-client-go/pulsaradmin" "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" adminclient "github.com/functionstream/function-stream/admin/client" @@ -115,12 +116,14 @@ func BenchmarkStressForBasicFuncWithMemoryQueue(b *testing.B) { memoryQueueFactory := contube.NewMemoryQueueFactory(context.Background()) s, err := server.NewServer( - server.WithRuntimeFactoryBuilder(common.WASMRuntime, func(configMap common.ConfigMap) (api.FunctionRuntimeFactory, error) { - return wazero.NewWazeroFunctionRuntimeFactory(), nil - }), - server.WithTubeFactoryBuilder(common.MemoryTubeType, func(configMap common.ConfigMap) (contube.TubeFactory, error) { - return memoryQueueFactory, nil - }), + server.WithRuntimeFactoryBuilder(common.WASMRuntime, + func(configMap common.ConfigMap) (api.FunctionRuntimeFactory, error) { + return wazero.NewWazeroFunctionRuntimeFactory(), nil + }), + server.WithTubeFactoryBuilder(common.MemoryTubeType, + func(configMap common.ConfigMap) (contube.TubeFactory, error) { + return memoryQueueFactory, nil + }), ) if err != nil { b.Fatal(err) diff --git a/clients/gofs/gofs.go b/clients/gofs/gofs.go index 21d89522..2a34fed6 100644 --- a/clients/gofs/gofs.go +++ b/clients/gofs/gofs.go @@ -20,10 +20,11 @@ import "C" import ( "encoding/json" "fmt" - . "github.com/functionstream/function-stream/common/wasm_utils" - "github.com/wirelessr/avroschema" "io" "os" + + . "github.com/functionstream/function-stream/common/wasm_utils" + "github.com/wirelessr/avroschema" ) var processFile *os.File @@ -61,7 +62,7 @@ func Register[I any, O any](process func(*I) *O) error { } //export process -func internalProcess() { +func process() { payload, _ := io.ReadAll(processFile) outputPayload := processFunc(payload) _, _ = processFile.Write(outputPayload) diff --git a/cmd/client/reload/cmd.go b/cmd/client/reload/cmd.go index 3f7ab833..1762ab67 100644 --- a/cmd/client/reload/cmd.go +++ b/cmd/client/reload/cmd.go @@ -19,10 +19,11 @@ package reload import ( "context" "fmt" + "os" + adminclient "github.com/functionstream/function-stream/admin/client" "github.com/functionstream/function-stream/cmd/client/common" "github.com/spf13/cobra" - "os" ) var Cmd = &cobra.Command{ diff --git a/common/wasm_utils/wasm_utils.go b/common/wasm_utils/wasm_utils.go index 40e50f7c..60f7d004 100644 --- a/common/wasm_utils/wasm_utils.go +++ b/common/wasm_utils/wasm_utils.go @@ -18,16 +18,6 @@ package wasm_utils import "unsafe" -// PtrToString returns a string from WebAssembly compatible numeric types -// representing its pointer and length. -func PtrToString(ptr uint32, size uint32) string { - return unsafe.String((*byte)(unsafe.Pointer(uintptr(ptr))), size) -} - -func PtrToBytes(ptr uint32, size uint32) string { - return unsafe.String((*byte)(unsafe.Pointer(uintptr(ptr))), size) -} - // StringToPtr returns a pointer and size pair for the given string in a way // compatible with WebAssembly numeric types. // The returned pointer aliases the string hence the string must be kept alive diff --git a/fs/contube/contube.go b/fs/contube/contube.go index 92553486..08e34982 100644 --- a/fs/contube/contube.go +++ b/fs/contube/contube.go @@ -19,6 +19,7 @@ package contube import ( "context" "encoding/json" + "github.com/go-playground/validator/v10" "github.com/pkg/errors" ) diff --git a/fs/contube/http.go b/fs/contube/http.go index c2ced876..0816f950 100644 --- a/fs/contube/http.go +++ b/fs/contube/http.go @@ -17,12 +17,13 @@ package contube import ( - "github.com/functionstream/function-stream/common" "io" "net/http" "sync" "sync/atomic" + "github.com/functionstream/function-stream/common" + "github.com/pkg/errors" "golang.org/x/net/context" ) diff --git a/fs/instance_impl_test.go b/fs/instance_impl_test.go index dd70c980..81362f43 100644 --- a/fs/instance_impl_test.go +++ b/fs/instance_impl_test.go @@ -17,9 +17,10 @@ package fs import ( - "github.com/functionstream/function-stream/common" "testing" + "github.com/functionstream/function-stream/common" + "github.com/functionstream/function-stream/common/model" ) diff --git a/fs/manager.go b/fs/manager.go index fab9ac7b..7de06004 100644 --- a/fs/manager.go +++ b/fs/manager.go @@ -16,11 +16,9 @@ package fs -import "fmt" - import ( "context" - "github.com/go-logr/logr" + "fmt" "math/rand" "strconv" "sync" @@ -30,6 +28,7 @@ import ( "github.com/functionstream/function-stream/fs/api" "github.com/functionstream/function-stream/fs/contube" "github.com/functionstream/function-stream/fs/statestore" + "github.com/go-logr/logr" "github.com/pkg/errors" ) diff --git a/fs/runtime/wazero/fs.go b/fs/runtime/wazero/fs.go index bce26364..4b2cd604 100644 --- a/fs/runtime/wazero/fs.go +++ b/fs/runtime/wazero/fs.go @@ -18,9 +18,10 @@ package wazero import ( "bytes" + "io/fs" + . "github.com/tetratelabs/wazero/experimental/sys" "github.com/tetratelabs/wazero/sys" - "io/fs" ) type memoryFS struct { diff --git a/fs/runtime/wazero/wazero_runtime.go b/fs/runtime/wazero/wazero_runtime.go index 1a15b492..3d120614 100644 --- a/fs/runtime/wazero/wazero_runtime.go +++ b/fs/runtime/wazero/wazero_runtime.go @@ -19,6 +19,8 @@ package wazero import ( "context" "fmt" + "os" + "github.com/functionstream/function-stream/common" "github.com/functionstream/function-stream/common/wasm_utils" "github.com/functionstream/function-stream/fs/api" @@ -29,7 +31,6 @@ import ( "github.com/tetratelabs/wazero/experimental/sysfs" "github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1" "github.com/tetratelabs/wazero/sys" - "os" ) type WazeroFunctionRuntimeFactory struct { diff --git a/server/config_test.go b/server/config_test.go index 488981fc..821ece59 100644 --- a/server/config_test.go +++ b/server/config_test.go @@ -17,11 +17,12 @@ package server import ( + "os" + "testing" + "github.com/spf13/viper" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "os" - "testing" ) func TestLoadConfigFromYaml(t *testing.T) { diff --git a/server/function_store.go b/server/function_store.go index ec9892ae..ebba8ab1 100644 --- a/server/function_store.go +++ b/server/function_store.go @@ -17,12 +17,6 @@ package server import ( - restfulspec "github.com/emicklei/go-restful-openapi/v2" - "github.com/emicklei/go-restful/v3" - "github.com/functionstream/function-stream/common/model" - "github.com/functionstream/function-stream/fs" - "github.com/pkg/errors" - "gopkg.in/yaml.v3" "io" "log/slog" "net/http" @@ -30,6 +24,13 @@ import ( "path/filepath" "strings" "sync" + + restfulspec "github.com/emicklei/go-restful-openapi/v2" + "github.com/emicklei/go-restful/v3" + "github.com/functionstream/function-stream/common/model" + "github.com/functionstream/function-stream/fs" + "github.com/pkg/errors" + "gopkg.in/yaml.v3" ) type FunctionStore interface { diff --git a/server/function_store_test.go b/server/function_store_test.go index 4f67c2a1..9a8c68f5 100644 --- a/server/function_store_test.go +++ b/server/function_store_test.go @@ -17,6 +17,9 @@ package server import ( + "os" + "testing" + "github.com/functionstream/function-stream/common" "github.com/functionstream/function-stream/common/model" "github.com/functionstream/function-stream/fs" @@ -24,8 +27,6 @@ import ( "github.com/functionstream/function-stream/fs/contube" "github.com/stretchr/testify/assert" "gopkg.in/yaml.v3" - "os" - "testing" ) type testFunctionManagerImpl struct { diff --git a/server/server.go b/server/server.go index 3b758686..d12161e6 100644 --- a/server/server.go +++ b/server/server.go @@ -19,7 +19,6 @@ package server import ( "context" "fmt" - "github.com/go-logr/logr" "net" "net/http" "net/url" @@ -27,6 +26,8 @@ import ( "sync/atomic" "time" + "github.com/go-logr/logr" + restfulspec "github.com/emicklei/go-restful-openapi/v2" "github.com/emicklei/go-restful/v3" "github.com/functionstream/function-stream/common" @@ -108,14 +109,19 @@ func WithQueueConfig(config QueueConfig) ServerOption { }) } -func WithTubeFactoryBuilder(name string, builder func(configMap common.ConfigMap) (contube.TubeFactory, error)) ServerOption { +func WithTubeFactoryBuilder( + name string, + builder func(configMap common.ConfigMap) (contube.TubeFactory, error), +) ServerOption { return serverOptionFunc(func(o *serverOptions) (*serverOptions, error) { o.tubeFactoryBuilders[name] = builder return o, nil }) } -func WithTubeFactoryBuilders(builder map[string]func(configMap common.ConfigMap) (contube.TubeFactory, error)) ServerOption { +func WithTubeFactoryBuilders( + builder map[string]func(configMap common.ConfigMap, + ) (contube.TubeFactory, error)) ServerOption { return serverOptionFunc(func(o *serverOptions) (*serverOptions, error) { for n, b := range builder { o.tubeFactoryBuilders[n] = b @@ -124,14 +130,19 @@ func WithTubeFactoryBuilders(builder map[string]func(configMap common.ConfigMap) }) } -func WithRuntimeFactoryBuilder(name string, builder func(configMap common.ConfigMap) (api.FunctionRuntimeFactory, error)) ServerOption { +func WithRuntimeFactoryBuilder( + name string, + builder func(configMap common.ConfigMap) (api.FunctionRuntimeFactory, error), +) ServerOption { return serverOptionFunc(func(o *serverOptions) (*serverOptions, error) { o.runtimeFactoryBuilders[name] = builder return o, nil }) } -func WithRuntimeFactoryBuilders(builder map[string]func(configMap common.ConfigMap) (api.FunctionRuntimeFactory, error)) ServerOption { +func WithRuntimeFactoryBuilders( + builder map[string]func(configMap common.ConfigMap) (api.FunctionRuntimeFactory, error), +) ServerOption { return serverOptionFunc(func(o *serverOptions) (*serverOptions, error) { for n, b := range builder { o.runtimeFactoryBuilders[n] = b @@ -159,7 +170,8 @@ func GetBuiltinTubeFactoryBuilder() map[string]func(configMap common.ConfigMap) common.PulsarTubeType: func(configMap common.ConfigMap) (contube.TubeFactory, error) { return contube.NewPulsarEventQueueFactory(context.Background(), contube.ConfigMap(configMap)) }, - common.MemoryTubeType: func(configMap common.ConfigMap) (contube.TubeFactory, error) { + //nolint:unparam + common.MemoryTubeType: func(_ common.ConfigMap) (contube.TubeFactory, error) { return contube.NewMemoryQueueFactory(context.Background()), nil }, } @@ -167,6 +179,7 @@ func GetBuiltinTubeFactoryBuilder() map[string]func(configMap common.ConfigMap) func GetBuiltinRuntimeFactoryBuilder() map[string]func(configMap common.ConfigMap) (api.FunctionRuntimeFactory, error) { return map[string]func(configMap common.ConfigMap) (api.FunctionRuntimeFactory, error){ + //nolint:unparam common.WASMRuntime: func(configMap common.ConfigMap) (api.FunctionRuntimeFactory, error) { return wazero.NewWazeroFunctionRuntimeFactory(), nil }, @@ -333,7 +346,10 @@ func NewDefaultServer() (*Server, error) { }, RuntimeConfig: map[string]common.ConfigMap{}, } - return NewServer(WithTubeFactoryBuilders(GetBuiltinTubeFactoryBuilder()), WithRuntimeFactoryBuilders(GetBuiltinRuntimeFactoryBuilder()), WithConfig(defaultConfig)) + return NewServer( + WithTubeFactoryBuilders(GetBuiltinTubeFactoryBuilder()), + WithRuntimeFactoryBuilders(GetBuiltinRuntimeFactoryBuilder()), + WithConfig(defaultConfig)) } func (s *Server) Run(context context.Context) { diff --git a/server/server_test.go b/server/server_test.go index c380f43b..4e34085f 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -237,9 +237,10 @@ func (r *MockRuntime) Stop() { func TestStatefulFunction(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - s, httpAddr := startStandaloneSvr(t, ctx, WithRuntimeFactoryBuilder("mock", func(configMap common.ConfigMap) (api.FunctionRuntimeFactory, error) { - return &MockRuntimeFactory{}, nil - })) + s, httpAddr := startStandaloneSvr(t, ctx, + WithRuntimeFactoryBuilder("mock", func(configMap common.ConfigMap) (api.FunctionRuntimeFactory, error) { + return &MockRuntimeFactory{}, nil + })) input := "input" output := "output" From 69ca6c5e31d5513e5b0a705814119b6127a7be62 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Sat, 8 Jun 2024 23:27:35 +0800 Subject: [PATCH 9/9] fix license header --- .github/workflows/bench.yaml | 11 ----------- .github/workflows/ci.yaml | 11 ----------- .github/workflows/license.yaml | 11 ----------- 3 files changed, 33 deletions(-) diff --git a/.github/workflows/bench.yaml b/.github/workflows/bench.yaml index a00accf1..5d8cd96a 100644 --- a/.github/workflows/bench.yaml +++ b/.github/workflows/bench.yaml @@ -12,17 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. name: Benchmark on: diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 8d2994c9..a4152c04 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -12,17 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. name: CI on: diff --git a/.github/workflows/license.yaml b/.github/workflows/license.yaml index 3f3539c2..865f8e25 100644 --- a/.github/workflows/license.yaml +++ b/.github/workflows/license.yaml @@ -12,17 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. name: License Check on: