From 54fc3f02f3d2228a93faa5cafacfb38fd75b0ee0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lovro=20Ma=C5=BEgon?= Date: Thu, 26 Jun 2025 19:17:57 +0200 Subject: [PATCH 1/2] implement reactor processor, refactor wasm package --- run.go | 139 ++++++++------------------------ wasm/caller.go | 57 -------------- wasm/caller_test.go | 82 ------------------- wasm/command.go | 64 --------------- wasm/exports.go | 65 +++++++++++++++ wasm/imports.go | 24 ------ wasm/init.go | 47 +++++++++++ wasm/schema.go | 34 +------- wasm/util.go | 153 +++++++++++++++++++++++++++++++----- wasm/util_test.go | 187 ++++++++++++++++++++++++++++++++++++++++++++ 10 files changed, 471 insertions(+), 381 deletions(-) delete mode 100644 wasm/caller.go delete mode 100644 wasm/caller_test.go delete mode 100644 wasm/command.go create mode 100644 wasm/exports.go create mode 100644 wasm/init.go create mode 100644 wasm/util_test.go diff --git a/run.go b/run.go index 98207c5..45d48db 100644 --- a/run.go +++ b/run.go @@ -31,26 +31,27 @@ import ( "github.com/rs/zerolog" ) -// Run is the entry-point for a standalone processor. It handles all communication -// with Conduit. It will block forever, or until an error occurs. If an error -// occurs, it will be printed to stderr and the process will exit with a non-zero -// exit code. Otherwise, it will exit with a zero exit code. +// Run is the entry-point for a standalone processor. It will set up the handler +// for the exported functions and wrap the provided Processor with middleware. // -// A processor plugin needs to call this function in its main function. The +// A processor plugin needs to call this function in its init function. The // entrypoint file should look like this: // -// //go:build wasm +// //go:build wasm // -// package main +// package main // -// import ( -// sdk "github.com/conduitio/conduit-processor-sdk" -// ) +// import ( +// sdk "github.com/conduitio/conduit-processor-sdk" +// ) // -// func main() { -// processor := NewMyProcessor() -// sdk.Run(processor) -// } +// func init() { +// processor := NewMyProcessor() +// sdk.Run(processor) +// } +// +// // Main is required by the Go compiler, but it is not executed. +// func main() {} func Run(p Processor) { checkMagicCookie() @@ -64,7 +65,6 @@ func Run(p Processor) { } ctx = context.Background() - cmd processorv1.CommandRequest ) wasm.InitUtils(env.logLevel) @@ -84,27 +84,10 @@ func Run(p Processor) { executor := commandExecutor{ protoconv: protoConverter{}, logger: logger, + p: p, } - for { - logger.Trace().Msg("retrieving next command") - cmd.Reset() - err := wasm.NextCommand(&cmd) - if err != nil { - if errors.Is(err, pprocutils.ErrNoMoreCommands) { - os.Exit(0) - } - _, _ = fmt.Fprintf(os.Stderr, "failed retrieving next command: %v", err) - os.Exit(1) - } - - resp := executor.Execute(ctx, p, &cmd) - err = wasm.Reply(resp) - if err != nil { - _, _ = fmt.Fprintf(os.Stderr, "failed writing reply: %v\n", err) - os.Exit(1) - } - } + wasm.Handler = executor } func checkMagicCookie() { @@ -129,109 +112,55 @@ func checkMagicCookie() { type commandExecutor struct { protoconv protoConverter logger *zerolog.Logger + p Processor } -// Execute executes the given command request. It returns a command response -// that will be sent back to Conduit. -func (e commandExecutor) Execute(ctx context.Context, p Processor, cmdReq *processorv1.CommandRequest) *processorv1.CommandResponse { - e.logger.Trace().Type("command", cmdReq.GetRequest()).Msg("executing command") - - var resp *processorv1.CommandResponse - var err error - - switch req := cmdReq.GetRequest().(type) { - case *processorv1.CommandRequest_Specify: - resp, err = e.executeSpecify(ctx, p, req.Specify) - case *processorv1.CommandRequest_Configure: - resp, err = e.executeConfigure(ctx, p, req.Configure) - case *processorv1.CommandRequest_Open: - resp, err = e.executeOpen(ctx, p, req.Open) - case *processorv1.CommandRequest_Process: - resp, err = e.executeProcess(ctx, p, req.Process) - case *processorv1.CommandRequest_Teardown: - resp, err = e.executeTeardown(ctx, p, req.Teardown) - default: - err = pprocutils.ErrUnknownCommandRequest - } - - if err != nil { - e.logger.Trace().Err(err).Msg("command returned an error") - resp = &processorv1.CommandResponse{ - Response: &processorv1.CommandResponse_Error{ - Error: e.protoconv.error(err), - }, - } - } - - return resp -} - -func (e commandExecutor) executeSpecify(_ context.Context, p Processor, _ *processorv1.Specify_Request) (*processorv1.CommandResponse, error) { - spec, err := p.Specification() +func (e commandExecutor) Specification(_ *processorv1.Specify_Request) (*processorv1.Specify_Response, error) { + spec, err := e.p.Specification() if err != nil { return nil, err } - return &processorv1.CommandResponse{ - Response: &processorv1.CommandResponse_Specify{ - Specify: e.protoconv.specifyResponse(spec), - }, - }, nil + return e.protoconv.specifyResponse(spec), nil } -func (e commandExecutor) executeConfigure(ctx context.Context, p Processor, req *processorv1.Configure_Request) (*processorv1.CommandResponse, error) { - err := p.Configure(ctx, req.Parameters) +func (e commandExecutor) Configure(_ *processorv1.Configure_Request) (*processorv1.Configure_Response, error) { + err := e.p.Configure(context.Background(), nil) if err != nil { return nil, err } - return &processorv1.CommandResponse{ - Response: &processorv1.CommandResponse_Configure{ - Configure: &processorv1.Configure_Response{}, - }, - }, nil + return &processorv1.Configure_Response{}, nil } -func (e commandExecutor) executeOpen(ctx context.Context, p Processor, _ *processorv1.Open_Request) (*processorv1.CommandResponse, error) { - err := p.Open(ctx) +func (e commandExecutor) Open(_ *processorv1.Open_Request) (*processorv1.Open_Response, error) { + err := e.p.Open(context.Background()) if err != nil { return nil, err } - return &processorv1.CommandResponse{ - Response: &processorv1.CommandResponse_Open{ - Open: &processorv1.Open_Response{}, - }, - }, nil + return &processorv1.Open_Response{}, nil } -func (e commandExecutor) executeProcess(ctx context.Context, p Processor, req *processorv1.Process_Request) (*processorv1.CommandResponse, error) { +func (e commandExecutor) Process(req *processorv1.Process_Request) (*processorv1.Process_Response, error) { records, err := e.protoconv.records(req.Records) if err != nil { return nil, fmt.Errorf("failed to convert proto opencdc records: %w", err) } - processedRecords := p.Process(ctx, records) + processedRecords := e.p.Process(context.Background(), records) protoRecords, err := e.protoconv.processedRecords(processedRecords) if err != nil { return nil, fmt.Errorf("failed to convert processed records: %w", err) } - return &processorv1.CommandResponse{ - Response: &processorv1.CommandResponse_Process{ - Process: &processorv1.Process_Response{ - Records: protoRecords, - }, - }, + return &processorv1.Process_Response{ + Records: protoRecords, }, nil } -func (e commandExecutor) executeTeardown(ctx context.Context, p Processor, _ *processorv1.Teardown_Request) (*processorv1.CommandResponse, error) { - err := p.Teardown(ctx) +func (e commandExecutor) Teardown(_ *processorv1.Teardown_Request) (*processorv1.Teardown_Response, error) { + err := e.p.Teardown(context.Background()) if err != nil { return nil, err } - return &processorv1.CommandResponse{ - Response: &processorv1.CommandResponse_Teardown{ - Teardown: &processorv1.Teardown_Response{}, - }, - }, nil + return &processorv1.Teardown_Response{}, nil } // protoConverter converts between the SDK and protobuf types. diff --git a/wasm/caller.go b/wasm/caller.go deleted file mode 100644 index 9ac897b..0000000 --- a/wasm/caller.go +++ /dev/null @@ -1,57 +0,0 @@ -// Copyright © 2024 Meroxa, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package wasm - -import ( - "unsafe" - - "github.com/conduitio/conduit-processor-sdk/pprocutils" -) - -// HostFunc is the function type for the imported functions from the host. -// -// The arguments are: -// (1) a pointer to the address where the command should be written -// (2) the size of allocated memory. -// -// The return value indicates the size of the allocated response in bytes. If the -// response is larger than the allocated memory, the caller should reallocate the -// memory and call the function again. -type HostFunc func(ptr unsafe.Pointer, size uint32) uint32 - -// Call calls the function from the host 2 times max, is the buffer size is not -// enough the first time its called, it will be resized the second call. -// returns the buffer, command size, and error. -func hostCall(fn HostFunc, buf []byte) ([]byte, uint32, error) { - // 2 tries, 1st try is with the current buffer size, if that's not enough, - // then resize the buffer and try again - for i := 0; i < 2; i++ { - // request the host to write the response to the given buffer address - ptr := unsafe.Pointer(&buf[0]) - cmdSize := fn(ptr, uint32(len(buf))) //nolint:gosec // no risk of overflow - switch { - case cmdSize >= pprocutils.ErrorCodeStart: - // error codes - return nil, cmdSize, pprocutils.NewErrorFromCode(cmdSize) - case cmdSize > uint32(len(buf)) && i == 0: //nolint:gosec // no risk of overflow - // not enough memory, resize the buffer and try again - oldSize := uint32(len(buf)) //nolint:gosec // no risk of overflow - buf = append(buf, make([]byte, cmdSize-oldSize)...) - continue // try again - } - return buf, cmdSize, nil - } - panic("if this is reached, then the buffer was not resized correctly and we are in an infinite loop") -} diff --git a/wasm/caller_test.go b/wasm/caller_test.go deleted file mode 100644 index 5db4c64..0000000 --- a/wasm/caller_test.go +++ /dev/null @@ -1,82 +0,0 @@ -// Copyright © 2024 Meroxa, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package wasm - -import ( - "testing" - "unsafe" - - "github.com/conduitio/conduit-processor-sdk/pprocutils" - "github.com/matryer/is" -) - -func TestHostCall(t *testing.T) { - tests := []struct { - name string - hostFunc HostFunc - buf []byte - expectedBuf []byte - expectedCmdSize uint32 - expectedError error - }{ - { - name: "buffer large enough on first call", - hostFunc: func(ptr unsafe.Pointer, size uint32) uint32 { - buf := unsafe.Slice((*byte)(ptr), size) - copy(buf, "command") - return uint32(len("command")) - }, - buf: make([]byte, 10), - expectedBuf: []byte("command\x00\x00\x00"), - expectedCmdSize: 7, - expectedError: nil, - }, - { - name: "buffer not large enough on first call, resized on second", - hostFunc: func(ptr unsafe.Pointer, size uint32) uint32 { - if size < 7 { - return 7 - } - buf := unsafe.Slice((*byte)(ptr), size) - copy(buf, "command") - return uint32(len("command")) - }, - buf: make([]byte, 3), - expectedBuf: []byte("command"), - expectedCmdSize: 7, - expectedError: nil, - }, - { - name: "host returns error", - hostFunc: func(_ unsafe.Pointer, _ uint32) uint32 { - return pprocutils.ErrorCodeInternal - }, - buf: make([]byte, 10), - expectedBuf: nil, - expectedCmdSize: pprocutils.ErrorCodeInternal, - expectedError: pprocutils.NewErrorFromCode(pprocutils.ErrorCodeInternal), - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - is := is.New(t) - buf, cmdSize, err := hostCall(tt.hostFunc, tt.buf) - is.Equal(tt.expectedError, err) - is.Equal(tt.expectedBuf, buf) - is.Equal(tt.expectedCmdSize, cmdSize) - }) - } -} diff --git a/wasm/command.go b/wasm/command.go deleted file mode 100644 index cd32ef1..0000000 --- a/wasm/command.go +++ /dev/null @@ -1,64 +0,0 @@ -// Copyright © 2023 Meroxa, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//go:build wasm - -package wasm - -import ( - "fmt" - "sync" - - processorv1 "github.com/conduitio/conduit-processor-sdk/proto/processor/v1" - "google.golang.org/protobuf/proto" -) - -const defaultBufferSize = 1024 // 1kB - -var bufferPool = sync.Pool{ - New: func() any { - return make([]byte, defaultBufferSize) - }, -} - -func NextCommand(cmdReq *processorv1.CommandRequest) error { - buffer := bufferPool.Get().([]byte) - defer func() { - bufferPool.Put(buffer) - }() - - buffer, cmdSize, err := hostCall(_commandRequest, buffer[:cap(buffer)]) - if err != nil { - return err - } - // parse the command - if err := proto.Unmarshal(buffer[:cmdSize], cmdReq); err != nil { - return fmt.Errorf("failed unmarshalling %v bytes into proto type: %w", cmdSize, err) - } - return nil -} - -func Reply(resp *processorv1.CommandResponse) error { - buffer := bufferPool.Get().([]byte) - defer func() { - bufferPool.Put(buffer) - }() - - buffer, err := proto.MarshalOptions{}.MarshalAppend(buffer[:0], resp) - if err != nil { - return fmt.Errorf("failed marshalling proto type into bytes: %w", err) - } - buffer, _, err = hostCall(_commandResponse, buffer) - return err -} diff --git a/wasm/exports.go b/wasm/exports.go new file mode 100644 index 0000000..8eacd0b --- /dev/null +++ b/wasm/exports.go @@ -0,0 +1,65 @@ +// Copyright © 2025 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build wasm + +package wasm + +import ( + "unsafe" + + processorv1 "github.com/conduitio/conduit-processor-sdk/proto/processor/v1" +) + +//go:wasmexport conduit.processor.v1.malloc +func malloc(size uint32) unsafe.Pointer { + // Allocate a buffer of the specified size. + exportReqBuffer.Grow(int(size)) + return exportReqBuffer.Pointer() +} + +//go:wasmexport conduit.processor.v1.specification +func specification(ptr unsafe.Pointer, size uint32) uint64 { + return handleExportedCall(ptr, size, Handler.Specification, &processorv1.Specify_Request{}) +} + +//go:wasmexport conduit.processor.v1.configure +func configure(ptr unsafe.Pointer, size uint32) uint64 { + return handleExportedCall(ptr, size, Handler.Configure, &processorv1.Configure_Request{}) +} + +//go:wasmexport conduit.processor.v1.open +func open(ptr unsafe.Pointer, size uint32) uint64 { + return handleExportedCall(ptr, size, Handler.Open, &processorv1.Open_Request{}) +} + +//go:wasmexport conduit.processor.v1.process +func process(ptr unsafe.Pointer, size uint32) uint64 { + // TODO reuse the same request object and reset it instead of creating a new one each time + return handleExportedCall(ptr, size, Handler.Process, &processorv1.Process_Request{}) +} + +//go:wasmexport conduit.processor.v1.teardown +func teardown(ptr unsafe.Pointer, size uint32) uint64 { + return handleExportedCall(ptr, size, Handler.Teardown, &processorv1.Teardown_Request{}) +} + +// Handler is the bridge between the WebAssembly exports and the processor SDK. +var Handler interface { + Specification(*processorv1.Specify_Request) (*processorv1.Specify_Response, error) + Configure(*processorv1.Configure_Request) (*processorv1.Configure_Response, error) + Open(*processorv1.Open_Request) (*processorv1.Open_Response, error) + Process(*processorv1.Process_Request) (*processorv1.Process_Response, error) + Teardown(*processorv1.Teardown_Request) (*processorv1.Teardown_Response, error) +} diff --git a/wasm/imports.go b/wasm/imports.go index 3f25b95..b9e5584 100644 --- a/wasm/imports.go +++ b/wasm/imports.go @@ -18,30 +18,6 @@ package wasm import "unsafe" -// Imports `command_request` from the host, which retrieves -// the next command for a processor. -// -// The arguments are: -// (1) a pointer to the address where the command should be written -// (2) the size of allocated memory. -// -// The return value indicates the size of the allocated request in bytes. If the -// command is larger than the allocated memory, the caller should reallocate the -// memory and call `command_request` again. -// -//go:wasmimport conduit command_request -func _commandRequest(ptr unsafe.Pointer, size uint32) uint32 - -// Imports `command_response` from the host, which informs -// the host about the response for the previous command. -// -// The arguments are: -// (1) a pointer to the address where the reply was written -// (2) the size of allocated memory. -// -//go:wasmimport conduit command_response -func _commandResponse(ptr unsafe.Pointer, size uint32) uint32 - // Imports `create_schema` from the host, which creates a schema // // The arguments are: diff --git a/wasm/init.go b/wasm/init.go new file mode 100644 index 0000000..8fa8fb1 --- /dev/null +++ b/wasm/init.go @@ -0,0 +1,47 @@ +// Copyright © 2025 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build wasm + +package wasm + +import ( + "os" + + "github.com/conduitio/conduit-processor-sdk/pprocutils" + "github.com/conduitio/conduit-processor-sdk/schema" + "github.com/rs/zerolog" +) + +func InitUtils(logLevel string) { + initLogger(logLevel) + initSchemaService() +} + +func initLogger(logLevel string) { + logger := zerolog.New(os.Stdout) + + level, err := zerolog.ParseLevel(logLevel) + if err != nil { + logger.Warn().Err(err).Msg("failed to parse log level, falling back to debug") + // fallback to debug level + level = zerolog.DebugLevel + } + logger = logger.Level(level) + pprocutils.Logger = logger +} + +func initSchemaService() { + schema.SchemaService = &schemaService{} +} diff --git a/wasm/schema.go b/wasm/schema.go index 28ac4ee..f1e009e 100644 --- a/wasm/schema.go +++ b/wasm/schema.go @@ -24,56 +24,30 @@ import ( "github.com/conduitio/conduit-processor-sdk/pprocutils/v1/fromproto" "github.com/conduitio/conduit-processor-sdk/pprocutils/v1/toproto" procutilsv1 "github.com/conduitio/conduit-processor-sdk/proto/procutils/v1" - "google.golang.org/protobuf/proto" ) type schemaService struct{} func (*schemaService) CreateSchema(_ context.Context, req pprocutils.CreateSchemaRequest) (pprocutils.CreateSchemaResponse, error) { protoReq := toproto.CreateSchemaRequest(req) + var resp procutilsv1.CreateSchemaResponse - buffer := bufferPool.Get().([]byte) - defer bufferPool.Put(buffer) - - buffer, err := proto.MarshalOptions{}.MarshalAppend(buffer[:0], protoReq) - if err != nil { - return pprocutils.CreateSchemaResponse{}, fmt.Errorf("error marshalling request: %w", err) - } - - buffer, cmdSize, err := hostCall(_createSchema, buffer) + err := handleImportedCall(_createSchema, protoReq, &resp) if err != nil { return pprocutils.CreateSchemaResponse{}, fmt.Errorf("error calling createSchema: %w", err) } - var resp procutilsv1.CreateSchemaResponse - err = proto.Unmarshal(buffer[:cmdSize], &resp) - if err != nil { - return pprocutils.CreateSchemaResponse{}, fmt.Errorf("failed unmarshalling %v bytes into proto type: %w", cmdSize, err) - } - return fromproto.CreateSchemaResponse(&resp), nil } func (*schemaService) GetSchema(_ context.Context, req pprocutils.GetSchemaRequest) (pprocutils.GetSchemaResponse, error) { protoReq := toproto.GetSchemaRequest(req) + var resp procutilsv1.GetSchemaResponse - buffer := bufferPool.Get().([]byte) - defer bufferPool.Put(buffer) - - buffer, err := proto.MarshalOptions{}.MarshalAppend(buffer[:0], protoReq) - if err != nil { - return pprocutils.GetSchemaResponse{}, fmt.Errorf("error marshalling request: %w", err) - } - - buffer, cmdSize, err := hostCall(_getSchema, buffer) + err := handleImportedCall(_getSchema, protoReq, &resp) if err != nil { return pprocutils.GetSchemaResponse{}, fmt.Errorf("error calling getSchema: %w", err) } - var resp procutilsv1.GetSchemaResponse - err = proto.Unmarshal(buffer[:cmdSize], &resp) - if err != nil { - return pprocutils.GetSchemaResponse{}, fmt.Errorf("failed unmarshalling %v bytes into proto type: %w", cmdSize, err) - } return fromproto.GetSchemaResponse(&resp), nil } diff --git a/wasm/util.go b/wasm/util.go index f90f22b..bb798c6 100644 --- a/wasm/util.go +++ b/wasm/util.go @@ -1,4 +1,4 @@ -// Copyright © 2024 Meroxa, Inc. +// Copyright © 2025 Meroxa, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -12,36 +12,151 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build wasm - package wasm import ( - "os" + "fmt" + "unsafe" "github.com/conduitio/conduit-processor-sdk/pprocutils" - "github.com/conduitio/conduit-processor-sdk/schema" - "github.com/rs/zerolog" + processorv1 "github.com/conduitio/conduit-processor-sdk/proto/processor/v1" + "google.golang.org/protobuf/proto" ) -func InitUtils(logLevel string) { - initLogger(logLevel) - initSchemaService() +// -- buffer ------------------------------------------------------------------- + +// buffer is a utility struct that holds a byte slice and an unsafe pointer to +// the start of the slice's data. It is used to efficiently pass data between +// the host and the WebAssembly module without unnecessary copying. +type buffer []byte + +func newBuffer(size int) *buffer { + b := make(buffer, size) + return &b +} + +func (b *buffer) Grow(size int) { + if cap(*b) < size { + // This append logic preserves existing data when growing. + // We append to the end of the slice after expanding it to the full capacity. + *b = append((*b)[:cap(*b)], make([]byte, size-cap(*b))...) + } + // Reslice to the new size if we had enough capacity. + // This does not shrink the buffer if size < len(*b). + if len(*b) < size { + *b = (*b)[:size] + } } -func initLogger(logLevel string) { - logger := zerolog.New(os.Stdout) +// Pointer returns a pointer to the buffer's data. +// Includes a safety check for zero-length slices to prevent panics. +func (b *buffer) Pointer() unsafe.Pointer { + if len(*b) == 0 { + return nil + } + return unsafe.Pointer(&(*b)[0]) +} - level, err := zerolog.ParseLevel(logLevel) +// PointerAndSize returns the pointer and size in a single uint64. +// The higher 32 bits are the pointer, and the lower 32 bits are the size. +func (b *buffer) PointerAndSize() uint64 { + return (uint64(uintptr(b.Pointer())) << 32) | uint64(len(*b)) +} + +// -- Imported Function Utilities ---------------------------------------------- + +var importBuffer = newBuffer(1024) // 1kB buffer for requests and responses + +// hostFunc is the function type for the imported functions from the host. +// +// The arguments are: +// (1) a pointer to the address where the command should be written +// (2) the size of allocated memory. +// +// The return value indicates the size of the allocated response in bytes. If the +// response is larger than the allocated memory, the caller should reallocate the +// memory and call the function again. +type hostFunc func(ptr unsafe.Pointer, size uint32) uint32 + +// handleImportedCall calls the function from the host 2 times max, is the buffer +// size is not enough the first time its called, it will be resized the second call. +// Returns the buffer, command size, and error. +func handleImportedCall[REQ, RESP proto.Message](fn hostFunc, req REQ, resp RESP) error { + reqData, err := proto.MarshalOptions{}.MarshalAppend((*importBuffer)[:0], req) if err != nil { - logger.Warn().Err(err).Msg("failed to parse log level, falling back to debug") - // fallback to debug level - level = zerolog.DebugLevel + return fmt.Errorf("error marshalling proto type %T: %w", req, err) } - logger = logger.Level(level) - pprocutils.Logger = logger + *importBuffer = reqData + + // 2 tries, 1st try is with the current buffer size, if that's not enough, + // then resize the buffer and try again + for i := 0; i < 2; i++ { + // request the host to write the response to the given buffer address + cmdSize := fn(importBuffer.Pointer(), uint32(len(*importBuffer))) //nolint:gosec // no risk of overflow + switch { + case cmdSize >= pprocutils.ErrorCodeStart: + // error codes + return pprocutils.NewErrorFromCode(cmdSize) + case cmdSize > uint32(len(*importBuffer)): //nolint:gosec // no risk of overflow + // not enough memory, resize the buffer and try again + importBuffer.Grow(int(cmdSize)) + continue // try again + } + + // we have a valid response, unmarshal it + err = proto.Unmarshal((*importBuffer)[:cmdSize], resp) + if err != nil { + return fmt.Errorf("failed unmarshalling %v bytes into proto type %T: %w", cmdSize, resp, err) + } + + return nil + } + panic("if this is reached, then the buffer was not resized correctly and we are in an infinite loop") +} + +// -- Exported Function Utilities ---------------------------------------------- + +var ( + exportReqBuffer = newBuffer(1024) // 1kB buffer for export requests + exportRespBuffer = newBuffer(1024) // 1kB buffer for export responses +) + +// handleExportedCall handles the exported call from the host. +func handleExportedCall[REQ, RESP proto.Message]( + ptr unsafe.Pointer, size uint32, + handleFn func(REQ) (RESP, error), + cmdReq REQ, +) uint64 { + in := unsafe.Slice((*byte)(ptr), size) + + err := proto.Unmarshal(in[:size], cmdReq) + if err != nil { + return handleExportedCallError(fmt.Errorf("failed to unmarshal %v bytes into proto type %T: %w", size, cmdReq, err)) + } + + resp, err := handleFn(cmdReq) + if err != nil { + return handleExportedCallError(fmt.Errorf("failed to handle command: %w", err)) + } + + outData, err := proto.MarshalOptions{}.MarshalAppend((*exportRespBuffer)[:0], resp) + if err != nil { + return handleExportedCallError(fmt.Errorf("failed marshalling proto type %T into bytes: %w", resp, err)) + } + *exportRespBuffer = outData + return exportRespBuffer.PointerAndSize() } -func initSchemaService() { - schema.SchemaService = &schemaService{} +func handleExportedCallError(err error) uint64 { + protoErr := &processorv1.Error{ + Code: pprocutils.ErrorCodeInternal, + Message: fmt.Sprintf("error handling command: %v", err), + } + outData, err := proto.MarshalOptions{}.MarshalAppend((*exportRespBuffer)[:0], protoErr) + if err != nil { + // If we fail to marshal the error, we panic because we can't return an error to the host. + panic(fmt.Sprintf("failed to marshal error response: %v", err)) + } + *exportRespBuffer = outData + return exportRespBuffer.PointerAndSize() } diff --git a/wasm/util_test.go b/wasm/util_test.go new file mode 100644 index 0000000..501ee1c --- /dev/null +++ b/wasm/util_test.go @@ -0,0 +1,187 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package wasm + +import ( + "testing" + "unsafe" + + "github.com/conduitio/conduit-processor-sdk/pprocutils" + "github.com/matryer/is" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" +) + +func TestBuffer_NewBuffer(t *testing.T) { + t.Run("should create a buffer with a specific size", func(t *testing.T) { + is := is.New(t) + size := 128 + b := newBuffer(size) + + is.Equal(len(*b), size) + is.True(cap(*b) >= size) + }) + + t.Run("should create an empty buffer for size 0", func(t *testing.T) { + is := is.New(t) + b := newBuffer(0) + is.True(b != nil) // The buffer itself is not nil + is.Equal(len(*b), 0) // The underlying slice has zero length + is.Equal(cap(*b), 0) // and zero capacity + }) +} + +func TestBuffer_Grow(t *testing.T) { + t.Run("should grow by re-allocating and preserve data", func(t *testing.T) { + is := is.New(t) + b := newBuffer(10) + // Put some data in the buffer + copy(*b, "0123456789") + + originalCap := cap(*b) + originalPtr := b.Pointer() + + newSize := originalCap + 10 + b.Grow(newSize) + + is.Equal(len(*b), newSize) + is.True(cap(*b) >= newSize) + is.True(b.Pointer() != originalPtr) // Pointer should change after reallocation + is.Equal(string((*b)[:10]), "0123456789") // Old data must be preserved + }) + + t.Run("should grow by re-slicing when capacity is sufficient", func(t *testing.T) { + is := is.New(t) + // Create a buffer with more capacity than length + b := buffer(make([]byte, 5, 20)) + copy(b, "hello") + + originalCap := cap(b) + originalPtr := b.Pointer() + + newSize := 15 + b.Grow(newSize) + + is.Equal(len(b), newSize) + is.Equal(cap(b), originalCap) // Capacity should not change + is.Equal(b.Pointer(), originalPtr) // Pointer should not change + is.Equal(string(b[:5]), "hello") // Old data must be preserved + }) +} + +func TestBuffer_PointerAndSize(t *testing.T) { + t.Run("should correctly encode pointer and size", func(t *testing.T) { + is := is.New(t) + // Start with a non-zero buffer per assumptions + b := newBuffer(256) + + packed := b.PointerAndSize() + is.True(packed != 0) + + // Decode the values + decodedPtrVal := uint32(packed >> 32) + decodedSize := uint32(packed) + + // This test simulates a 32-bit wasm architecture. On a 64-bit test host, + // we must compare the decoded 32-bit pointer value with the truncated + // original 64-bit pointer value. + is.Equal(decodedPtrVal, uint32(uintptr(b.Pointer()))) + is.Equal(int(decodedSize), len(*b)) + is.Equal(int(decodedSize), 256) + }) + + t.Run("should return 0 for an empty buffer", func(t *testing.T) { + is := is.New(t) + // Even though the assumption is non-zero, the methods should be robust. + b := newBuffer(0) + packed := b.PointerAndSize() + is.Equal(packed, uint64(0)) // Pointer is nil (0) and size is 0 + }) +} + +func TestHostCall(t *testing.T) { + responseMsg := &anypb.Any{Value: []byte("response")} + responseBytes, err := proto.Marshal(responseMsg) + if err != nil { + panic(err) + } + responseLen := len(responseBytes) + + tests := []struct { + name string + hostFunc hostFunc + buffer *buffer + req *anypb.Any + wantResp *anypb.Any + wantError error + }{ + { + name: "buffer large enough on first call", + hostFunc: func(ptr unsafe.Pointer, size uint32) uint32 { + buf := unsafe.Slice((*byte)(ptr), size) + copy(buf, responseBytes) + return uint32(responseLen) + }, + buffer: newBuffer(responseLen), + req: &anypb.Any{Value: []byte("request")}, + wantResp: responseMsg, + wantError: nil, + }, + { + name: "buffer not large enough on first call, resized on second", + hostFunc: func(ptr unsafe.Pointer, size uint32) uint32 { + if size < uint32(responseLen) { + return uint32(responseLen) + } + buf := unsafe.Slice((*byte)(ptr), size) + copy(buf, responseBytes) + return uint32(responseLen) + }, + buffer: newBuffer(responseLen - 1), // initial buffer is too small + req: &anypb.Any{Value: []byte("request")}, + wantResp: responseMsg, + wantError: nil, + }, + { + name: "host returns error", + hostFunc: func(_ unsafe.Pointer, _ uint32) uint32 { + return pprocutils.ErrorCodeInternal + }, + req: &anypb.Any{Value: []byte("request")}, + wantResp: nil, + wantError: pprocutils.NewErrorFromCode(pprocutils.ErrorCodeInternal), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + is := is.New(t) + if tt.buffer != nil { + oldBuffer := importBuffer + importBuffer = tt.buffer + defer func() { + importBuffer = oldBuffer // Restore the original buffer after the test + }() + } + + var resp anypb.Any + err := handleImportedCall(tt.hostFunc, tt.req, &resp) + is.Equal(tt.wantError, err) + if err == nil { + is.Equal(tt.wantResp.Value, resp.Value) + } + }) + } +} From 4a698fe3b2313fbdf3e622a6cbbdd288ab4a6341 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lovro=20Ma=C5=BEgon?= Date: Thu, 26 Jun 2025 19:32:25 +0200 Subject: [PATCH 2/2] fix linter warnings --- .golangci.yml | 1 + wasm/exports.go | 6 ++-- wasm/util.go | 31 ++++++++--------- wasm/util_test.go | 86 +++++++++++++++++++++++++++++++++++++++++++---- 4 files changed, 98 insertions(+), 26 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index a6b0988..deb3b75 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -109,6 +109,7 @@ linters: - gocognit - gosec - maintidx + - unparam path: _test\.go - linters: - goconst diff --git a/wasm/exports.go b/wasm/exports.go index 8eacd0b..92b2f4b 100644 --- a/wasm/exports.go +++ b/wasm/exports.go @@ -22,11 +22,13 @@ import ( processorv1 "github.com/conduitio/conduit-processor-sdk/proto/processor/v1" ) +var mallocBuffer = newBuffer(1024) // 1kB buffer for malloc + //go:wasmexport conduit.processor.v1.malloc func malloc(size uint32) unsafe.Pointer { // Allocate a buffer of the specified size. - exportReqBuffer.Grow(int(size)) - return exportReqBuffer.Pointer() + mallocBuffer.Grow(int(size)) + return mallocBuffer.Pointer() } //go:wasmexport conduit.processor.v1.specification diff --git a/wasm/util.go b/wasm/util.go index bb798c6..d5957a7 100644 --- a/wasm/util.go +++ b/wasm/util.go @@ -65,7 +65,7 @@ func (b *buffer) PointerAndSize() uint64 { // -- Imported Function Utilities ---------------------------------------------- -var importBuffer = newBuffer(1024) // 1kB buffer for requests and responses +var importCallBuffer = newBuffer(1024) // 1kB buffer for requests and responses // hostFunc is the function type for the imported functions from the host. // @@ -82,29 +82,29 @@ type hostFunc func(ptr unsafe.Pointer, size uint32) uint32 // size is not enough the first time its called, it will be resized the second call. // Returns the buffer, command size, and error. func handleImportedCall[REQ, RESP proto.Message](fn hostFunc, req REQ, resp RESP) error { - reqData, err := proto.MarshalOptions{}.MarshalAppend((*importBuffer)[:0], req) + reqData, err := proto.MarshalOptions{}.MarshalAppend((*importCallBuffer)[:0], req) if err != nil { return fmt.Errorf("error marshalling proto type %T: %w", req, err) } - *importBuffer = reqData + *importCallBuffer = reqData // 2 tries, 1st try is with the current buffer size, if that's not enough, // then resize the buffer and try again for i := 0; i < 2; i++ { // request the host to write the response to the given buffer address - cmdSize := fn(importBuffer.Pointer(), uint32(len(*importBuffer))) //nolint:gosec // no risk of overflow + cmdSize := fn(importCallBuffer.Pointer(), uint32(len(*importCallBuffer))) //nolint:gosec // no risk of overflow switch { case cmdSize >= pprocutils.ErrorCodeStart: // error codes return pprocutils.NewErrorFromCode(cmdSize) - case cmdSize > uint32(len(*importBuffer)): //nolint:gosec // no risk of overflow + case cmdSize > uint32(len(*importCallBuffer)): //nolint:gosec // no risk of overflow // not enough memory, resize the buffer and try again - importBuffer.Grow(int(cmdSize)) + importCallBuffer.Grow(int(cmdSize)) continue // try again } // we have a valid response, unmarshal it - err = proto.Unmarshal((*importBuffer)[:cmdSize], resp) + err = proto.Unmarshal((*importCallBuffer)[:cmdSize], resp) if err != nil { return fmt.Errorf("failed unmarshalling %v bytes into proto type %T: %w", cmdSize, resp, err) } @@ -116,10 +116,7 @@ func handleImportedCall[REQ, RESP proto.Message](fn hostFunc, req REQ, resp RESP // -- Exported Function Utilities ---------------------------------------------- -var ( - exportReqBuffer = newBuffer(1024) // 1kB buffer for export requests - exportRespBuffer = newBuffer(1024) // 1kB buffer for export responses -) +var exportCallBuffer = newBuffer(1024) // 1kB buffer for export responses // handleExportedCall handles the exported call from the host. func handleExportedCall[REQ, RESP proto.Message]( @@ -139,12 +136,12 @@ func handleExportedCall[REQ, RESP proto.Message]( return handleExportedCallError(fmt.Errorf("failed to handle command: %w", err)) } - outData, err := proto.MarshalOptions{}.MarshalAppend((*exportRespBuffer)[:0], resp) + outData, err := proto.MarshalOptions{}.MarshalAppend((*exportCallBuffer)[:0], resp) if err != nil { return handleExportedCallError(fmt.Errorf("failed marshalling proto type %T into bytes: %w", resp, err)) } - *exportRespBuffer = outData - return exportRespBuffer.PointerAndSize() + *exportCallBuffer = outData + return exportCallBuffer.PointerAndSize() } func handleExportedCallError(err error) uint64 { @@ -152,11 +149,11 @@ func handleExportedCallError(err error) uint64 { Code: pprocutils.ErrorCodeInternal, Message: fmt.Sprintf("error handling command: %v", err), } - outData, err := proto.MarshalOptions{}.MarshalAppend((*exportRespBuffer)[:0], protoErr) + outData, err := proto.MarshalOptions{}.MarshalAppend((*exportCallBuffer)[:0], protoErr) if err != nil { // If we fail to marshal the error, we panic because we can't return an error to the host. panic(fmt.Sprintf("failed to marshal error response: %v", err)) } - *exportRespBuffer = outData - return exportRespBuffer.PointerAndSize() + *exportCallBuffer = outData + return exportCallBuffer.PointerAndSize() } diff --git a/wasm/util_test.go b/wasm/util_test.go index 501ee1c..36602bb 100644 --- a/wasm/util_test.go +++ b/wasm/util_test.go @@ -15,10 +15,12 @@ package wasm import ( + "fmt" "testing" "unsafe" "github.com/conduitio/conduit-processor-sdk/pprocutils" + processorv1 "github.com/conduitio/conduit-processor-sdk/proto/processor/v1" "github.com/matryer/is" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" @@ -111,12 +113,11 @@ func TestBuffer_PointerAndSize(t *testing.T) { }) } -func TestHostCall(t *testing.T) { +func TestHandleImportedCall(t *testing.T) { + is := is.New(t) responseMsg := &anypb.Any{Value: []byte("response")} responseBytes, err := proto.Marshal(responseMsg) - if err != nil { - panic(err) - } + is.NoErr(err) responseLen := len(responseBytes) tests := []struct { @@ -169,10 +170,10 @@ func TestHostCall(t *testing.T) { t.Run(tt.name, func(t *testing.T) { is := is.New(t) if tt.buffer != nil { - oldBuffer := importBuffer - importBuffer = tt.buffer + oldBuffer := importCallBuffer + importCallBuffer = tt.buffer defer func() { - importBuffer = oldBuffer // Restore the original buffer after the test + importCallBuffer = oldBuffer // Restore the original buffer after the test }() } @@ -185,3 +186,74 @@ func TestHostCall(t *testing.T) { }) } } + +func TestHandleExportedCall(t *testing.T) { + is := is.New(t) + + // Prepare a request and response + reqMsg := &anypb.Any{Value: []byte("request")} + respMsg := &anypb.Any{Value: []byte("response")} + reqBytes, err := proto.Marshal(reqMsg) + is.NoErr(err) + + // Handler that returns a valid response + handler := func(r *anypb.Any) (*anypb.Any, error) { + is.Equal(r.Value, reqMsg.Value) + return respMsg, nil + } + + // Handler that returns an error + errorHandler := func(*anypb.Any) (*anypb.Any, error) { + return nil, fmt.Errorf("handler error") + } + + t.Run("success", func(t *testing.T) { + is := is.New(t) + // Prepare buffer for request + buf := newBuffer(len(reqBytes)) + copy(*buf, reqBytes) + + // Call handleExportedCall + result := handleExportedCall(buf.Pointer(), uint32(len(*buf)), handler, &anypb.Any{}) + // Unpack pointer and size + respPtr := uint32(result >> 32) + respSize := uint32(result) + if respPtr == 0 || respSize == 0 { + t.Fatalf("expected non-zero pointer and size") + } + // Read response from exportCallBuffer + got := &anypb.Any{} + err := proto.Unmarshal((*exportCallBuffer)[:respSize], got) + is.NoErr(err) + is.Equal(got.Value, respMsg.Value) + }) + + t.Run("handler error", func(t *testing.T) { + is := is.New(t) + buf := newBuffer(len(reqBytes)) + copy(*buf, reqBytes) + + result := handleExportedCall(buf.Pointer(), uint32(len(*buf)), errorHandler, &anypb.Any{}) + respSize := uint32(result) + got := &processorv1.Error{} + err := proto.Unmarshal((*exportCallBuffer)[:respSize], got) + is.NoErr(err) + is.Equal(int(got.Code), pprocutils.ErrorCodeInternal) + is.True(len(got.Message) > 0) + }) + + t.Run("unmarshal error", func(t *testing.T) { + is := is.New(t) + // Pass invalid proto bytes + buf := newBuffer(len("not a proto")) + copy(*buf, "not a proto") + + result := handleExportedCall(buf.Pointer(), uint32(len(*buf)), handler, &anypb.Any{}) + respSize := uint32(result) + got := &processorv1.Error{} + err := proto.Unmarshal((*exportCallBuffer)[:respSize], got) + is.NoErr(err) + is.Equal(int(got.Code), pprocutils.ErrorCodeInternal) + is.True(len(got.Message) > 0) + }) +}