From c91c01ed6b6d9c43566e2e5694e6130f9a58ef04 Mon Sep 17 00:00:00 2001 From: wy Date: Sat, 17 Feb 2024 22:30:35 +0800 Subject: [PATCH 01/20] Method of modifying ctx to add value --- fs/instance_impl.go | 7 ++---- fs/instance_impl_test.go | 46 ++++++++++++++++++++++++++++++++++++++++ go.mod | 2 +- 3 files changed, 49 insertions(+), 6 deletions(-) create mode 100644 fs/instance_impl_test.go diff --git a/fs/instance_impl.go b/fs/instance_impl.go index 46708e75..a021cdeb 100644 --- a/fs/instance_impl.go +++ b/fs/instance_impl.go @@ -23,7 +23,6 @@ import ( "github.com/functionstream/function-stream/fs/api" "github.com/functionstream/function-stream/fs/contube" "github.com/pkg/errors" - "github.com/sirupsen/logrus" "log/slog" ) @@ -45,10 +44,8 @@ func NewDefaultInstanceFactory() api.FunctionInstanceFactory { func (f *DefaultInstanceFactory) NewFunctionInstance(definition *model.Function, sourceFactory contube.SourceTubeFactory, sinkFactory contube.SinkTubeFactory, index int32) api.FunctionInstance { ctx, cancelFunc := context.WithCancel(context.Background()) - ctx.Value(logrus.Fields{ - "function-name": definition.Name, - "function-index": index, - }) + ctx = context.WithValue(ctx, "function-name", definition.Name) + ctx = context.WithValue(ctx, "function-index", index) return &FunctionInstanceImpl{ ctx: ctx, cancelFunc: cancelFunc, diff --git a/fs/instance_impl_test.go b/fs/instance_impl_test.go new file mode 100644 index 00000000..85c1feb0 --- /dev/null +++ b/fs/instance_impl_test.go @@ -0,0 +1,46 @@ +/* + * Copyright 2024 Function Stream Org. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fs + +import ( + "github.com/functionstream/function-stream/common/model" + "testing" +) + +func TestNewFunctionInstance(t *testing.T) { + defaultInstanceFactory := DefaultInstanceFactory{} + definition := &model.Function{ + Name: "test-function", + } + index := int32(1) + instance := defaultInstanceFactory.NewFunctionInstance(definition, nil, nil, index) + + if instance == nil { + t.Error("FunctionInstance should not be nil") + } + + Instance := instance.(*FunctionInstanceImpl) + + if ctxValue, ok := Instance.ctx.Value("function-name").(string); !ok || ctxValue != definition.Name { + t.Errorf("Expected 'function-name' in ctx to be '%s'", definition.Name) + } + + if ctxValue, ok := Instance.ctx.Value("function-index").(int32); !ok || ctxValue != index { + t.Errorf("Expected 'function-index' in ctx to be '%d'", index) + } + +} diff --git a/go.mod b/go.mod index fbaf9996..542c8cdc 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,6 @@ require ( github.com/bmizerany/perks v0.0.0-20230307044200-03f9df79da1e github.com/gorilla/mux v1.8.1 github.com/pkg/errors v0.9.1 - github.com/sirupsen/logrus v1.9.3 github.com/spf13/cobra v1.8.0 github.com/tetratelabs/wazero v1.6.0 golang.org/x/net v0.21.0 @@ -53,6 +52,7 @@ require ( github.com/prometheus/common v0.46.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect github.com/rogpeppe/go-internal v1.11.0 // indirect + github.com/sirupsen/logrus v1.9.3 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/spf13/pflag v1.0.5 // indirect go.uber.org/atomic v1.11.0 // indirect From 70a8d4fa46d76aa8eca324da365179d7e0b7e581 Mon Sep 17 00:00:00 2001 From: wy Date: Sat, 17 Feb 2024 23:52:30 +0800 Subject: [PATCH 02/20] Method of modifying ctx to add value --- fs/instance_impl.go | 6 ++++-- fs/instance_impl_test.go | 4 ++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/fs/instance_impl.go b/fs/instance_impl.go index a021cdeb..e671d58c 100644 --- a/fs/instance_impl.go +++ b/fs/instance_impl.go @@ -36,6 +36,8 @@ type FunctionInstanceImpl struct { index int32 } +type ctxKey string + type DefaultInstanceFactory struct{} func NewDefaultInstanceFactory() api.FunctionInstanceFactory { @@ -44,8 +46,8 @@ func NewDefaultInstanceFactory() api.FunctionInstanceFactory { func (f *DefaultInstanceFactory) NewFunctionInstance(definition *model.Function, sourceFactory contube.SourceTubeFactory, sinkFactory contube.SinkTubeFactory, index int32) api.FunctionInstance { ctx, cancelFunc := context.WithCancel(context.Background()) - ctx = context.WithValue(ctx, "function-name", definition.Name) - ctx = context.WithValue(ctx, "function-index", index) + ctx = context.WithValue(ctx, ctxKey("function-name"), definition.Name) + ctx = context.WithValue(ctx, ctxKey("function-index"), index) return &FunctionInstanceImpl{ ctx: ctx, cancelFunc: cancelFunc, diff --git a/fs/instance_impl_test.go b/fs/instance_impl_test.go index 85c1feb0..dfa0f898 100644 --- a/fs/instance_impl_test.go +++ b/fs/instance_impl_test.go @@ -35,11 +35,11 @@ func TestNewFunctionInstance(t *testing.T) { Instance := instance.(*FunctionInstanceImpl) - if ctxValue, ok := Instance.ctx.Value("function-name").(string); !ok || ctxValue != definition.Name { + if ctxValue, ok := Instance.ctx.Value(ctxKey("function-name")).(string); !ok || ctxValue != definition.Name { t.Errorf("Expected 'function-name' in ctx to be '%s'", definition.Name) } - if ctxValue, ok := Instance.ctx.Value("function-index").(int32); !ok || ctxValue != index { + if ctxValue, ok := Instance.ctx.Value(ctxKey("function-index")).(int32); !ok || ctxValue != index { t.Errorf("Expected 'function-index' in ctx to be '%d'", index) } From 70e45b8699782f94f35c8e5a5e5055607fbe10d3 Mon Sep 17 00:00:00 2001 From: wy Date: Sat, 17 Feb 2024 23:54:32 +0800 Subject: [PATCH 03/20] Method of modifying ctx to add value --- fs/instance_impl.go | 6 +++--- fs/instance_impl_test.go | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/fs/instance_impl.go b/fs/instance_impl.go index e671d58c..44080a58 100644 --- a/fs/instance_impl.go +++ b/fs/instance_impl.go @@ -36,7 +36,7 @@ type FunctionInstanceImpl struct { index int32 } -type ctxKey string +type CtxKey string type DefaultInstanceFactory struct{} @@ -46,8 +46,8 @@ func NewDefaultInstanceFactory() api.FunctionInstanceFactory { func (f *DefaultInstanceFactory) NewFunctionInstance(definition *model.Function, sourceFactory contube.SourceTubeFactory, sinkFactory contube.SinkTubeFactory, index int32) api.FunctionInstance { ctx, cancelFunc := context.WithCancel(context.Background()) - ctx = context.WithValue(ctx, ctxKey("function-name"), definition.Name) - ctx = context.WithValue(ctx, ctxKey("function-index"), index) + ctx = context.WithValue(ctx, CtxKey("function-name"), definition.Name) + ctx = context.WithValue(ctx, CtxKey("function-index"), index) return &FunctionInstanceImpl{ ctx: ctx, cancelFunc: cancelFunc, diff --git a/fs/instance_impl_test.go b/fs/instance_impl_test.go index dfa0f898..e2471957 100644 --- a/fs/instance_impl_test.go +++ b/fs/instance_impl_test.go @@ -35,11 +35,11 @@ func TestNewFunctionInstance(t *testing.T) { Instance := instance.(*FunctionInstanceImpl) - if ctxValue, ok := Instance.ctx.Value(ctxKey("function-name")).(string); !ok || ctxValue != definition.Name { + if ctxValue, ok := Instance.ctx.Value(CtxKey("function-name")).(string); !ok || ctxValue != definition.Name { t.Errorf("Expected 'function-name' in ctx to be '%s'", definition.Name) } - if ctxValue, ok := Instance.ctx.Value(ctxKey("function-index")).(int32); !ok || ctxValue != index { + if ctxValue, ok := Instance.ctx.Value(CtxKey("function-index")).(int32); !ok || ctxValue != index { t.Errorf("Expected 'function-index' in ctx to be '%d'", index) } From a4a36ec9a8d4bff150f73e87b8f61061a344188a Mon Sep 17 00:00:00 2001 From: wy Date: Sun, 18 Feb 2024 12:12:54 +0800 Subject: [PATCH 04/20] Remove type conversions and modify function names --- fs/instance_impl_test.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/fs/instance_impl_test.go b/fs/instance_impl_test.go index e2471957..34d409bd 100644 --- a/fs/instance_impl_test.go +++ b/fs/instance_impl_test.go @@ -21,7 +21,7 @@ import ( "testing" ) -func TestNewFunctionInstance(t *testing.T) { +func TestFunctionInstanceContextSetting(t *testing.T) { defaultInstanceFactory := DefaultInstanceFactory{} definition := &model.Function{ Name: "test-function", @@ -33,13 +33,11 @@ func TestNewFunctionInstance(t *testing.T) { t.Error("FunctionInstance should not be nil") } - Instance := instance.(*FunctionInstanceImpl) - - if ctxValue, ok := Instance.ctx.Value(CtxKey("function-name")).(string); !ok || ctxValue != definition.Name { + if ctxValue, ok := instance.Context().Value(CtxKey("function-name")).(string); !ok || ctxValue != definition.Name { t.Errorf("Expected 'function-name' in ctx to be '%s'", definition.Name) } - if ctxValue, ok := Instance.ctx.Value(CtxKey("function-index")).(int32); !ok || ctxValue != index { + if ctxValue, ok := instance.Context().Value(CtxKey("function-index")).(int32); !ok || ctxValue != index { t.Errorf("Expected 'function-index' in ctx to be '%d'", index) } From 5b4b05540b824453785c018894832f4678c42387 Mon Sep 17 00:00:00 2001 From: wy Date: Sun, 3 Mar 2024 19:20:27 +0800 Subject: [PATCH 05/20] fix NewSinkTube in memory --- fs/contube/memory.go | 12 ++++- fs/contube/memory_test.go | 97 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 108 insertions(+), 1 deletion(-) create mode 100644 fs/contube/memory_test.go diff --git a/fs/contube/memory.go b/fs/contube/memory.go index 2ff42a89..b80a53e6 100644 --- a/fs/contube/memory.go +++ b/fs/contube/memory.go @@ -80,15 +80,19 @@ func (f *MemoryQueueFactory) release(name string) { func (f *MemoryQueueFactory) NewSourceTube(ctx context.Context, configMap ConfigMap) (<-chan Record, error) { config := NewSourceQueueConfig(configMap) result := make(chan Record) + var wg sync.WaitGroup for _, topic := range config.Topics { + wg.Add(2) t := topic go func() { + defer wg.Done() <-ctx.Done() f.release(t) }() + go func() { + defer wg.Done() c := f.getOrCreateChan(t) - defer close(result) for { select { case <-ctx.Done(): @@ -99,6 +103,12 @@ func (f *MemoryQueueFactory) NewSourceTube(ctx context.Context, configMap Config } }() } + + go func() { + wg.Wait() + close(result) + }() + return result, nil } diff --git a/fs/contube/memory_test.go b/fs/contube/memory_test.go new file mode 100644 index 00000000..d2a23794 --- /dev/null +++ b/fs/contube/memory_test.go @@ -0,0 +1,97 @@ +/* + * Copyright 2024 Function Stream Org. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package contube + +import ( + "context" + "log" + "math/rand" + "strconv" + "sync" + "testing" + "time" +) + +func TestMemoryNewSourceTube(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + memoryQueueFactory := MemoryQueueFactory{ + ctx: ctx, + mu: sync.Mutex{}, + queues: make(map[string]*queue), + } + // Create queues, corresponding to multiple topics + queues1 := queue{ + c: make(chan Record, 1), + refCnt: 0, + } + record1 := &RecordImpl{ + payload: []byte{1}, + commitFunc: nil, + } + queues1.c <- record1 + queues2 := queue{ + c: make(chan Record, 1), + refCnt: 0, + } + record2 := &RecordImpl{ + payload: []byte{2}, + commitFunc: nil, + } + queues2.c <- record2 + queues3 := queue{ + c: make(chan Record, 1), + refCnt: 0, + } + record3 := &RecordImpl{ + payload: []byte{3}, + commitFunc: nil, + } + queues3.c <- record3 + + memoryQueueFactory.queues["topic1"] = &queues1 + memoryQueueFactory.queues["topic2"] = &queues2 + memoryQueueFactory.queues["topic3"] = &queues3 + + ch, err := memoryQueueFactory.NewSourceTube(ctx, (&SourceQueueConfig{Topics: []string{"topic1", "topic2", "topic3"}, SubName: "consume-" + strconv.Itoa(rand.Int())}).ToConfigMap()) + if err != nil { + t.Fatal(err) + } + var wg sync.WaitGroup + wg.Add(1) + var events []Record + go func() { + defer wg.Done() + for record := range ch { + log.Printf("Received record: %+v", record) + if record == record1 || record == record2 || record == record3 { + events = append(events, record) + } + } + }() + + time.Sleep(1 * time.Second) + cancel() + wg.Wait() + + // Create three Records data, determine whether the length of the received data is equal to 3 + // There is no need to determine whether the data in the "event" corresponds to the sent data, because the data has already been judged when it is written into the "event". + if len(events) == 3 { + t.Log("Successful") + } else { + t.Fatal("failed") + } +} From 1a8cf243ac835c9683eb9b6058b1a7c78934ac3b Mon Sep 17 00:00:00 2001 From: wy Date: Sun, 3 Mar 2024 20:18:48 +0800 Subject: [PATCH 06/20] fix NewSinkTube in memory --- fs/contube/memory_test.go | 51 ++++++++++++--------------------------- 1 file changed, 15 insertions(+), 36 deletions(-) diff --git a/fs/contube/memory_test.go b/fs/contube/memory_test.go index d2a23794..a816bd3a 100644 --- a/fs/contube/memory_test.go +++ b/fs/contube/memory_test.go @@ -33,53 +33,34 @@ func TestMemoryNewSourceTube(t *testing.T) { mu: sync.Mutex{}, queues: make(map[string]*queue), } + // Create queues, corresponding to multiple topics - queues1 := queue{ - c: make(chan Record, 1), - refCnt: 0, - } - record1 := &RecordImpl{ - payload: []byte{1}, - commitFunc: nil, - } - queues1.c <- record1 - queues2 := queue{ - c: make(chan Record, 1), - refCnt: 0, - } - record2 := &RecordImpl{ - payload: []byte{2}, - commitFunc: nil, - } - queues2.c <- record2 - queues3 := queue{ - c: make(chan Record, 1), - refCnt: 0, - } - record3 := &RecordImpl{ - payload: []byte{3}, - commitFunc: nil, + topics := []string{"topic1", "topic2", "topic3"} + for i, v := range topics { + memoryQueueFactory.queues[v] = &queue{ + c: make(chan Record, 1), + refCnt: 0, + } + memoryQueueFactory.queues[v].c <- &RecordImpl{ + payload: []byte{byte(i)}, + commitFunc: nil, + } } - queues3.c <- record3 - - memoryQueueFactory.queues["topic1"] = &queues1 - memoryQueueFactory.queues["topic2"] = &queues2 - memoryQueueFactory.queues["topic3"] = &queues3 ch, err := memoryQueueFactory.NewSourceTube(ctx, (&SourceQueueConfig{Topics: []string{"topic1", "topic2", "topic3"}, SubName: "consume-" + strconv.Itoa(rand.Int())}).ToConfigMap()) if err != nil { t.Fatal(err) } + var wg sync.WaitGroup wg.Add(1) var events []Record + go func() { defer wg.Done() for record := range ch { log.Printf("Received record: %+v", record) - if record == record1 || record == record2 || record == record3 { - events = append(events, record) - } + events = append(events, record) } }() @@ -87,9 +68,7 @@ func TestMemoryNewSourceTube(t *testing.T) { cancel() wg.Wait() - // Create three Records data, determine whether the length of the received data is equal to 3 - // There is no need to determine whether the data in the "event" corresponds to the sent data, because the data has already been judged when it is written into the "event". - if len(events) == 3 { + if len(events) == len(topics) { t.Log("Successful") } else { t.Fatal("failed") From 766ffafe0ed1cc6bb7accea04ea92aff6217c439 Mon Sep 17 00:00:00 2001 From: wy Date: Sun, 3 Mar 2024 20:50:35 +0800 Subject: [PATCH 07/20] fix NewSinkTube in memory --- fs/contube/memory.go | 1 + 1 file changed, 1 insertion(+) diff --git a/fs/contube/memory.go b/fs/contube/memory.go index b80a53e6..73189710 100644 --- a/fs/contube/memory.go +++ b/fs/contube/memory.go @@ -80,6 +80,7 @@ func (f *MemoryQueueFactory) release(name string) { func (f *MemoryQueueFactory) NewSourceTube(ctx context.Context, configMap ConfigMap) (<-chan Record, error) { config := NewSourceQueueConfig(configMap) result := make(chan Record) + var wg sync.WaitGroup for _, topic := range config.Topics { wg.Add(2) From 130113791654d6207fc78049321ee4bd6b6e7c23 Mon Sep 17 00:00:00 2001 From: wy Date: Sun, 3 Mar 2024 23:01:34 +0800 Subject: [PATCH 08/20] fix NewSinkTube in memory --- fs/contube/memory.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/fs/contube/memory.go b/fs/contube/memory.go index 73189710..90be1b64 100644 --- a/fs/contube/memory.go +++ b/fs/contube/memory.go @@ -83,10 +83,9 @@ func (f *MemoryQueueFactory) NewSourceTube(ctx context.Context, configMap Config var wg sync.WaitGroup for _, topic := range config.Topics { - wg.Add(2) + wg.Add(1) t := topic go func() { - defer wg.Done() <-ctx.Done() f.release(t) }() From 16878cfc620de6dd2a68b0236cef23b7cf9a8a64 Mon Sep 17 00:00:00 2001 From: wy Date: Mon, 4 Mar 2024 11:59:12 +0800 Subject: [PATCH 09/20] fix NewSinkTube in memory, add unit test for NewSourceTube and NewSinkTube. --- fs/contube/memory_test.go | 56 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 54 insertions(+), 2 deletions(-) diff --git a/fs/contube/memory_test.go b/fs/contube/memory_test.go index a816bd3a..9bda2379 100644 --- a/fs/contube/memory_test.go +++ b/fs/contube/memory_test.go @@ -42,7 +42,7 @@ func TestMemoryNewSourceTube(t *testing.T) { refCnt: 0, } memoryQueueFactory.queues[v].c <- &RecordImpl{ - payload: []byte{byte(i)}, + payload: []byte{byte(i + 1)}, commitFunc: nil, } } @@ -64,7 +64,7 @@ func TestMemoryNewSourceTube(t *testing.T) { } }() - time.Sleep(1 * time.Second) + time.Sleep(100 * time.Millisecond) cancel() wg.Wait() @@ -74,3 +74,55 @@ func TestMemoryNewSourceTube(t *testing.T) { t.Fatal("failed") } } + +func TestMemoryNewSinkTube(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + memoryQueueFactory := MemoryQueueFactory{ + ctx: ctx, + mu: sync.Mutex{}, + queues: make(map[string]*queue), + } + + wrapperC, err := memoryQueueFactory.NewSinkTube(ctx, (&SinkQueueConfig{Topic: TopicKey}).ToConfigMap()) + if err != nil { + t.Fatal(err) + } + + var wg sync.WaitGroup + wg.Add(1) + + var events []Record + var count = 10 + go func(n int) { + defer wg.Done() + for i := 0; i < n; i++ { + wrapperC <- &RecordImpl{ + payload: []byte{byte(i)}, + commitFunc: func() {}, + } + } + }(count) + + go func() { + for { + select { + case event := <-memoryQueueFactory.queues[TopicKey].c: + events = append(events, event) + case <-ctx.Done(): + return + } + } + + }() + + time.Sleep(100 * time.Millisecond) + cancel() + wg.Wait() + + if len(events) == count { + t.Log("Successful") + } else { + t.Fatal("failed") + } + +} From 80593e672a5c56bb76355b37b3bc904124079701 Mon Sep 17 00:00:00 2001 From: wy Date: Mon, 4 Mar 2024 12:10:14 +0800 Subject: [PATCH 10/20] fix NewSinkTube in memory, add unit test for NewSourceTube and NewSinkTube. --- fs/manager_test.go | 1 + 1 file changed, 1 insertion(+) create mode 100644 fs/manager_test.go diff --git a/fs/manager_test.go b/fs/manager_test.go new file mode 100644 index 00000000..ee666873 --- /dev/null +++ b/fs/manager_test.go @@ -0,0 +1 @@ +package fs From c709c8b324e03aa6b92f5b6fb606e4e8efba47c9 Mon Sep 17 00:00:00 2001 From: wy Date: Mon, 4 Mar 2024 12:10:24 +0800 Subject: [PATCH 11/20] fix NewSinkTube in memory, add unit test for NewSourceTube and NewSinkTube. --- fs/contube/memory_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/fs/contube/memory_test.go b/fs/contube/memory_test.go index 9bda2379..7ce25b3a 100644 --- a/fs/contube/memory_test.go +++ b/fs/contube/memory_test.go @@ -103,6 +103,8 @@ func TestMemoryNewSinkTube(t *testing.T) { } }(count) + wg.Wait() + go func() { for { select { @@ -117,7 +119,6 @@ func TestMemoryNewSinkTube(t *testing.T) { time.Sleep(100 * time.Millisecond) cancel() - wg.Wait() if len(events) == count { t.Log("Successful") From 9960d207e8c787a0fa7b643afac97c26ed65691f Mon Sep 17 00:00:00 2001 From: wy Date: Mon, 4 Mar 2024 12:10:42 +0800 Subject: [PATCH 12/20] fix NewSinkTube in memory, add unit test for NewSourceTube and NewSinkTube. --- fs/manager_test.go | 1 - 1 file changed, 1 deletion(-) delete mode 100644 fs/manager_test.go diff --git a/fs/manager_test.go b/fs/manager_test.go deleted file mode 100644 index ee666873..00000000 --- a/fs/manager_test.go +++ /dev/null @@ -1 +0,0 @@ -package fs From c28d2289bdc8414e28d52c0fbd038f9d3730a7f3 Mon Sep 17 00:00:00 2001 From: wy Date: Mon, 4 Mar 2024 12:29:25 +0800 Subject: [PATCH 13/20] fix NewSinkTube in memory, add unit test for NewSourceTube and NewSinkTube. --- fs/contube/memory_test.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/fs/contube/memory_test.go b/fs/contube/memory_test.go index 7ce25b3a..c67d547c 100644 --- a/fs/contube/memory_test.go +++ b/fs/contube/memory_test.go @@ -103,8 +103,6 @@ func TestMemoryNewSinkTube(t *testing.T) { } }(count) - wg.Wait() - go func() { for { select { @@ -117,8 +115,9 @@ func TestMemoryNewSinkTube(t *testing.T) { }() - time.Sleep(100 * time.Millisecond) + time.Sleep(500 * time.Millisecond) cancel() + wg.Wait() if len(events) == count { t.Log("Successful") From 0a7988e812e607159dc21d919edaa56d65469fa9 Mon Sep 17 00:00:00 2001 From: wy Date: Mon, 4 Mar 2024 14:41:10 +0800 Subject: [PATCH 14/20] fix NewSinkTube in memory, add unit test for NewSourceTube and NewSinkTube. --- fs/contube/memory_test.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/fs/contube/memory_test.go b/fs/contube/memory_test.go index c67d547c..487e786b 100644 --- a/fs/contube/memory_test.go +++ b/fs/contube/memory_test.go @@ -89,6 +89,7 @@ func TestMemoryNewSinkTube(t *testing.T) { } var wg sync.WaitGroup + var mu sync.Mutex wg.Add(1) var events []Record @@ -103,11 +104,17 @@ func TestMemoryNewSinkTube(t *testing.T) { } }(count) + wg.Wait() + go func() { + memoryQueueFactory.mu.Lock() + defer memoryQueueFactory.mu.Unlock() for { select { case event := <-memoryQueueFactory.queues[TopicKey].c: + mu.Lock() events = append(events, event) + mu.Unlock() case <-ctx.Done(): return } @@ -117,7 +124,9 @@ func TestMemoryNewSinkTube(t *testing.T) { time.Sleep(500 * time.Millisecond) cancel() - wg.Wait() + + mu.Lock() + defer mu.Unlock() if len(events) == count { t.Log("Successful") From bee573cff3d1bd3a3d512cdafa7190c05b2216e4 Mon Sep 17 00:00:00 2001 From: wy Date: Wed, 6 Mar 2024 11:15:57 +0800 Subject: [PATCH 15/20] fix NewSinkTube in memory, add unit test for NewSourceTube and NewSinkTube. --- fs/contube/memory.go | 9 ++-- fs/contube/memory_test.go | 104 +++++++++++--------------------------- 2 files changed, 34 insertions(+), 79 deletions(-) diff --git a/fs/contube/memory.go b/fs/contube/memory.go index 90be1b64..708b1e70 100644 --- a/fs/contube/memory.go +++ b/fs/contube/memory.go @@ -84,13 +84,12 @@ func (f *MemoryQueueFactory) NewSourceTube(ctx context.Context, configMap Config var wg sync.WaitGroup for _, topic := range config.Topics { wg.Add(1) - t := topic - go func() { + go func(t string) { <-ctx.Done() f.release(t) - }() + }(topic) - go func() { + go func(t string) { defer wg.Done() c := f.getOrCreateChan(t) for { @@ -101,7 +100,7 @@ func (f *MemoryQueueFactory) NewSourceTube(ctx context.Context, configMap Config result <- event } } - }() + }(topic) } go func() { diff --git a/fs/contube/memory_test.go b/fs/contube/memory_test.go index 487e786b..e225b8e2 100644 --- a/fs/contube/memory_test.go +++ b/fs/contube/memory_test.go @@ -18,7 +18,6 @@ package contube import ( "context" - "log" "math/rand" "strconv" "sync" @@ -26,7 +25,8 @@ import ( "time" ) -func TestMemoryNewSourceTube(t *testing.T) { +func TestMemoryTube(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) memoryQueueFactory := MemoryQueueFactory{ ctx: ctx, @@ -34,101 +34,57 @@ func TestMemoryNewSourceTube(t *testing.T) { queues: make(map[string]*queue), } - // Create queues, corresponding to multiple topics topics := []string{"topic1", "topic2", "topic3"} - for i, v := range topics { - memoryQueueFactory.queues[v] = &queue{ - c: make(chan Record, 1), - refCnt: 0, - } - memoryQueueFactory.queues[v].c <- &RecordImpl{ - payload: []byte{byte(i + 1)}, - commitFunc: nil, - } - } - - ch, err := memoryQueueFactory.NewSourceTube(ctx, (&SourceQueueConfig{Topics: []string{"topic1", "topic2", "topic3"}, SubName: "consume-" + strconv.Itoa(rand.Int())}).ToConfigMap()) - if err != nil { - t.Fatal(err) - } var wg sync.WaitGroup - wg.Add(1) var events []Record + var sinks []chan<- Record - go func() { - defer wg.Done() - for record := range ch { - log.Printf("Received record: %+v", record) - events = append(events, record) - } - }() - - time.Sleep(100 * time.Millisecond) - cancel() - wg.Wait() - - if len(events) == len(topics) { - t.Log("Successful") - } else { - t.Fatal("failed") - } -} - -func TestMemoryNewSinkTube(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - memoryQueueFactory := MemoryQueueFactory{ - ctx: ctx, - mu: sync.Mutex{}, - queues: make(map[string]*queue), - } - - wrapperC, err := memoryQueueFactory.NewSinkTube(ctx, (&SinkQueueConfig{Topic: TopicKey}).ToConfigMap()) + source, err := memoryQueueFactory.NewSourceTube(ctx, (&SourceQueueConfig{Topics: topics, SubName: "consume-" + strconv.Itoa(rand.Int())}).ToConfigMap()) if err != nil { t.Fatal(err) } - var wg sync.WaitGroup - var mu sync.Mutex - wg.Add(1) - - var events []Record - var count = 10 - go func(n int) { - defer wg.Done() - for i := 0; i < n; i++ { - wrapperC <- &RecordImpl{ - payload: []byte{byte(i)}, - commitFunc: func() {}, - } + for i, v := range topics { + wg.Add(1) + sink, err := memoryQueueFactory.NewSinkTube(ctx, (&SinkQueueConfig{Topic: v}).ToConfigMap()) + sinks = append(sinks, sink) + if err != nil { + t.Fatal(err) } - }(count) - - wg.Wait() + go func(i int) { + defer wg.Done() + defer close(sink) + sink <- NewRecordImpl([]byte{byte(i + 1)}, func() {}) + }(i) + } + wg.Add(1) go func() { - memoryQueueFactory.mu.Lock() - defer memoryQueueFactory.mu.Unlock() + defer wg.Done() for { select { - case event := <-memoryQueueFactory.queues[TopicKey].c: - mu.Lock() - events = append(events, event) - mu.Unlock() case <-ctx.Done(): return + case event := <-source: + events = append(events, event) } } - }() - time.Sleep(500 * time.Millisecond) + time.Sleep(100 * time.Millisecond) // Make sure all goroutines are running cancel() + time.Sleep(100 * time.Millisecond) // This time.Sleep is to wait for cancel() to notify all goroutines + wg.Wait() - mu.Lock() - defer mu.Unlock() + for _, topic := range topics { + _, ok := memoryQueueFactory.queues[topic] + if ok { + t.Fatal("queue release failure") + } + } - if len(events) == count { + if len(events) == len(topics) { t.Log("Successful") } else { t.Fatal("failed") From c61f8e515d2614352deb0c6be8733a84ae7924e6 Mon Sep 17 00:00:00 2001 From: wy Date: Wed, 6 Mar 2024 11:40:29 +0800 Subject: [PATCH 16/20] fix NewSinkTube in memory, add unit test for NewSourceTube and NewSinkTube. --- fs/contube/memory_test.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/fs/contube/memory_test.go b/fs/contube/memory_test.go index e225b8e2..6e5d2bb4 100644 --- a/fs/contube/memory_test.go +++ b/fs/contube/memory_test.go @@ -38,17 +38,14 @@ func TestMemoryTube(t *testing.T) { var wg sync.WaitGroup var events []Record - var sinks []chan<- Record source, err := memoryQueueFactory.NewSourceTube(ctx, (&SourceQueueConfig{Topics: topics, SubName: "consume-" + strconv.Itoa(rand.Int())}).ToConfigMap()) if err != nil { t.Fatal(err) } - for i, v := range topics { wg.Add(1) sink, err := memoryQueueFactory.NewSinkTube(ctx, (&SinkQueueConfig{Topic: v}).ToConfigMap()) - sinks = append(sinks, sink) if err != nil { t.Fatal(err) } @@ -77,11 +74,15 @@ func TestMemoryTube(t *testing.T) { time.Sleep(100 * time.Millisecond) // This time.Sleep is to wait for cancel() to notify all goroutines wg.Wait() + // Add "sync. Mutex" to prevent multiple goroutines from accessing queues simultaneously for _, topic := range topics { + memoryQueueFactory.mu.Lock() _, ok := memoryQueueFactory.queues[topic] + memoryQueueFactory.mu.Unlock() if ok { t.Fatal("queue release failure") } + } if len(events) == len(topics) { From ccb6f81421ce2532a61cf12017dd82ec4170f97a Mon Sep 17 00:00:00 2001 From: wy Date: Wed, 6 Mar 2024 11:44:08 +0800 Subject: [PATCH 17/20] fix NewSinkTube in memory, add unit test for NewSourceTube and NewSinkTube. --- fs/contube/memory_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fs/contube/memory_test.go b/fs/contube/memory_test.go index 6e5d2bb4..41e1ad55 100644 --- a/fs/contube/memory_test.go +++ b/fs/contube/memory_test.go @@ -86,7 +86,7 @@ func TestMemoryTube(t *testing.T) { } if len(events) == len(topics) { - t.Log("Successful") + t.Log("successful") } else { t.Fatal("failed") } From 3a3c08f0f87bea78d5282e5b964f4dd8e05948b9 Mon Sep 17 00:00:00 2001 From: wy Date: Sat, 9 Mar 2024 17:16:53 +0800 Subject: [PATCH 18/20] fix NewSinkTube in memory, add unit test --- fs/contube/memory.go | 10 ++++++---- fs/contube/memory_test.go | 31 ++++++++++++++----------------- 2 files changed, 20 insertions(+), 21 deletions(-) diff --git a/fs/contube/memory.go b/fs/contube/memory.go index 708b1e70..655e8e00 100644 --- a/fs/contube/memory.go +++ b/fs/contube/memory.go @@ -37,6 +37,7 @@ type MemoryQueueFactory struct { func NewMemoryQueueFactory(ctx context.Context) TubeFactory { return &MemoryQueueFactory{ ctx: ctx, + mu: sync.Mutex{}, queues: make(map[string]*queue), } } @@ -83,13 +84,14 @@ func (f *MemoryQueueFactory) NewSourceTube(ctx context.Context, configMap Config var wg sync.WaitGroup for _, topic := range config.Topics { + t := topic wg.Add(1) - go func(t string) { + go func() { <-ctx.Done() f.release(t) - }(topic) + }() - go func(t string) { + go func() { defer wg.Done() c := f.getOrCreateChan(t) for { @@ -100,7 +102,7 @@ func (f *MemoryQueueFactory) NewSourceTube(ctx context.Context, configMap Config result <- event } } - }(topic) + }() } go func() { diff --git a/fs/contube/memory_test.go b/fs/contube/memory_test.go index 41e1ad55..334a55ab 100644 --- a/fs/contube/memory_test.go +++ b/fs/contube/memory_test.go @@ -28,21 +28,18 @@ import ( func TestMemoryTube(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) - memoryQueueFactory := MemoryQueueFactory{ - ctx: ctx, - mu: sync.Mutex{}, - queues: make(map[string]*queue), - } - - topics := []string{"topic1", "topic2", "topic3"} + tubeFactory := NewMemoryQueueFactory(ctx) + memoryQueueFactory := tubeFactory.(*MemoryQueueFactory) var wg sync.WaitGroup var events []Record + topics := []string{"topic1", "topic2", "topic3"} source, err := memoryQueueFactory.NewSourceTube(ctx, (&SourceQueueConfig{Topics: topics, SubName: "consume-" + strconv.Itoa(rand.Int())}).ToConfigMap()) if err != nil { t.Fatal(err) } + for i, v := range topics { wg.Add(1) sink, err := memoryQueueFactory.NewSinkTube(ctx, (&SinkQueueConfig{Topic: v}).ToConfigMap()) @@ -69,22 +66,22 @@ func TestMemoryTube(t *testing.T) { } }() - time.Sleep(100 * time.Millisecond) // Make sure all goroutines are running + // The waiting event already contains enough results. + time.Sleep(100 * time.Millisecond) cancel() - time.Sleep(100 * time.Millisecond) // This time.Sleep is to wait for cancel() to notify all goroutines wg.Wait() - // Add "sync. Mutex" to prevent multiple goroutines from accessing queues simultaneously - for _, topic := range topics { - memoryQueueFactory.mu.Lock() - _, ok := memoryQueueFactory.queues[topic] - memoryQueueFactory.mu.Unlock() - if ok { - t.Fatal("queue release failure") - } + // Give enough time to ensure that the goroutine execution within NewSource Tube and NewSinkTube is complete and the released queue is successful. + time.Sleep(500 * time.Millisecond) + // assert the memoryQueueFactory.queues is empty. + memoryQueueFactory.mu.Lock() + if len(memoryQueueFactory.queues) != 0 { + t.Fatal("MemoryQueueFactory.queues is not empty") } + memoryQueueFactory.mu.Unlock() + // Assert if the number of received events equals the number of topic. if len(events) == len(topics) { t.Log("successful") } else { From 5a150184cdc0ec4c13156ce1e70a9288975da708 Mon Sep 17 00:00:00 2001 From: wy Date: Sun, 10 Mar 2024 21:39:21 +0800 Subject: [PATCH 19/20] fix NewSinkTube in memory, add unit test --- fs/contube/memory_test.go | 20 +++++++------------- 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/fs/contube/memory_test.go b/fs/contube/memory_test.go index 334a55ab..138e16f1 100644 --- a/fs/contube/memory_test.go +++ b/fs/contube/memory_test.go @@ -58,21 +58,22 @@ func TestMemoryTube(t *testing.T) { defer wg.Done() for { select { - case <-ctx.Done(): - return case event := <-source: events = append(events, event) + if len(events) == len(topics) { + return + } + default: + continue } } }() - // The waiting event already contains enough results. - time.Sleep(100 * time.Millisecond) - cancel() wg.Wait() + cancel() // Give enough time to ensure that the goroutine execution within NewSource Tube and NewSinkTube is complete and the released queue is successful. - time.Sleep(500 * time.Millisecond) + time.Sleep(100 * time.Millisecond) // assert the memoryQueueFactory.queues is empty. memoryQueueFactory.mu.Lock() @@ -81,11 +82,4 @@ func TestMemoryTube(t *testing.T) { } memoryQueueFactory.mu.Unlock() - // Assert if the number of received events equals the number of topic. - if len(events) == len(topics) { - t.Log("successful") - } else { - t.Fatal("failed") - } - } From 2f5ef13d6cfed89dd4ec4b0c93c456a9cd67d7c5 Mon Sep 17 00:00:00 2001 From: wy Date: Mon, 11 Mar 2024 21:02:27 +0800 Subject: [PATCH 20/20] fix NewSinkTube in memory, add unit test --- fs/contube/memory.go | 1 - 1 file changed, 1 deletion(-) diff --git a/fs/contube/memory.go b/fs/contube/memory.go index 655e8e00..e5886f53 100644 --- a/fs/contube/memory.go +++ b/fs/contube/memory.go @@ -37,7 +37,6 @@ type MemoryQueueFactory struct { func NewMemoryQueueFactory(ctx context.Context) TubeFactory { return &MemoryQueueFactory{ ctx: ctx, - mu: sync.Mutex{}, queues: make(map[string]*queue), } }