Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ module github.com/carbonable-labs/indexer.sdk
go 1.22.2

require (
github.com/NethermindEth/juno v0.11.8
github.com/NethermindEth/starknet.go v0.7.0
github.com/nats-io/nats.go v1.35.0
github.com/stretchr/testify v1.9.0
)

require (
github.com/Microsoft/go-winio v0.6.1 // indirect
github.com/NethermindEth/juno v0.11.8 // indirect
github.com/bits-and-blooms/bitset v1.13.0 // indirect
github.com/consensys/gnark-crypto v0.12.1 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
Expand Down
182 changes: 182 additions & 0 deletions sdk/nats/nats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
package nats

import (
"bytes"
"context"
"encoding/gob"
"encoding/json"
"errors"
"log/slog"
"net/http"
"os"

"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"

"github.com/carbonable-labs/indexer.sdk/sdk"
)

type (
NatsSDK struct {
opts NatsSDKOpts
}
NatsSDKOpts struct {
indexerToken string
indexerUrl string
indexerApi string
indexerApiKey string
}
NatsSDKOptsFn func(NatsSDKOpts) NatsSDKOpts
)

// Creates a new NatsSDK instance
// Package entrypoint
func NewSDK(o ...NatsSDKOptsFn) *NatsSDK {
opts := defaultNatsOpts()
for _, optFn := range o {
opts = optFn(opts)
}

return &NatsSDK{opts: opts}
}

// Configure method 
// Given a input config, it will register the app in the indexer and return the hash of the app
func (s *NatsSDK) Configure(ctx context.Context, c sdk.Config) (*sdk.RegisterResponse, error) {
slog.Debug("configure", "app_name", c.AppName)

client := http.DefaultClient

body, err := json.Marshal(c)
if err != nil {
return nil, err
}
req, err := http.NewRequest("POST", s.opts.indexerApi+"/register", bytes.NewReader(body))
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/json")

resp, err := client.Do(req)
if err != nil {
return nil, err
}

var r sdk.RegisterResponse
err = json.NewDecoder(resp.Body).Decode(&r)
if err != nil {
return nil, err
}
defer resp.Body.Close()
return &r, nil
}

// RegisterHandler method 
// Based on the config you sent to configuration it will wait messages
// from queue and use the callback you provide to integrate messages into your system
func (s *NatsSDK) RegisterHandler(ctx context.Context, name string, subject string, cb sdk.ConsumerHandleFunc) (sdk.HandlerCancelFunc, error) {
slog.Debug("register handler", "app_name", name)

nc, err := nats.Connect(s.opts.indexerUrl, nats.Token(s.opts.indexerToken))
if err != nil {
slog.Error("failed to connect to nats", "error", err)
return nil, err
}

js, err := jetstream.New(nc)
if err != nil {
slog.Error("failed to create jetstream", "error", err)
return nil, err
}

c, err := js.CreateOrUpdateConsumer(context.Background(), "EVENTS", jetstream.ConsumerConfig{
Name: name,
Durable: name,
FilterSubject: subject,
})
if err != nil {
slog.Error("failed to create or update consumer", "error", err)
return nil, err
}
cctx, err := c.Consume(func(msg jetstream.Msg) {
// NOTE: Here is the piece of software to send messages to consumers.
// we can send the message plus some metadata to it
// eg: msg.Data(), sequenceId,
subject := msg.Subject()
meta, _ := msg.Metadata()

slog.Debug("received message", "subject", subject, "sequence", meta.Sequence.Stream)

var e sdk.RawEvent
decoder := gob.NewDecoder(bytes.NewReader(msg.Data()))
if err := decoder.Decode(&e); err != nil {
slog.Error("failed to decode raw event", "error", err)
return
}

err = cb(msg.Subject(), meta.Sequence.Stream, e)
if err != nil {
slog.Error("failed to consume message", "error", err)
return
}

_ = msg.Ack()
})
if err != nil {
slog.Error("failed to consume stream", "error", err)
return nil, err
}

return func() {
err := js.DeleteConsumer(context.Background(), "EVENTS", name)
if err != nil {
if !errors.Is(jetstream.ErrConsumerNotFound, err) {
slog.Error("failed to delete consumer", "error", err)
}
}
cctx.Stop()
}, nil
}

func (s *NatsSDK) Start(ctx context.Context) error {
return nil
}

// NatsSDKOpts default builder function
// creates options with default values overridable with env
func defaultNatsOpts() NatsSDKOpts {
return NatsSDKOpts{
indexerToken: os.Getenv("INDEXER_TOKEN"),
indexerUrl: os.Getenv("INDEXER_URL"),
indexerApi: os.Getenv("INDEXER_API"),
indexerApiKey: os.Getenv("INDEXER_API_KEY"),
}
}

func WithToken(t string) NatsSDKOptsFn {
return func(opts NatsSDKOpts) NatsSDKOpts {
opts.indexerToken = t
return opts
}
}

func WithUrl(u string) NatsSDKOptsFn {
return func(opts NatsSDKOpts) NatsSDKOpts {
opts.indexerUrl = u
return opts
}
}

func WithApi(a string) NatsSDKOptsFn {
return func(opts NatsSDKOpts) NatsSDKOpts {
opts.indexerApi = a
return opts
}
}

func WithApiKey(k string) NatsSDKOptsFn {
return func(opts NatsSDKOpts) NatsSDKOpts {
opts.indexerApiKey = k
return opts
}
}
149 changes: 149 additions & 0 deletions sdk/nats/nats_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package nats

import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"os"
"testing"

"github.com/carbonable-labs/indexer.sdk/sdk"
"github.com/stretchr/testify/assert"
)

var fakeConfigs = sdk.Config{
AppName: "test_config",
Contracts: []sdk.Contract{
{
Name: "project",
Address: "0x0516d0acb6341dcc567e85dc90c8f64e0c33d3daba0a310157d6bba0656c8769",
Events: map[string]string{
"URI": "project:uri",
},
},
{
Name: "project_karathuru",
Address: "0x05a667adc04676fba78a29371561a0bf91dab25847d5dc4709a93a4cfb5ff293",
Events: map[string]string{
"URI": "project:uri",
},
},
{
Name: "yielder_banegas_farm",
Address: "0x03d25473be5a6316f351e8f964d0c303357c006f7107779f648d9879b7c6d58a",
Events: map[string]string{
"Deposit": "yielder:deposit",
},
},
{
Name: "yielder_las_delicias",
Address: "0x00426d4e86913759bcc49b7f992b1fe62e6571e8f8089c23d95fea815dbad471",
Events: map[string]string{
"Deposit": "yielder:deposit",
},
},
{
Name: "yielder_manjarisoa",
Address: "0x03afe61732ed9b226309775ac4705129319729d3bee81da5632146ffd72652ae",
Events: map[string]string{
"Deposit": "yielder:deposit",
},
},
},
StartBlock: 0,
}

func TestNewSDKDefaultValues(t *testing.T) {
os.Setenv("INDEXER_TOKEN", "thisisasuperprivateroken")
os.Setenv("INDEXER_URL", "http://localhost:9999")

sdk := NewSDK()

assert.Equal(t, "thisisasuperprivateroken", sdk.opts.indexerToken)
assert.Equal(t, "http://localhost:9999", sdk.opts.indexerUrl)
assert.Equal(t, "", sdk.opts.indexerApi)
assert.Equal(t, "", sdk.opts.indexerApiKey)
}

func TestSDKOverride(t *testing.T) {
os.Setenv("INDEXER_TOKEN", "thisisasuperprivateroken")
os.Setenv("INDEXER_URL", "http://localhost:8888")
os.Setenv("INDEXER_API", "http://localhost:6000")
os.Setenv("INDEXER_API_KEY", "privateapikey")

sdk := NewSDK(WithToken("notsoprivatetoken"), WithUrl("http://localhost:9999"), WithApi("http://localhost:3000"), WithApiKey("apikey"))

assert.Equal(t, "notsoprivatetoken", sdk.opts.indexerToken)
assert.Equal(t, "http://localhost:9999", sdk.opts.indexerUrl)
assert.Equal(t, "http://localhost:3000", sdk.opts.indexerApi)
assert.Equal(t, "apikey", sdk.opts.indexerApiKey)
}

func TestConfigure(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/register" {
t.Errorf("expected /register, got %s", r.URL.Path)
}
var c sdk.Config
_ = json.NewDecoder(r.Body).Decode(&c)
defer r.Body.Close()
response := fmt.Sprintf(`{ "app_name": "%s", "hash": "test_hash" }`, c.AppName)
_, _ = w.Write([]byte(response))
}))
defer server.Close()

s := NewSDK(WithApi(server.URL))
conf, err := s.Configure(context.Background(), fakeConfigs)

assert.Equal(t, nil, err)
assert.Equal(t, "test_config", conf.AppName)
assert.Equal(t, "test_hash", conf.Hash)
}

func TestConfigureReal(t *testing.T) {
conf := `{
"app_name": "carbonable_portfolio_backend",
"start_block": 0,
"contracts": [
{
"name": "project",
"address": "0x00130b5a3035eef0470cff2f9a450a7a6856a3c5a4ea3f5b7886c2d03a50d2bf"
},
{
"name": "minter_banegas_farm",
"address": "0x2cf1693df4529343fed040fcefe33a50611aa93dd9c399e4baef0f08a82b99d"
},
{
"name": "yielder_banegas_farm",
"address": "0x00f6019754ab54ea7d806720d17b425c799db5ebb337e4b2d8c1ed71fc35f342"
},
{
"name": "offseter_banegas_farm",
"address": "0x008637332b17f5ffe7f21f076389e8a5461f25fbc0049ac0243b4e08591280df"
},
{
"name": "minter_las_delicias",
"address": "0x04c9c5303f0c0f40cdfd5f5631052288185e37abe3af54de9c37610b423b1b25"
},
{
"name": "yielder_las_delicias",
"address": "0x0370e85e8f315dc352eeef7e7f0f5d70e89c699384cbcb81a11a7089fa87ff66"
},
{
"name": "offseter_las_delicias",
"address": "0x04f634a74451bc19e4d537326dff7552c225040e9d9c16b26a32466eebdf9688"
}
]
}`
var c sdk.Config
err := json.Unmarshal([]byte(conf), &c)
if err != nil {
t.Errorf("failed to parse config")
}

s := NewSDK(WithApi("https://carbonable-event-indexer-sepolia.fly.dev"))
res, err := s.Configure(context.Background(), c)
fmt.Println(res, err)
}
Loading