diff --git a/appdatabase/database.go b/appdatabase/database.go index c5080aa5819..a412562a568 100644 --- a/appdatabase/database.go +++ b/appdatabase/database.go @@ -9,6 +9,7 @@ import ( "go.uber.org/zap" d_common "github.com/status-im/status-go/common" + "github.com/status-im/status-go/kvstore" "github.com/status-im/status-go/logutils" "github.com/ethereum/go-ethereum/common" @@ -97,6 +98,12 @@ func InitializeDB(path, password string, kdfIterationsNumber int) (*sql.DB, erro return nil, err } + kvStore := kvstore.NewDB(db) + err = kvStore.DropDeprecatedKeys(kvstore.DeprecatedKeys) + if err != nil { + return nil, err + } + return db, nil } diff --git a/appdatabase/migrations/sql/1743042367_create_kv_store.up.sql b/appdatabase/migrations/sql/1743042367_create_kv_store.up.sql new file mode 100644 index 00000000000..030377c461e --- /dev/null +++ b/appdatabase/migrations/sql/1743042367_create_kv_store.up.sql @@ -0,0 +1,4 @@ +CREATE TABLE IF NOT EXISTS kv_store ( + key TEXT PRIMARY KEY, + value BLOB +); \ No newline at end of file diff --git a/go.mod b/go.mod index e54bf371e71..51233f6cc45 100644 --- a/go.mod +++ b/go.mod @@ -95,7 +95,7 @@ require ( github.com/schollz/peerdiscovery v1.7.0 github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7 github.com/urfave/cli/v2 v2.27.2 - github.com/waku-org/go-waku v0.8.1-0.20250320134751-fdf03de179a7 + github.com/waku-org/go-waku v0.8.1-0.20250325095250-f59588a970e6 github.com/waku-org/waku-go-bindings v0.0.0-20250313132258-6f95d51df46c github.com/wk8/go-ordered-map/v2 v2.1.7 github.com/yeqown/go-qrcode/v2 v2.2.1 diff --git a/go.sum b/go.sum index 8119d084031..08532b33f02 100644 --- a/go.sum +++ b/go.sum @@ -2150,8 +2150,8 @@ github.com/waku-org/go-libp2p-pubsub v0.12.0-gowaku.0.20240823143342-b0f2429ca27 github.com/waku-org/go-libp2p-pubsub v0.12.0-gowaku.0.20240823143342-b0f2429ca27f/go.mod h1:Oi0zw9aw8/Y5GC99zt+Ef2gYAl+0nZlwdJonDyOz/sE= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY= -github.com/waku-org/go-waku v0.8.1-0.20250320134751-fdf03de179a7 h1:SxJt0AXNUWaqFnPOB3BBK7GKf+y7eVjToq4D5ngz0gw= -github.com/waku-org/go-waku v0.8.1-0.20250320134751-fdf03de179a7/go.mod h1:zYhLgqwBE3sGP2vP+aNiM5moOKlf/uSoIv36puAj9WI= +github.com/waku-org/go-waku v0.8.1-0.20250325095250-f59588a970e6 h1:jYJlwnFuQEgQMKfgwy1CXjmQxZlSgM/2ajY0cJpEiKI= +github.com/waku-org/go-waku v0.8.1-0.20250325095250-f59588a970e6/go.mod h1:zYhLgqwBE3sGP2vP+aNiM5moOKlf/uSoIv36puAj9WI= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E= github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE= diff --git a/kvstore/database.go b/kvstore/database.go new file mode 100644 index 00000000000..ee07496308d --- /dev/null +++ b/kvstore/database.go @@ -0,0 +1,86 @@ +package kvstore + +import ( + "database/sql" +) + +const ( + WakuRlnRateLimitState = "waku/rln-rate-limit-state" + + ConfigRlnRateLimitEnabled = "config/rln-rate-limit-enabled" +) + +var DeprecatedKeys = []string{} + +// Database sql wrapper for db operations. +type Database struct { + db *sql.DB +} + +func NewDB(db *sql.DB) *Database { + return &Database{db: db} +} + +// Close closes database. +func (db Database) Close() error { + return db.db.Close() +} + +// Set stores a key-value pair in kv_store +func (db *Database) Set(key string, value []byte) error { + query := `INSERT INTO kv_store (key, value) VALUES (?, ?) + ON CONFLICT(key) DO UPDATE SET value = excluded.value;` + _, err := db.db.Exec(query, key, value) + return err +} + +// Get retrieves a value by key +func (db *Database) Get(key string) ([]byte, error) { + var value []byte + err := db.db.QueryRow(`SELECT value FROM kv_store WHERE key = ?`, key).Scan(&value) + if err == sql.ErrNoRows { + return nil, nil + } + return value, err +} + +// SetBool stores a boolean value in kv_store +func (db *Database) SetBool(key string, value bool) error { + var boolByte byte + if value { + boolByte = 1 + } else { + boolByte = 0 + } + return db.Set(key, []byte{boolByte}) +} + +// GetBool retrieves a boolean value by key +func (db *Database) GetBool(key string) (bool, error) { + value, err := db.Get(key) + if err != nil { + return false, err + } + if value == nil { + return false, nil // Default to false if key is missing + } + return value[0] == 1, nil +} + +// Delete removes a key from kv_store +func (db *Database) Delete(key string) error { + _, err := db.db.Exec(`DELETE FROM kv_store WHERE key = ?`, key) + return err +} + +// DropKeys removes unused keys from kv_store +func (db *Database) DropDeprecatedKeys(keys []string) error { + for _, key := range keys { + err := db.Delete(key) + if err != nil { + return err + } + } + + return nil +} diff --git a/kvstore/database_test.go b/kvstore/database_test.go new file mode 100644 index 00000000000..5522f0e6ec6 --- /dev/null +++ b/kvstore/database_test.go @@ -0,0 +1,109 @@ +package kvstore + +import ( + "database/sql" + "testing" + + "github.com/status-im/status-go/sqlite" + "github.com/status-im/status-go/t/helpers" + + "github.com/stretchr/testify/require" +) + +type DbInitializer struct { +} + +func (a DbInitializer) Initialize(path, password string, kdfIterationsNumber int) (*sql.DB, error) { + db, err := sqlite.OpenDB(path, password, kdfIterationsNumber) + if err != nil { + return nil, err + } + + _, err = db.Exec(`CREATE TABLE IF NOT EXISTS kv_store ( + key TEXT PRIMARY KEY, + value BLOB + );`) + + if err != nil { + return nil, err + } + + return db, nil +} + +func setupTestDB(t *testing.T) (*Database, func()) { + db, err := helpers.SetupTestMemorySQLDB(DbInitializer{}) + require.NoError(t, err) + return NewDB(db), func() { require.NoError(t, db.Close()) } +} + +func TestSetGetDelete(t *testing.T) { + db, stop := setupTestDB(t) + defer stop() + + err := db.Set("key", []byte("value")) + require.NoError(t, err) + + value, err := db.Get("key") + require.NoError(t, err) + require.Equal(t, []byte("value"), value) + + err = db.Set("key", []byte("another-value")) + require.NoError(t, err) + + value, err = db.Get("key") + require.NoError(t, err) + require.Equal(t, []byte("another-value"), value) + + err = db.Delete("key") + require.NoError(t, err) + + value, err = db.Get("key") + require.NoError(t, err) + require.Nil(t, value) +} + +func TestSetBoolGetBool(t *testing.T) { + db, stop := setupTestDB(t) + defer stop() + + value, err := db.GetBool("key") + require.NoError(t, err) + require.False(t, value) + + err = db.SetBool("key", true) + require.NoError(t, err) + + value, err = db.GetBool("key") + require.NoError(t, err) + require.True(t, value) + + err = db.SetBool("key", false) + require.NoError(t, err) + + value, err = db.GetBool("key") + require.NoError(t, err) + require.False(t, value) +} + +func TestDropDeprecatedKeys(t *testing.T) { + db, stop := setupTestDB(t) + defer stop() + + err := db.Set("key", []byte("value")) + require.NoError(t, err) + + err = db.Set("deprecated-key", []byte("value")) + require.NoError(t, err) + + err = db.DropDeprecatedKeys([]string{"deprecated-key"}) + require.NoError(t, err) + + value, err := db.Get("key") + require.NoError(t, err) + require.Equal(t, []byte("value"), value) + + value, err = db.Get("deprecated-key") + require.NoError(t, err) + require.Nil(t, value) +} diff --git a/protocol/messenger_base_test.go b/protocol/messenger_base_test.go index b2c1d69c387..1116f1bdb51 100644 --- a/protocol/messenger_base_test.go +++ b/protocol/messenger_base_test.go @@ -8,10 +8,12 @@ import ( "go.uber.org/zap" + "github.com/status-im/status-go/appdatabase" "github.com/status-im/status-go/eth-node/crypto" "github.com/status-im/status-go/multiaccounts/settings" "github.com/status-im/status-go/params" "github.com/status-im/status-go/protocol/tt" + "github.com/status-im/status-go/t/helpers" "github.com/status-im/status-go/wakuv2" wakutypes "github.com/status-im/status-go/waku/types" @@ -85,11 +87,15 @@ func newMessengerWithKey(shh wakutypes.Waku, privateKey *ecdsa.PrivateKey, logge } func newTestWakuNode(logger *zap.Logger) (wakutypes.Waku, error) { + db, err := helpers.SetupTestMemorySQLDB(appdatabase.DbInitializer{}) + if err != nil { + return nil, err + } return wakuv2.New( nil, &wakuv2.DefaultConfig, logger, - nil, + db, nil, func([]byte, peer.AddrInfo, error) {}, nil, diff --git a/server/pairing/sync_device_test.go b/server/pairing/sync_device_test.go index a931f06739c..97345b5cb75 100644 --- a/server/pairing/sync_device_test.go +++ b/server/pairing/sync_device_test.go @@ -20,6 +20,7 @@ import ( "github.com/stretchr/testify/suite" "github.com/status-im/status-go/api" + "github.com/status-im/status-go/appdatabase" "github.com/status-im/status-go/common/dbsetup" "github.com/status-im/status-go/eth-node/crypto" "github.com/status-im/status-go/eth-node/types" @@ -33,6 +34,7 @@ import ( "github.com/status-im/status-go/protocol/tt" accservice "github.com/status-im/status-go/services/accounts" "github.com/status-im/status-go/services/browsers" + "github.com/status-im/status-go/t/helpers" "github.com/status-im/status-go/wakuv2" ) @@ -85,8 +87,10 @@ func (s *SyncDeviceSuite) SetupTest() { DefaultShardPubsubTopic: wakuv2.DefaultShardPubsubTopic(), EnableStoreConfirmationForMessagesSent: false, } - var err error - s.pxBootNode, err = wakuv2.New(nil, exchangeNodeConfig, s.logger.Named("pxServerNode"), nil, nil, nil, nil) + + db, err := helpers.SetupTestMemorySQLDB(appdatabase.DbInitializer{}) + s.Require().NoError(err) + s.pxBootNode, err = wakuv2.New(nil, exchangeNodeConfig, s.logger.Named("pxServerNode"), db, nil, nil, nil) s.Require().NoError(err) s.Require().NoError(s.pxBootNode.Start()) diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/rln_rate_limiting.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/rln_rate_limiting.go index 11a469bff13..d0b52689707 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/rln_rate_limiting.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/rln_rate_limiting.go @@ -11,8 +11,8 @@ import ( var ErrRateLimited = errors.New("rate limit exceeded") -const RlnLimiterCapacity = 100 -const RlnLimiterRefillInterval = 10 * time.Minute +const DefaultRlnLimiterCapacity = 600 +const DefaultRlnLimiterRefillInterval = 10 * time.Minute // RlnRateLimiter is used to rate limit the outgoing messages, // The capacity and refillInterval comes from RLN contract configuration. @@ -22,15 +22,23 @@ type RlnRateLimiter struct { tokens int refillInterval time.Duration lastRefill time.Time + updateCh chan RlnRateLimitState +} + +// RlnRateLimitState includes the information that need to be persisted in database. +type RlnRateLimitState struct { + RemainingTokens int + LastRefill time.Time } // NewRlnPublishRateLimiter creates a new rate limiter, starts with a full capacity bucket. -func NewRlnRateLimiter(capacity int, refillInterval time.Duration) *RlnRateLimiter { +func NewRlnRateLimiter(capacity int, refillInterval time.Duration, state RlnRateLimitState, updateCh chan RlnRateLimitState) *RlnRateLimiter { return &RlnRateLimiter{ capacity: capacity, - tokens: capacity, // Start with a full bucket + tokens: state.RemainingTokens, refillInterval: refillInterval, - lastRefill: time.Now(), + lastRefill: state.LastRefill, + updateCh: updateCh, } } @@ -42,19 +50,26 @@ func (rl *RlnRateLimiter) Allow() bool { // Refill tokens if the refill interval has passed now := time.Now() if now.Sub(rl.lastRefill) >= rl.refillInterval { - rl.tokens = rl.capacity // Refill the bucket + rl.tokens = rl.capacity rl.lastRefill = now + rl.sendUpdate() } // Check if there are tokens available if rl.tokens > 0 { rl.tokens-- + rl.sendUpdate() return true } return false } +// sendUpdate sends the latest token state to the update channel. +func (rl *RlnRateLimiter) sendUpdate() { + rl.updateCh <- RlnRateLimitState{RemainingTokens: rl.tokens, LastRefill: rl.lastRefill} +} + func (rl *RlnRateLimiter) Check(ctx context.Context, logger *zap.Logger) error { if rl.Allow() { return nil diff --git a/vendor/modules.txt b/vendor/modules.txt index f1b30ba94c0..9466e12d2cc 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1041,7 +1041,7 @@ github.com/waku-org/go-discover/discover/v5wire github.com/waku-org/go-libp2p-rendezvous github.com/waku-org/go-libp2p-rendezvous/db github.com/waku-org/go-libp2p-rendezvous/pb -# github.com/waku-org/go-waku v0.8.1-0.20250320134751-fdf03de179a7 +# github.com/waku-org/go-waku v0.8.1-0.20250325095250-f59588a970e6 ## explicit; go 1.21 github.com/waku-org/go-waku/logging github.com/waku-org/go-waku/tests diff --git a/wakuv2/gowaku.go b/wakuv2/gowaku.go index b4793caf544..bbbb3ab093e 100644 --- a/wakuv2/gowaku.go +++ b/wakuv2/gowaku.go @@ -26,6 +26,7 @@ import ( "crypto/ecdsa" "crypto/sha256" "database/sql" + "encoding/json" "errors" "fmt" "math" @@ -81,6 +82,7 @@ import ( gocommon "github.com/status-im/status-go/common" "github.com/status-im/status-go/connection" ethtypes "github.com/status-im/status-go/eth-node/types" + "github.com/status-im/status-go/kvstore" "github.com/status-im/status-go/logutils" "github.com/status-im/status-go/timesource" "github.com/status-im/status-go/wakuv2/common" @@ -132,8 +134,9 @@ type IMetricsHandler interface { // Waku represents a dark communication interface through the Ethereum // network, using its very own P2P communication layer. type Waku struct { - node *node.WakuNode // reference to a libp2p waku node - appDB *sql.DB + node *node.WakuNode // reference to a libp2p waku node + appDB *sql.DB + kvStore *kvstore.Database dnsAddressCache map[string][]dnsdisc.DiscoveredNode // Map to store the multiaddresses returned by dns discovery dnsAddressCacheLock *sync.RWMutex // lock to handle access to the map @@ -247,6 +250,7 @@ func New(nodeKey *ecdsa.PrivateKey, cfg *Config, logger *zap.Logger, appDB *sql. waku := &Waku{ appDB: appDB, + kvStore: kvstore.NewDB(appDB), cfg: cfg, privateKeys: make(map[string]*ecdsa.PrivateKey), symKeys: make(map[string][]byte), @@ -1340,7 +1344,55 @@ func (w *Waku) startMessageSender() error { publishMethod = publish.LightPush } - sender, err := publish.NewMessageSender(publishMethod, publish.NewDefaultPublisher(w.node.Lightpush(), w.node.Relay()), nil, w.logger) + var rateLimiter publish.PublishRateLimiter + rlnRateLimitEnabled, err := w.kvStore.GetBool(kvstore.ConfigRlnRateLimitEnabled) + if err != nil { + w.logger.Error("failed to get rln rate limit enabled", zap.Error(err)) + return err + } + if rlnRateLimitEnabled { + state := publish.RlnRateLimitState{ + RemainingTokens: publish.DefaultRlnLimiterCapacity, + LastRefill: time.Now(), + } + stateStored, err := w.kvStore.Get(kvstore.WakuRlnRateLimitState) + if err != nil { + w.logger.Error("failed to get rln rate limit state", zap.Error(err)) + return err + } + if stateStored != nil { + err = json.Unmarshal(stateStored, &state) + if err != nil { + w.logger.Error("failed to unmarshal rln rate limit state", zap.Error(err)) + return err + } + } + + updateCh := make(chan publish.RlnRateLimitState, 10) + rateLimiter = publish.NewRlnRateLimiter(publish.DefaultRlnLimiterCapacity, publish.DefaultRlnLimiterRefillInterval, state, updateCh) + + go func() { + defer gocommon.LogOnPanic() + for { + select { + case <-w.ctx.Done(): + return + case state := <-updateCh: + stateBytes, err := json.Marshal(state) + if err != nil { + w.logger.Error("failed to marshal rln rate limit state", zap.Error(err)) + continue + } + err = w.kvStore.Set(kvstore.WakuRlnRateLimitState, stateBytes) + if err != nil { + w.logger.Error("failed to put rln rate limit state", zap.Error(err)) + } + } + } + }() + } + + sender, err := publish.NewMessageSender(publishMethod, publish.NewDefaultPublisher(w.node.Lightpush(), w.node.Relay()), rateLimiter, w.logger) if err != nil { w.logger.Error("failed to create message sender", zap.Error(err)) return err diff --git a/wakuv2/waku_test.go b/wakuv2/waku_test.go index 620e605cf04..a15016e6232 100644 --- a/wakuv2/waku_test.go +++ b/wakuv2/waku_test.go @@ -66,7 +66,9 @@ func TestDiscoveryV5(t *testing.T) { setDefaultConfig(config, false) config.DiscV5BootstrapNodes = []string{testStoreENRBootstrap} config.DiscoveryLimit = 20 - w, err := New(nil, config, nil, nil, nil, nil, nil) + db, err := helpers.SetupTestMemorySQLDB(appdatabase.DbInitializer{}) + require.NoError(t, err) + w, err := New(nil, config, nil, db, nil, nil, nil) require.NoError(t, err) require.NoError(t, w.Start()) @@ -92,7 +94,9 @@ func TestRestartDiscoveryV5(t *testing.T) { config.DiscoveryLimit = 20 config.UDPPort = 10002 config.ClusterID = 16 - w, err := New(nil, config, nil, nil, nil, nil, nil) + db, err := helpers.SetupTestMemorySQLDB(appdatabase.DbInitializer{}) + require.NoError(t, err) + w, err := New(nil, config, nil, db, nil, nil, nil) require.NoError(t, err) require.NoError(t, w.Start()) @@ -135,8 +139,10 @@ func TestRelayPeers(t *testing.T) { config := &Config{ EnableMissingMessageVerification: true, } + db, err := helpers.SetupTestMemorySQLDB(appdatabase.DbInitializer{}) + require.NoError(t, err) setDefaultConfig(config, false) - w, err := New(nil, config, nil, nil, nil, nil, nil) + w, err := New(nil, config, nil, db, nil, nil, nil) require.NoError(t, err) require.NoError(t, w.Start()) _, err = w.RelayPeersByTopic(config.DefaultShardPubsubTopic) @@ -146,7 +152,7 @@ func TestRelayPeers(t *testing.T) { config = &Config{} config.ClusterID = 16 config.LightClient = true - w, err = New(nil, config, nil, nil, nil, nil, nil) + w, err = New(nil, config, nil, db, nil, nil, nil) require.NoError(t, err) require.NoError(t, w.Start()) _, err = w.RelayPeersByTopic(config.DefaultShardPubsubTopic) @@ -194,7 +200,9 @@ func TestBasicWakuV2(t *testing.T) { config.DiscV5BootstrapNodes = []string{enrTreeAddress} config.DiscoveryLimit = 20 config.WakuNodes = []string{enrTreeAddress} - w, err := New(nil, config, nil, nil, nil, nil, nil) + db, err := helpers.SetupTestMemorySQLDB(appdatabase.DbInitializer{}) + require.NoError(t, err) + w, err := New(nil, config, nil, db, nil, nil, nil) require.NoError(t, err) require.NoError(t, w.Start()) @@ -334,7 +342,9 @@ func TestPeerExchange(t *testing.T) { config.EnableDiscV5 = true config.EnablePeerExchangeServer = true config.EnablePeerExchangeClient = false - pxServerNode, err := New(nil, config, logger.Named("pxServerNode"), nil, nil, nil, nil) + db, err := helpers.SetupTestMemorySQLDB(appdatabase.DbInitializer{}) + require.NoError(t, err) + pxServerNode, err := New(nil, config, logger.Named("pxServerNode"), db, nil, nil, nil) require.NoError(t, err) require.NoError(t, pxServerNode.Start()) @@ -347,7 +357,7 @@ func TestPeerExchange(t *testing.T) { config.EnablePeerExchangeServer = false config.EnablePeerExchangeClient = false config.DiscV5BootstrapNodes = []string{pxServerNode.node.ENR().String()} - discV5Node, err := New(nil, config, logger.Named("discV5Node"), nil, nil, nil, nil) + discV5Node, err := New(nil, config, logger.Named("discV5Node"), db, nil, nil, nil) require.NoError(t, err) require.NoError(t, discV5Node.Start()) @@ -366,7 +376,7 @@ func TestPeerExchange(t *testing.T) { config.Resolver = resolver config.WakuNodes = []string{url} - lightNode, err := New(nil, config, logger.Named("lightNode"), nil, nil, nil, nil) + lightNode, err := New(nil, config, logger.Named("lightNode"), db, nil, nil, nil) require.NoError(t, err) require.NoError(t, lightNode.Start()) @@ -636,7 +646,9 @@ func waitForEnvelope(t *testing.T, contentTopic string, envCh chan common.Envelo } func TestOnlineChecker(t *testing.T) { - w, err := New(nil, nil, nil, nil, nil, nil, nil) + db, err := helpers.SetupTestMemorySQLDB(appdatabase.DbInitializer{}) + require.NoError(t, err) + w, err := New(nil, nil, nil, db, nil, nil, nil) require.NoError(t, w.Start()) require.NoError(t, err) @@ -662,7 +674,7 @@ func TestOnlineChecker(t *testing.T) { config := &Config{} config.ClusterID = 16 config.LightClient = true - lightNode, err := New(nil, config, nil, nil, nil, nil, nil) + lightNode, err := New(nil, config, nil, db, nil, nil, nil) require.NoError(t, err) err = lightNode.Start()