Skip to content

Commit 3bac41b

Browse files
committed
chore: enable SDK functionality for primitive stream
1 parent 61605c5 commit 3bac41b

File tree

3 files changed

+332
-11
lines changed

3 files changed

+332
-11
lines changed

core/tnclient/actions_transport.go

Lines changed: 236 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,236 @@
1+
package tnclient
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"strconv"
7+
8+
"github.com/cockroachdb/apd/v3"
9+
"github.com/pkg/errors"
10+
kwilClientType "github.com/trufnetwork/kwil-db/core/client/types"
11+
kwilType "github.com/trufnetwork/kwil-db/core/types"
12+
"github.com/trufnetwork/kwil-db/node/types"
13+
tn_api "github.com/trufnetwork/sdk-go/core/contractsapi"
14+
clientType "github.com/trufnetwork/sdk-go/core/types"
15+
"github.com/trufnetwork/sdk-go/core/util"
16+
)
17+
18+
// TransportAction implements IAction interface using the Transport abstraction.
19+
// This allows actions to work with any transport (HTTP, CRE, etc.).
20+
//
21+
// MINIMAL IMPLEMENTATION: Only GetRecord is fully implemented.
22+
// Other methods return "not implemented" errors since they're not needed by QuantAMM.
23+
type TransportAction struct {
24+
transport Transport
25+
}
26+
27+
var _ clientType.IAction = (*TransportAction)(nil)
28+
29+
// GetRecord reads the records of the stream within the given date range.
30+
// This is a core method needed by QuantAMM for reading stream data.
31+
func (a *TransportAction) GetRecord(ctx context.Context, input clientType.GetRecordInput) (clientType.ActionResult, error) {
32+
var args []any
33+
args = append(args, input.DataProvider)
34+
args = append(args, input.StreamId)
35+
args = append(args, transformOrNil(input.From, func(date int) any { return date }))
36+
args = append(args, transformOrNil(input.To, func(date int) any { return date }))
37+
args = append(args, transformOrNil(input.FrozenAt, func(date int) any { return date }))
38+
if input.UseCache != nil {
39+
args = append(args, *input.UseCache)
40+
}
41+
42+
prefix := ""
43+
if input.Prefix != nil {
44+
prefix = *input.Prefix
45+
}
46+
47+
result, err := a.transport.Call(ctx, "", prefix+"get_record", args)
48+
if err != nil {
49+
return clientType.ActionResult{}, errors.WithStack(err)
50+
}
51+
52+
// Decode raw SQL output
53+
type getRecordRawOutput struct {
54+
EventTime string `json:"event_time"`
55+
Value string `json:"value"`
56+
}
57+
58+
rawOutputs, err := tn_api.DecodeCallResult[getRecordRawOutput](result.QueryResult)
59+
if err != nil {
60+
return clientType.ActionResult{}, errors.WithStack(err)
61+
}
62+
63+
// Parse strings to proper types
64+
var outputs []clientType.StreamResult
65+
for _, rawOutput := range rawOutputs {
66+
value, _, err := apd.NewFromString(rawOutput.Value)
67+
if err != nil {
68+
return clientType.ActionResult{}, errors.WithStack(err)
69+
}
70+
71+
eventTime := 0
72+
if rawOutput.EventTime != "" {
73+
eventTime, err = strconv.Atoi(rawOutput.EventTime)
74+
if err != nil {
75+
return clientType.ActionResult{}, errors.WithStack(err)
76+
}
77+
}
78+
79+
outputs = append(outputs, clientType.StreamResult{
80+
EventTime: eventTime,
81+
Value: *value,
82+
})
83+
}
84+
85+
// Note: Cache metadata parsing is not implemented in this minimal version
86+
// as noted in TRANSPORT_IMPLEMENTATION_NOTES.md
87+
return clientType.ActionResult{Results: outputs}, nil
88+
}
89+
90+
// transformOrNil returns nil if the value is nil, otherwise applies the transform function.
91+
func transformOrNil[T any](value *T, transform func(T) any) any {
92+
if value == nil {
93+
return nil
94+
}
95+
return transform(*value)
96+
}
97+
98+
// Stub implementations for IAction methods not needed by QuantAMM.
99+
// These return errors indicating they're not implemented for custom transports.
100+
101+
func (a *TransportAction) ExecuteProcedure(ctx context.Context, procedure string, args [][]any) (types.Hash, error) {
102+
return a.transport.Execute(ctx, "", procedure, args)
103+
}
104+
105+
func (a *TransportAction) CallProcedure(ctx context.Context, procedure string, args []any) (*kwilType.QueryResult, error) {
106+
result, err := a.transport.Call(ctx, "", procedure, args)
107+
if err != nil {
108+
return nil, err
109+
}
110+
return result.QueryResult, nil
111+
}
112+
113+
func (a *TransportAction) GetIndex(ctx context.Context, input clientType.GetIndexInput) (clientType.ActionResult, error) {
114+
return clientType.ActionResult{}, fmt.Errorf("GetIndex not implemented for custom transports - use HTTP transport or implement if needed")
115+
}
116+
117+
func (a *TransportAction) GetIndexChange(ctx context.Context, input clientType.GetIndexChangeInput) (clientType.ActionResult, error) {
118+
return clientType.ActionResult{}, fmt.Errorf("GetIndexChange not implemented for custom transports - use HTTP transport or implement if needed")
119+
}
120+
121+
func (a *TransportAction) GetType(ctx context.Context, locator clientType.StreamLocator) (clientType.StreamType, error) {
122+
return "", fmt.Errorf("GetType not implemented for custom transports - use HTTP transport or implement if needed")
123+
}
124+
125+
func (a *TransportAction) GetFirstRecord(ctx context.Context, input clientType.GetFirstRecordInput) (clientType.ActionResult, error) {
126+
return clientType.ActionResult{}, fmt.Errorf("GetFirstRecord not implemented for custom transports - use HTTP transport or implement if needed")
127+
}
128+
129+
func (a *TransportAction) SetReadVisibility(ctx context.Context, input clientType.VisibilityInput) (types.Hash, error) {
130+
return types.Hash{}, fmt.Errorf("SetReadVisibility not implemented for custom transports - use HTTP transport or implement if needed")
131+
}
132+
133+
func (a *TransportAction) GetReadVisibility(ctx context.Context, locator clientType.StreamLocator) (*util.VisibilityEnum, error) {
134+
return nil, fmt.Errorf("GetReadVisibility not implemented for custom transports - use HTTP transport or implement if needed")
135+
}
136+
137+
func (a *TransportAction) SetComposeVisibility(ctx context.Context, input clientType.VisibilityInput) (types.Hash, error) {
138+
return types.Hash{}, fmt.Errorf("SetComposeVisibility not implemented for custom transports - use HTTP transport or implement if needed")
139+
}
140+
141+
func (a *TransportAction) GetComposeVisibility(ctx context.Context, locator clientType.StreamLocator) (*util.VisibilityEnum, error) {
142+
return nil, fmt.Errorf("GetComposeVisibility not implemented for custom transports - use HTTP transport or implement if needed")
143+
}
144+
145+
func (a *TransportAction) AllowReadWallet(ctx context.Context, input clientType.ReadWalletInput) (types.Hash, error) {
146+
return types.Hash{}, fmt.Errorf("AllowReadWallet not implemented for custom transports - use HTTP transport or implement if needed")
147+
}
148+
149+
func (a *TransportAction) DisableReadWallet(ctx context.Context, input clientType.ReadWalletInput) (types.Hash, error) {
150+
return types.Hash{}, fmt.Errorf("DisableReadWallet not implemented for custom transports - use HTTP transport or implement if needed")
151+
}
152+
153+
func (a *TransportAction) AllowComposeStream(ctx context.Context, locator clientType.StreamLocator) (types.Hash, error) {
154+
return types.Hash{}, fmt.Errorf("AllowComposeStream not implemented for custom transports - use HTTP transport or implement if needed")
155+
}
156+
157+
func (a *TransportAction) DisableComposeStream(ctx context.Context, locator clientType.StreamLocator) (types.Hash, error) {
158+
return types.Hash{}, fmt.Errorf("DisableComposeStream not implemented for custom transports - use HTTP transport or implement if needed")
159+
}
160+
161+
func (a *TransportAction) GetStreamOwner(ctx context.Context, locator clientType.StreamLocator) ([]byte, error) {
162+
return nil, fmt.Errorf("GetStreamOwner not implemented for custom transports - use HTTP transport or implement if needed")
163+
}
164+
165+
func (a *TransportAction) GetAllowedReadWallets(ctx context.Context, locator clientType.StreamLocator) ([]util.EthereumAddress, error) {
166+
return nil, fmt.Errorf("GetAllowedReadWallets not implemented for custom transports - use HTTP transport or implement if needed")
167+
}
168+
169+
func (a *TransportAction) GetAllowedComposeStreams(ctx context.Context, locator clientType.StreamLocator) ([]clientType.StreamLocator, error) {
170+
return nil, fmt.Errorf("GetAllowedComposeStreams not implemented for custom transports - use HTTP transport or implement if needed")
171+
}
172+
173+
func (a *TransportAction) SetDefaultBaseTime(ctx context.Context, input clientType.DefaultBaseTimeInput) (types.Hash, error) {
174+
return types.Hash{}, fmt.Errorf("SetDefaultBaseTime not implemented for custom transports - use HTTP transport or implement if needed")
175+
}
176+
177+
func (a *TransportAction) BatchStreamExists(ctx context.Context, streams []clientType.StreamLocator) ([]clientType.StreamExistsResult, error) {
178+
return nil, fmt.Errorf("BatchStreamExists not implemented for custom transports - use HTTP transport or implement if needed")
179+
}
180+
181+
func (a *TransportAction) BatchFilterStreamsByExistence(ctx context.Context, streams []clientType.StreamLocator, returnExisting bool) ([]clientType.StreamLocator, error) {
182+
return nil, fmt.Errorf("BatchFilterStreamsByExistence not implemented for custom transports - use HTTP transport or implement if needed")
183+
}
184+
185+
// TransportPrimitiveAction implements IPrimitiveAction interface using the Transport abstraction.
186+
// This allows primitive stream actions to work with any transport (HTTP, CRE, etc.).
187+
//
188+
// MINIMAL IMPLEMENTATION: Only InsertRecords is fully implemented.
189+
// Other methods return "not implemented" errors since they're not needed by QuantAMM.
190+
type TransportPrimitiveAction struct {
191+
TransportAction
192+
}
193+
194+
var _ clientType.IPrimitiveAction = (*TransportPrimitiveAction)(nil)
195+
196+
// InsertRecords inserts multiple records into primitive streams.
197+
// This is a core method needed by QuantAMM for writing stream data.
198+
func (p *TransportPrimitiveAction) InsertRecords(ctx context.Context, inputs []clientType.InsertRecordInput, opts ...kwilClientType.TxOpt) (types.Hash, error) {
199+
var (
200+
dataProviders []string
201+
streamIds []string
202+
eventTimes []int
203+
values kwilType.DecimalArray
204+
)
205+
206+
for _, input := range inputs {
207+
// Convert float64 to decimal with 36 precision and 18 scale
208+
valueNumeric, err := kwilType.ParseDecimalExplicit(strconv.FormatFloat(input.Value, 'f', -1, 64), 36, 18)
209+
if err != nil {
210+
return types.Hash{}, errors.WithStack(err)
211+
}
212+
213+
dataProviders = append(dataProviders, input.DataProvider)
214+
streamIds = append(streamIds, input.StreamId)
215+
eventTimes = append(eventTimes, input.EventTime)
216+
values = append(values, valueNumeric)
217+
}
218+
219+
return p.transport.Execute(ctx, "", "insert_records", [][]any{{
220+
dataProviders,
221+
streamIds,
222+
eventTimes,
223+
values,
224+
}}, opts...)
225+
}
226+
227+
// InsertRecord inserts a single record - stub implementation
228+
func (p *TransportPrimitiveAction) InsertRecord(ctx context.Context, input clientType.InsertRecordInput, opts ...kwilClientType.TxOpt) (types.Hash, error) {
229+
// Just delegate to InsertRecords
230+
return p.InsertRecords(ctx, []clientType.InsertRecordInput{input}, opts...)
231+
}
232+
233+
// CheckValidPrimitiveStream checks if the stream is a valid primitive stream - stub implementation
234+
func (p *TransportPrimitiveAction) CheckValidPrimitiveStream(ctx context.Context, locator clientType.StreamLocator) error {
235+
return fmt.Errorf("CheckValidPrimitiveStream not implemented for custom transports - use HTTP transport or implement if needed")
236+
}

core/tnclient/client.go

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -126,11 +126,20 @@ func (c *Client) GetKwilClient() *gatewayclient.GatewayClient {
126126
}
127127

128128
func (c *Client) DeployStream(ctx context.Context, streamId util.StreamId, streamType clientType.StreamType) (types.Hash, error) {
129-
return tn_api.DeployStream(ctx, tn_api.DeployStreamInput{
130-
StreamId: streamId,
131-
StreamType: streamType,
132-
KwilClient: c.GetKwilClient(),
133-
})
129+
// For HTTP transport, use the existing implementation (backwards compatible)
130+
// For custom transports (CRE, etc.), use transport.Execute directly
131+
if httpTransport, ok := c.transport.(*HTTPTransport); ok {
132+
return tn_api.DeployStream(ctx, tn_api.DeployStreamInput{
133+
StreamId: streamId,
134+
StreamType: streamType,
135+
KwilClient: httpTransport.gatewayClient,
136+
})
137+
}
138+
// Use transport.Execute directly for custom transports
139+
return c.transport.Execute(ctx, "", "create_stream", [][]any{{
140+
streamId.String(),
141+
streamType.String(),
142+
}})
134143
}
135144

136145
func (c *Client) DestroyStream(ctx context.Context, streamId util.StreamId) (types.Hash, error) {
@@ -141,15 +150,29 @@ func (c *Client) DestroyStream(ctx context.Context, streamId util.StreamId) (typ
141150
}
142151

143152
func (c *Client) LoadActions() (clientType.IAction, error) {
144-
return tn_api.LoadAction(tn_api.NewActionOptions{
145-
Client: c.GetKwilClient(),
146-
})
153+
// For HTTP transport, use the full-featured GatewayClient implementation
154+
// For custom transports (CRE, etc.), use the minimal transport-aware implementation
155+
if httpTransport, ok := c.transport.(*HTTPTransport); ok {
156+
return tn_api.LoadAction(tn_api.NewActionOptions{
157+
Client: httpTransport.gatewayClient,
158+
})
159+
}
160+
// Return transport-aware implementation for custom transports
161+
return &TransportAction{transport: c.transport}, nil
147162
}
148163

149164
func (c *Client) LoadPrimitiveActions() (clientType.IPrimitiveAction, error) {
150-
return tn_api.LoadPrimitiveActions(tn_api.NewActionOptions{
151-
Client: c.GetKwilClient(),
152-
})
165+
// For HTTP transport, use the full-featured GatewayClient implementation
166+
// For custom transports (CRE, etc.), use the minimal transport-aware implementation
167+
if httpTransport, ok := c.transport.(*HTTPTransport); ok {
168+
return tn_api.LoadPrimitiveActions(tn_api.NewActionOptions{
169+
Client: httpTransport.gatewayClient,
170+
})
171+
}
172+
// Return transport-aware implementation for custom transports
173+
return &TransportPrimitiveAction{
174+
TransportAction: TransportAction{transport: c.transport},
175+
}, nil
153176
}
154177

155178
func (c *Client) LoadComposedActions() (clientType.IComposedAction, error) {

core/types/result_types.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package types
2+
3+
// Common result types used for decoding SQL procedure outputs.
4+
// These types decode the output from various SQL procedures in the TRUF Network.
5+
6+
// GetMetadataResult decodes the output of the get_metadata procedure.
7+
// All columns are returned as strings (including NULLs which become empty strings).
8+
type GetMetadataResult struct {
9+
RowId string `json:"row_id"`
10+
ValueI string `json:"value_i"`
11+
ValueF string `json:"value_f"`
12+
ValueB string `json:"value_b"`
13+
ValueS string `json:"value_s"`
14+
ValueRef string `json:"value_ref"`
15+
CreatedAt string `json:"created_at"`
16+
}
17+
18+
// GetRecordRawOutput decodes the output of get_record, get_index, get_index_change,
19+
// and get_first_record procedures.
20+
type GetRecordRawOutput struct {
21+
EventTime string `json:"event_time"`
22+
Value string `json:"value"`
23+
}
24+
25+
// StreamExistsRaw decodes the raw output of the stream_exists_batch procedure.
26+
type StreamExistsRaw struct {
27+
DataProvider string `json:"data_provider"`
28+
StreamId string `json:"stream_id"`
29+
Exists bool `json:"stream_exists"`
30+
}
31+
32+
// FilteredStreamRaw decodes the raw output of the filter_streams_by_existence procedure.
33+
type FilteredStreamRaw struct {
34+
DataProvider string `json:"data_provider"`
35+
StreamId string `json:"stream_id"`
36+
}
37+
38+
// TaxonomyResult decodes the output of the describe_taxonomies procedure.
39+
// Includes both parent and child stream information.
40+
type TaxonomyResult struct {
41+
DataProvider string `json:"data_provider"` // Parent data provider
42+
StreamId string `json:"stream_id"` // Parent stream id
43+
ChildDataProvider string `json:"child_data_provider"` // Child data provider
44+
ChildStreamId string `json:"child_stream_id"` // Child stream id
45+
Weight string `json:"weight"`
46+
CreatedAt string `json:"created_at"`
47+
GroupSequence string `json:"group_sequence"`
48+
StartDate string `json:"start_date"`
49+
}
50+
51+
// MembershipRaw decodes the output of the check_role_membership procedure.
52+
type MembershipRaw struct {
53+
Wallet string `json:"wallet"`
54+
IsMember bool `json:"is_member"`
55+
}
56+
57+
// MemberRaw decodes the output of the get_role_members procedure.
58+
type MemberRaw struct {
59+
Wallet string `json:"wallet"`
60+
GrantedAt int64 `json:"granted_at"`
61+
GrantedBy string `json:"granted_by"`
62+
}

0 commit comments

Comments
 (0)