Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 15 additions & 28 deletions core/contractsapi/stream_procedures.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,19 +121,6 @@ func (s *Action) ExecuteProcedure(ctx context.Context, procedure string, args []
return s.execute(ctx, procedure, args)
}

type GetRecordRawOutput struct {
EventTime string `json:"event_time"`
Value string `json:"value"`
}

// transformOrNil returns nil if the value is nil, otherwise it applies the transform function to the value.
func transformOrNil[T any](value *T, transform func(T) any) any {
if value == nil {
return nil
}
return transform(*value)
}

// CallProcedure is a wrapper around the call function, just to be explicit that users can call arbitrary procedures
func (s *Action) CallProcedure(ctx context.Context, procedure string, args []any) (*kwiltypes.QueryResult, error) {
return s.call(ctx, procedure, args)
Expand All @@ -143,9 +130,9 @@ func (s *Action) GetRecord(ctx context.Context, input types.GetRecordInput) (typ
var args []any
args = append(args, input.DataProvider)
args = append(args, input.StreamId)
args = append(args, transformOrNil(input.From, func(date int) any { return date }))
args = append(args, transformOrNil(input.To, func(date int) any { return date }))
args = append(args, transformOrNil(input.FrozenAt, func(date int) any { return date }))
args = append(args, util.TransformOrNil(input.From, func(date int) any { return date }))
args = append(args, util.TransformOrNil(input.To, func(date int) any { return date }))
args = append(args, util.TransformOrNil(input.FrozenAt, func(date int) any { return date }))
if input.UseCache != nil {
args = append(args, *input.UseCache)
}
Expand All @@ -160,7 +147,7 @@ func (s *Action) GetRecord(ctx context.Context, input types.GetRecordInput) (typ
return types.ActionResult{}, errors.WithStack(err)
}

rawOutputs, err := DecodeCallResult[GetRecordRawOutput](callResult.QueryResult)
rawOutputs, err := DecodeCallResult[types.GetRecordRawOutput](callResult.QueryResult)
if err != nil {
return types.ActionResult{}, errors.WithStack(err)
}
Expand Down Expand Up @@ -223,16 +210,16 @@ func (s *Action) GetRecord(ctx context.Context, input types.GetRecordInput) (typ
}, nil
}

type GetIndexRawOutput = GetRecordRawOutput
type GetIndexRawOutput = types.GetRecordRawOutput

func (s *Action) GetIndex(ctx context.Context, input types.GetIndexInput) (types.ActionResult, error) {
var args []any
args = append(args, input.DataProvider)
args = append(args, input.StreamId)
args = append(args, transformOrNil(input.From, func(date int) any { return date }))
args = append(args, transformOrNil(input.To, func(date int) any { return date }))
args = append(args, transformOrNil(input.FrozenAt, func(date int) any { return date }))
args = append(args, transformOrNil(input.BaseDate, func(date int) any { return date }))
args = append(args, util.TransformOrNil(input.From, func(date int) any { return date }))
args = append(args, util.TransformOrNil(input.To, func(date int) any { return date }))
args = append(args, util.TransformOrNil(input.FrozenAt, func(date int) any { return date }))
args = append(args, util.TransformOrNil(input.BaseDate, func(date int) any { return date }))
if input.UseCache != nil {
args = append(args, *input.UseCache)
}
Expand Down Expand Up @@ -311,16 +298,16 @@ func (s *Action) GetIndex(ctx context.Context, input types.GetIndexInput) (types
}, nil
}

type GetIndexChangeRawOutput = GetRecordRawOutput
type GetIndexChangeRawOutput = types.GetRecordRawOutput

func (s *Action) GetIndexChange(ctx context.Context, input types.GetIndexChangeInput) (types.ActionResult, error) {
var args []any
args = append(args, input.DataProvider)
args = append(args, input.StreamId)
args = append(args, transformOrNil(input.From, func(date int) any { return date }))
args = append(args, transformOrNil(input.To, func(date int) any { return date }))
args = append(args, transformOrNil(input.FrozenAt, func(date int) any { return date }))
args = append(args, transformOrNil(input.BaseDate, func(date int) any { return date }))
args = append(args, util.TransformOrNil(input.From, func(date int) any { return date }))
args = append(args, util.TransformOrNil(input.To, func(date int) any { return date }))
args = append(args, util.TransformOrNil(input.FrozenAt, func(date int) any { return date }))
args = append(args, util.TransformOrNil(input.BaseDate, func(date int) any { return date }))
args = append(args, input.TimeInterval)
if input.UseCache != nil {
args = append(args, *input.UseCache)
Expand Down Expand Up @@ -531,7 +518,7 @@ func (s *Action) GetFirstRecord(ctx context.Context, input types.GetFirstRecordI
return types.ActionResult{}, errors.WithStack(err)
}

rawOutputs, err := DecodeCallResult[GetRecordRawOutput](callResult.QueryResult)
rawOutputs, err := DecodeCallResult[types.GetRecordRawOutput](callResult.QueryResult)
if err != nil {
return types.ActionResult{}, errors.WithStack(err)
}
Expand Down
224 changes: 224 additions & 0 deletions core/tnclient/actions_transport.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
package tnclient

import (
"context"
"fmt"
"strconv"

"github.com/cockroachdb/apd/v3"
"github.com/pkg/errors"
kwilClientType "github.com/trufnetwork/kwil-db/core/client/types"
kwilType "github.com/trufnetwork/kwil-db/core/types"
"github.com/trufnetwork/kwil-db/node/types"
tn_api "github.com/trufnetwork/sdk-go/core/contractsapi"
clientType "github.com/trufnetwork/sdk-go/core/types"
"github.com/trufnetwork/sdk-go/core/util"
)

// TransportAction implements IAction interface using the Transport abstraction.
// This allows actions to work with any transport (HTTP, CRE, etc.).
//
// MINIMAL IMPLEMENTATION: Only GetRecord is fully implemented.
// Other methods return "not implemented" errors since they're not needed by QuantAMM.
type TransportAction struct {
transport Transport
}

var _ clientType.IAction = (*TransportAction)(nil)

// GetRecord reads the records of the stream within the given date range.
// This is a core method needed by QuantAMM for reading stream data.
func (a *TransportAction) GetRecord(ctx context.Context, input clientType.GetRecordInput) (clientType.ActionResult, error) {
var args []any
args = append(args, input.DataProvider)
args = append(args, input.StreamId)
args = append(args, util.TransformOrNil(input.From, func(date int) any { return date }))
args = append(args, util.TransformOrNil(input.To, func(date int) any { return date }))
args = append(args, util.TransformOrNil(input.FrozenAt, func(date int) any { return date }))
args = append(args, util.TransformOrNil(input.BaseDate, func(date int) any { return date }))
if input.UseCache != nil {
args = append(args, *input.UseCache)
}

prefix := ""
if input.Prefix != nil {
prefix = *input.Prefix
}

result, err := a.transport.Call(ctx, "", prefix+"get_record", args)
if err != nil {
return clientType.ActionResult{}, errors.WithStack(err)
}

// Decode raw SQL output using shared type
rawOutputs, err := tn_api.DecodeCallResult[clientType.GetRecordRawOutput](result.QueryResult)
if err != nil {
return clientType.ActionResult{}, errors.WithStack(err)
}

// Parse strings to proper types
var outputs []clientType.StreamResult
for _, rawOutput := range rawOutputs {
value, _, err := apd.NewFromString(rawOutput.Value)
if err != nil {
return clientType.ActionResult{}, errors.WithStack(err)
}

eventTime := 0
if rawOutput.EventTime != "" {
eventTime, err = strconv.Atoi(rawOutput.EventTime)
if err != nil {
return clientType.ActionResult{}, errors.WithStack(err)
}
}

outputs = append(outputs, clientType.StreamResult{
EventTime: eventTime,
Value: *value,
})
}

// Note: Cache metadata parsing is not implemented in this minimal version
// as noted in TRANSPORT_IMPLEMENTATION_NOTES.md
return clientType.ActionResult{Results: outputs}, nil
}

// Stub implementations for IAction methods not needed by QuantAMM.
// These return errors indicating they're not implemented for custom transports.

func (a *TransportAction) ExecuteProcedure(ctx context.Context, procedure string, args [][]any) (types.Hash, error) {
return a.transport.Execute(ctx, "", procedure, args)
}

func (a *TransportAction) CallProcedure(ctx context.Context, procedure string, args []any) (*kwilType.QueryResult, error) {
result, err := a.transport.Call(ctx, "", procedure, args)
if err != nil {
return nil, err
}
return result.QueryResult, nil
}

func (a *TransportAction) GetIndex(ctx context.Context, input clientType.GetIndexInput) (clientType.ActionResult, error) {
return clientType.ActionResult{}, fmt.Errorf("GetIndex not implemented for custom transports - use HTTP transport or implement if needed")
}

func (a *TransportAction) GetIndexChange(ctx context.Context, input clientType.GetIndexChangeInput) (clientType.ActionResult, error) {
return clientType.ActionResult{}, fmt.Errorf("GetIndexChange not implemented for custom transports - use HTTP transport or implement if needed")
}

func (a *TransportAction) GetType(ctx context.Context, locator clientType.StreamLocator) (clientType.StreamType, error) {
return "", fmt.Errorf("GetType not implemented for custom transports - use HTTP transport or implement if needed")
}

func (a *TransportAction) GetFirstRecord(ctx context.Context, input clientType.GetFirstRecordInput) (clientType.ActionResult, error) {
return clientType.ActionResult{}, fmt.Errorf("GetFirstRecord not implemented for custom transports - use HTTP transport or implement if needed")
}

func (a *TransportAction) SetReadVisibility(ctx context.Context, input clientType.VisibilityInput) (types.Hash, error) {
return types.Hash{}, fmt.Errorf("SetReadVisibility not implemented for custom transports - use HTTP transport or implement if needed")
}

func (a *TransportAction) GetReadVisibility(ctx context.Context, locator clientType.StreamLocator) (*util.VisibilityEnum, error) {
return nil, fmt.Errorf("GetReadVisibility not implemented for custom transports - use HTTP transport or implement if needed")
}

func (a *TransportAction) SetComposeVisibility(ctx context.Context, input clientType.VisibilityInput) (types.Hash, error) {
return types.Hash{}, fmt.Errorf("SetComposeVisibility not implemented for custom transports - use HTTP transport or implement if needed")
}

func (a *TransportAction) GetComposeVisibility(ctx context.Context, locator clientType.StreamLocator) (*util.VisibilityEnum, error) {
return nil, fmt.Errorf("GetComposeVisibility not implemented for custom transports - use HTTP transport or implement if needed")
}

func (a *TransportAction) AllowReadWallet(ctx context.Context, input clientType.ReadWalletInput) (types.Hash, error) {
return types.Hash{}, fmt.Errorf("AllowReadWallet not implemented for custom transports - use HTTP transport or implement if needed")
}

func (a *TransportAction) DisableReadWallet(ctx context.Context, input clientType.ReadWalletInput) (types.Hash, error) {
return types.Hash{}, fmt.Errorf("DisableReadWallet not implemented for custom transports - use HTTP transport or implement if needed")
}

func (a *TransportAction) AllowComposeStream(ctx context.Context, locator clientType.StreamLocator) (types.Hash, error) {
return types.Hash{}, fmt.Errorf("AllowComposeStream not implemented for custom transports - use HTTP transport or implement if needed")
}

func (a *TransportAction) DisableComposeStream(ctx context.Context, locator clientType.StreamLocator) (types.Hash, error) {
return types.Hash{}, fmt.Errorf("DisableComposeStream not implemented for custom transports - use HTTP transport or implement if needed")
}

func (a *TransportAction) GetStreamOwner(ctx context.Context, locator clientType.StreamLocator) ([]byte, error) {
return nil, fmt.Errorf("GetStreamOwner not implemented for custom transports - use HTTP transport or implement if needed")
}

func (a *TransportAction) GetAllowedReadWallets(ctx context.Context, locator clientType.StreamLocator) ([]util.EthereumAddress, error) {
return nil, fmt.Errorf("GetAllowedReadWallets not implemented for custom transports - use HTTP transport or implement if needed")
}

func (a *TransportAction) GetAllowedComposeStreams(ctx context.Context, locator clientType.StreamLocator) ([]clientType.StreamLocator, error) {
return nil, fmt.Errorf("GetAllowedComposeStreams not implemented for custom transports - use HTTP transport or implement if needed")
}

func (a *TransportAction) SetDefaultBaseTime(ctx context.Context, input clientType.DefaultBaseTimeInput) (types.Hash, error) {
return types.Hash{}, fmt.Errorf("SetDefaultBaseTime not implemented for custom transports - use HTTP transport or implement if needed")
}

func (a *TransportAction) BatchStreamExists(ctx context.Context, streams []clientType.StreamLocator) ([]clientType.StreamExistsResult, error) {
return nil, fmt.Errorf("BatchStreamExists not implemented for custom transports - use HTTP transport or implement if needed")
}

func (a *TransportAction) BatchFilterStreamsByExistence(ctx context.Context, streams []clientType.StreamLocator, returnExisting bool) ([]clientType.StreamLocator, error) {
return nil, fmt.Errorf("BatchFilterStreamsByExistence not implemented for custom transports - use HTTP transport or implement if needed")
}

// TransportPrimitiveAction implements IPrimitiveAction interface using the Transport abstraction.
// This allows primitive stream actions to work with any transport (HTTP, CRE, etc.).
//
// MINIMAL IMPLEMENTATION: Only InsertRecords is fully implemented.
// Other methods return "not implemented" errors since they're not needed by QuantAMM.
type TransportPrimitiveAction struct {
TransportAction
}

var _ clientType.IPrimitiveAction = (*TransportPrimitiveAction)(nil)

// InsertRecords inserts multiple records into primitive streams.
// This is a core method needed by QuantAMM for writing stream data.
func (p *TransportPrimitiveAction) InsertRecords(ctx context.Context, inputs []clientType.InsertRecordInput, opts ...kwilClientType.TxOpt) (types.Hash, error) {
var (
dataProviders []string
streamIds []string
eventTimes []int
values kwilType.DecimalArray
)

for _, input := range inputs {
// Convert float64 to decimal with 36 precision and 18 scale
valueNumeric, err := kwilType.ParseDecimalExplicit(strconv.FormatFloat(input.Value, 'f', -1, 64), 36, 18)
if err != nil {
return types.Hash{}, errors.WithStack(err)
}

dataProviders = append(dataProviders, input.DataProvider)
streamIds = append(streamIds, input.StreamId)
eventTimes = append(eventTimes, input.EventTime)
values = append(values, valueNumeric)
}

return p.transport.Execute(ctx, "", "insert_records", [][]any{{
dataProviders,
streamIds,
eventTimes,
values,
}}, opts...)
}

// InsertRecord inserts a single record - stub implementation
func (p *TransportPrimitiveAction) InsertRecord(ctx context.Context, input clientType.InsertRecordInput, opts ...kwilClientType.TxOpt) (types.Hash, error) {
// Just delegate to InsertRecords
return p.InsertRecords(ctx, []clientType.InsertRecordInput{input}, opts...)
}

// CheckValidPrimitiveStream checks if the stream is a valid primitive stream - stub implementation
func (p *TransportPrimitiveAction) CheckValidPrimitiveStream(ctx context.Context, locator clientType.StreamLocator) error {
return fmt.Errorf("CheckValidPrimitiveStream not implemented for custom transports - use HTTP transport or implement if needed")
}
45 changes: 34 additions & 11 deletions core/tnclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,20 @@ func (c *Client) GetKwilClient() *gatewayclient.GatewayClient {
}

func (c *Client) DeployStream(ctx context.Context, streamId util.StreamId, streamType clientType.StreamType) (types.Hash, error) {
return tn_api.DeployStream(ctx, tn_api.DeployStreamInput{
StreamId: streamId,
StreamType: streamType,
KwilClient: c.GetKwilClient(),
})
// For HTTP transport, use the existing implementation (backwards compatible)
// For custom transports (CRE, etc.), use transport.Execute directly
if httpTransport, ok := c.transport.(*HTTPTransport); ok {
return tn_api.DeployStream(ctx, tn_api.DeployStreamInput{
StreamId: streamId,
StreamType: streamType,
KwilClient: httpTransport.gatewayClient,
})
}
// Use transport.Execute directly for custom transports
return c.transport.Execute(ctx, "", "create_stream", [][]any{{
streamId.String(),
streamType.String(),
}})
}

func (c *Client) DestroyStream(ctx context.Context, streamId util.StreamId) (types.Hash, error) {
Expand All @@ -141,15 +150,29 @@ func (c *Client) DestroyStream(ctx context.Context, streamId util.StreamId) (typ
}

func (c *Client) LoadActions() (clientType.IAction, error) {
return tn_api.LoadAction(tn_api.NewActionOptions{
Client: c.GetKwilClient(),
})
// For HTTP transport, use the full-featured GatewayClient implementation
// For custom transports (CRE, etc.), use the minimal transport-aware implementation
if httpTransport, ok := c.transport.(*HTTPTransport); ok {
return tn_api.LoadAction(tn_api.NewActionOptions{
Client: httpTransport.gatewayClient,
})
}
// Return transport-aware implementation for custom transports
return &TransportAction{transport: c.transport}, nil
}

func (c *Client) LoadPrimitiveActions() (clientType.IPrimitiveAction, error) {
return tn_api.LoadPrimitiveActions(tn_api.NewActionOptions{
Client: c.GetKwilClient(),
})
// For HTTP transport, use the full-featured GatewayClient implementation
// For custom transports (CRE, etc.), use the minimal transport-aware implementation
if httpTransport, ok := c.transport.(*HTTPTransport); ok {
return tn_api.LoadPrimitiveActions(tn_api.NewActionOptions{
Client: httpTransport.gatewayClient,
})
}
// Return transport-aware implementation for custom transports
return &TransportPrimitiveAction{
TransportAction: TransportAction{transport: c.transport},
}, nil
}

func (c *Client) LoadComposedActions() (clientType.IComposedAction, error) {
Expand Down
12 changes: 12 additions & 0 deletions core/types/result_types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package types

// GetRecordRawOutput represents the raw SQL output from get_record, get_index,
// get_index_change, and get_first_record procedures.
//
// This struct is shared across multiple packages to avoid duplication:
// - core/contractsapi (HTTP-based implementations)
// - core/tnclient (transport-aware implementations)
type GetRecordRawOutput struct {
EventTime string `json:"event_time"`
Value string `json:"value"`
}
Loading