Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package common
const (
PulsarTubeType = "pulsar"
MemoryTubeType = "memory"
HttpTubeType = "http"

DefaultAddr = "localhost:7300"
DefaultPulsarURL = "pulsar://localhost:6650"
Expand Down
9 changes: 7 additions & 2 deletions fs/contube/contube.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ package contube

import (
"context"
"github.com/pkg/errors"
)

var (
ErrSinkTubeNotImplemented = errors.New("sink tube not implemented")
)

type Record interface {
Expand Down Expand Up @@ -74,8 +79,8 @@ func (c *SinkQueueConfig) ToConfigMap() ConfigMap {

type ConfigMap map[string]interface{}

// Merge merges multiple ConfigMap into one
func Merge(configs ...ConfigMap) ConfigMap {
// MergeConfig merges multiple ConfigMap into one
func MergeConfig(configs ...ConfigMap) ConfigMap {
result := ConfigMap{}
for _, config := range configs {
for k, v := range config {
Expand Down
160 changes: 160 additions & 0 deletions fs/contube/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/*
* Copyright 2024 Function Stream Org.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package contube

import (
"github.com/pkg/errors"
"golang.org/x/net/context"
"io"
"log/slog"
"net/http"
"sync"
"sync/atomic"
)

type state int

const (
EndpointKey = "endpoint"

stateReady state = iota
stateClosed state = iota
)

var (
ErrEndpointNotFound = errors.New("endpoint not found")
ErrEndpointClosed = errors.New("endpoint closed")
ErrorEndpointAlreadyExists = errors.New("endpoint already exists")
)

type EndpointHandler func(ctx context.Context, endpoint string, payload []byte) error

type endpointHandler struct {
ctx context.Context
s atomic.Value
c chan Record
}

type HttpTubeFactory struct {
TubeFactory
ctx context.Context
mu sync.RWMutex
endpoints map[string]*endpointHandler
}

func NewHttpTubeFactory(ctx context.Context) TubeFactory {
return &HttpTubeFactory{
ctx: ctx,
endpoints: make(map[string]*endpointHandler),
}
}

type httpSourceTubeConfig struct {
endpoint string
}

func (c ConfigMap) toHttpSourceTubeConfig() (*httpSourceTubeConfig, error) {
endpoint, ok := c[EndpointKey].(string)
if !ok {
return nil, ErrEndpointNotFound
}
return &httpSourceTubeConfig{
endpoint: endpoint,
}, nil
}

func (f *HttpTubeFactory) Handle(ctx context.Context, endpoint string, payload []byte) error {
f.mu.RLock()
e, ok := f.endpoints[endpoint]
if !ok {
f.mu.RUnlock()
return ErrEndpointNotFound
}
f.mu.RUnlock()
if e.s.Load() == stateClosed {
return ErrEndpointClosed
}
select {
case e.c <- NewRecordImpl(payload, func() {}):
return nil
case <-ctx.Done():
return ctx.Err()
case <-e.ctx.Done():
return ErrEndpointClosed
}
}

func (f *HttpTubeFactory) NewSourceTube(ctx context.Context, config ConfigMap) (<-chan Record, error) {
c, err := config.toHttpSourceTubeConfig()
if err != nil {
return nil, err
}
result := make(chan Record, 10)
f.mu.Lock()
defer f.mu.Unlock()
if _, ok := f.endpoints[c.endpoint]; ok {
return nil, ErrorEndpointAlreadyExists
}
var s atomic.Value
s.Store(stateReady)
handlerCtx, cancel := context.WithCancel(f.ctx)
e := &endpointHandler{
c: result,
s: s,
ctx: handlerCtx,
}
f.endpoints[c.endpoint] = e
go func() {
<-ctx.Done()
cancel()
close(result)
f.mu.Lock()
defer f.mu.Unlock()
delete(f.endpoints, c.endpoint)
}()
return result, nil
}

func (f *HttpTubeFactory) NewSinkTube(_ context.Context, _ ConfigMap) (chan<- Record, error) {
return nil, ErrSinkTubeNotImplemented
}

func (f *HttpTubeFactory) GetHandleFunc(getEndpoint func(r *http.Request) (string, error), logger *slog.Logger) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
endpoint, err := getEndpoint(r)
if err != nil {
logger.Error("Failed to get endpoint", "error", err)
http.Error(w, errors.Wrap(err, "Failed to get endpoint").Error(), http.StatusBadRequest)
return
}
log := logger.With(slog.String("endpoint", endpoint), slog.String("component", "http-tube"))
log.Info("Handle records from http request")
content, err := io.ReadAll(r.Body)
if err != nil {
log.Error("Failed to read body", "error", err)
http.Error(w, errors.Wrap(err, "Failed to read body").Error(), http.StatusBadRequest)
return
}
err = f.Handle(r.Context(), endpoint, content)
if err != nil {
log.Error("Failed to handle record", "error", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
log.Info("Handled records from http request")
}
}
57 changes: 57 additions & 0 deletions fs/contube/http_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2024 Function Stream Org.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package contube

import (
"github.com/stretchr/testify/assert"
"golang.org/x/net/context"
"testing"
)

func TestHttpTubeHandleRecord(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
f := NewHttpTubeFactory(ctx).(*HttpTubeFactory)

endpoint := "test"
err := f.Handle(ctx, "test", []byte("test"))
assert.ErrorIs(t, err, ErrEndpointNotFound)

config := make(ConfigMap)
config[EndpointKey] = endpoint
source, err := f.NewSourceTube(ctx, config)
assert.NoError(t, err)
_, err = f.NewSourceTube(ctx, config)
assert.ErrorIs(t, err, ErrorEndpointAlreadyExists)

err = f.Handle(ctx, endpoint, []byte("test"))
assert.Nil(t, err)

record := <-source
assert.Equal(t, "test", string(record.GetPayload()))

cancel()

assert.Nil(t, <-source)
err = f.Handle(ctx, endpoint, []byte("test"))
assert.Error(t, err, ErrEndpointNotFound)
}

func TestHttpTubeSinkTubeNotImplement(t *testing.T) {
f := NewHttpTubeFactory(context.Background()).(*HttpTubeFactory)
_, err := f.NewSinkTube(context.Background(), make(ConfigMap))
assert.ErrorIs(t, err, ErrSinkTubeNotImplemented)
}
2 changes: 1 addition & 1 deletion fs/instance_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (instance *FunctionInstanceImpl) Run(runtimeFactory api.FunctionRuntimeFact
}
getTubeConfig := func(config contube.ConfigMap, tubeConfig *model.TubeConfig) contube.ConfigMap {
if tubeConfig != nil && tubeConfig.Config != nil {
return contube.Merge(config, tubeConfig.Config)
return contube.MergeConfig(config, tubeConfig.Config)
}
return config
}
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/gorilla/mux v1.8.1
github.com/pkg/errors v0.9.1
github.com/spf13/cobra v1.8.0
github.com/stretchr/testify v1.8.4
github.com/tetratelabs/wazero v1.6.0
golang.org/x/net v0.21.0
golang.org/x/time v0.5.0
Expand All @@ -26,6 +27,7 @@ require (
github.com/bits-and-blooms/bitset v1.13.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/danieljoos/wincred v1.2.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dvsekhvalnov/jose2go v1.6.0 // indirect
github.com/frankban/quicktest v1.14.6 // indirect
github.com/go-logr/logr v1.4.1 // indirect
Expand All @@ -47,6 +49,7 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mtibben/percent v0.2.1 // indirect
github.com/pierrec/lz4 v2.6.1+incompatible // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.18.0 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.46.0 // indirect
Expand Down
35 changes: 30 additions & 5 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pkg/errors"
"io"
"log/slog"
"net"
"net/http"
"net/url"
"sync/atomic"
Expand All @@ -43,7 +44,9 @@ type Server struct {
}

type serverOptions struct {
manager *fs.FunctionManager
httpListener net.Listener
manager *fs.FunctionManager
httpTube *contube.HttpTubeFactory
}

type ServerOption interface {
Expand All @@ -63,7 +66,16 @@ func WithFunctionManager(manager *fs.FunctionManager) ServerOption {
})
}

func newDefaultFunctionManager(config *common.Config) (*fs.FunctionManager, error) {
// WithHttpListener sets the listener for the HTTP server.
// If not set, the server will listen on the Config.ListenAddr.
func WithHttpListener(listener net.Listener) ServerOption {
return serverOptionFunc(func(o *serverOptions) (*serverOptions, error) {
o.httpListener = listener
return o, nil
})
}

func (s *serverOptions) newDefaultFunctionManager(config *common.Config) (*fs.FunctionManager, error) {
var tubeFactory contube.TubeFactory
var err error
switch config.TubeType {
Expand All @@ -77,9 +89,10 @@ func newDefaultFunctionManager(config *common.Config) (*fs.FunctionManager, erro
case common.MemoryTubeType:
tubeFactory = contube.NewMemoryQueueFactory(context.Background())
}

s.httpTube = contube.NewHttpTubeFactory(context.Background()).(*contube.HttpTubeFactory)
manager, err := fs.NewFunctionManager(
fs.WithDefaultTubeFactory(tubeFactory),
fs.WithTubeFactory("http", s.httpTube),
)
if err != nil {
return nil, errors.Wrap(err, "failed to create default function manager")
Expand All @@ -96,7 +109,7 @@ func NewServer(config *common.Config, opts ...ServerOption) (*Server, error) {
}
}
if options.manager == nil {
manager, err := newDefaultFunctionManager(config)
manager, err := options.newDefaultFunctionManager(config)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -244,13 +257,25 @@ func (s *Server) startRESTHandlers() error {
log.Info("Consumed event from queue")
}).Methods("GET")

r.HandleFunc("/api/v1/http-tube/{endpoint}", s.options.httpTube.GetHandleFunc(func(r *http.Request) (string, error) {
e, ok := mux.Vars(r)["endpoint"]
if !ok {
return "", errors.New("endpoint not found")
}
return e, nil
}, s.log)).Methods("POST")

httpSvr := &http.Server{
Addr: s.config.ListenAddr,
Handler: r,
}
s.httpSvr.Store(httpSvr)

return httpSvr.ListenAndServe()
if s.options.httpListener != nil {
return httpSvr.Serve(s.options.httpListener)
} else {
return httpSvr.ListenAndServe()
}
}

func (s *Server) WaitForReady(ctx context.Context) <-chan struct{} {
Expand Down
Loading