From a0cb52cc2bc20942bfcbc1d2a26ecc0d5d0cab53 Mon Sep 17 00:00:00 2001 From: aya Date: Tue, 18 Feb 2025 12:25:19 +0200 Subject: [PATCH 1/7] Add store tests --- waku/nwaku.go | 452 +++++++++++++++++++++++++++++++++++++-------- waku/store_test.go | 83 +++++++++ waku/test_data.go | 14 +- 3 files changed, 473 insertions(+), 76 deletions(-) create mode 100644 waku/store_test.go diff --git a/waku/nwaku.go b/waku/nwaku.go index 1f29fd6..a150ab3 100644 --- a/waku/nwaku.go +++ b/waku/nwaku.go @@ -56,6 +56,15 @@ package waku // resp must be set != NULL in case interest on retrieving data from the callback void GoCallback(int ret, char* msg, size_t len, void* resp); + #define WAKU_CALL(call) \ + do { \ + int ret = call; \ + if (ret != 0) { \ + printf("Failed the call to: %s. Returned code: %d\n", #call, ret); \ + exit(1); \ + } \ + } while (0) + static void* cGoWakuNew(const char* configJson, void* resp) { // We pass NULL because we are not interested in retrieving data from this callback void* ret = waku_new(configJson, (WakuCallBack) GoCallback, resp); @@ -63,27 +72,27 @@ package waku } static void cGoWakuStart(void* wakuCtx, void* resp) { - waku_start(wakuCtx, (WakuCallBack) GoCallback, resp); + WAKU_CALL(waku_start(wakuCtx, (WakuCallBack) GoCallback, resp)); } static void cGoWakuStop(void* wakuCtx, void* resp) { - waku_stop(wakuCtx, (WakuCallBack) GoCallback, resp); + WAKU_CALL(waku_stop(wakuCtx, (WakuCallBack) GoCallback, resp)); } static void cGoWakuDestroy(void* wakuCtx, void* resp) { - waku_destroy(wakuCtx, (WakuCallBack) GoCallback, resp); + WAKU_CALL(waku_destroy(wakuCtx, (WakuCallBack) GoCallback, resp)); } static void cGoWakuStartDiscV5(void* wakuCtx, void* resp) { - waku_start_discv5(wakuCtx, (WakuCallBack) GoCallback, resp); + WAKU_CALL(waku_start_discv5(wakuCtx, (WakuCallBack) GoCallback, resp)); } static void cGoWakuStopDiscV5(void* wakuCtx, void* resp) { - waku_stop_discv5(wakuCtx, (WakuCallBack) GoCallback, resp); + WAKU_CALL(waku_stop_discv5(wakuCtx, (WakuCallBack) GoCallback, resp)); } static void cGoWakuVersion(void* wakuCtx, void* resp) { - waku_version(wakuCtx, (WakuCallBack) GoCallback, resp); + WAKU_CALL(waku_version(wakuCtx, (WakuCallBack) GoCallback, resp)); } static void cGoWakuSetEventCallback(void* wakuCtx) { @@ -109,21 +118,21 @@ package waku char* encoding, void* resp) { - waku_content_topic(wakuCtx, + WAKU_CALL( waku_content_topic(wakuCtx, appName, appVersion, contentTopicName, encoding, (WakuCallBack) GoCallback, - resp); + resp) ); } static void cGoWakuPubsubTopic(void* wakuCtx, char* topicName, void* resp) { - waku_pubsub_topic(wakuCtx, topicName, (WakuCallBack) GoCallback, resp); + WAKU_CALL( waku_pubsub_topic(wakuCtx, topicName, (WakuCallBack) GoCallback, resp) ); } static void cGoWakuDefaultPubsubTopic(void* wakuCtx, void* resp) { - waku_default_pubsub_topic(wakuCtx, (WakuCallBack) GoCallback, resp); + WAKU_CALL (waku_default_pubsub_topic(wakuCtx, (WakuCallBack) GoCallback, resp)); } static void cGoWakuRelayPublish(void* wakuCtx, @@ -132,44 +141,44 @@ package waku int timeoutMs, void* resp) { - waku_relay_publish(wakuCtx, + WAKU_CALL (waku_relay_publish(wakuCtx, pubSubTopic, jsonWakuMessage, timeoutMs, (WakuCallBack) GoCallback, - resp); + resp)); } static void cGoWakuRelaySubscribe(void* wakuCtx, char* pubSubTopic, void* resp) { - waku_relay_subscribe(wakuCtx, + WAKU_CALL ( waku_relay_subscribe(wakuCtx, pubSubTopic, (WakuCallBack) GoCallback, - resp); + resp) ); } static void cGoWakuRelayAddProtectedShard(void* wakuCtx, int clusterId, int shardId, char* publicKey, void* resp) { - waku_relay_add_protected_shard(wakuCtx, + WAKU_CALL ( waku_relay_add_protected_shard(wakuCtx, clusterId, shardId, publicKey, (WakuCallBack) GoCallback, - resp); + resp) ); } static void cGoWakuRelayUnsubscribe(void* wakuCtx, char* pubSubTopic, void* resp) { - waku_relay_unsubscribe(wakuCtx, + WAKU_CALL ( waku_relay_unsubscribe(wakuCtx, pubSubTopic, (WakuCallBack) GoCallback, - resp); + resp) ); } static void cGoWakuConnect(void* wakuCtx, char* peerMultiAddr, int timeoutMs, void* resp) { - waku_connect(wakuCtx, + WAKU_CALL( waku_connect(wakuCtx, peerMultiAddr, timeoutMs, (WakuCallBack) GoCallback, - resp); + resp) ); } static void cGoWakuDialPeer(void* wakuCtx, @@ -178,12 +187,12 @@ package waku int timeoutMs, void* resp) { - waku_dial_peer(wakuCtx, + WAKU_CALL( waku_dial_peer(wakuCtx, peerMultiAddr, protocol, timeoutMs, (WakuCallBack) GoCallback, - resp); + resp) ); } static void cGoWakuDialPeerById(void* wakuCtx, @@ -192,51 +201,51 @@ package waku int timeoutMs, void* resp) { - waku_dial_peer_by_id(wakuCtx, + WAKU_CALL( waku_dial_peer_by_id(wakuCtx, peerId, protocol, timeoutMs, (WakuCallBack) GoCallback, - resp); + resp) ); } static void cGoWakuDisconnectPeerById(void* wakuCtx, char* peerId, void* resp) { - waku_disconnect_peer_by_id(wakuCtx, + WAKU_CALL( waku_disconnect_peer_by_id(wakuCtx, peerId, (WakuCallBack) GoCallback, - resp); + resp) ); } static void cGoWakuListenAddresses(void* wakuCtx, void* resp) { - waku_listen_addresses(wakuCtx, (WakuCallBack) GoCallback, resp); + WAKU_CALL (waku_listen_addresses(wakuCtx, (WakuCallBack) GoCallback, resp) ); } static void cGoWakuGetMyENR(void* ctx, void* resp) { - waku_get_my_enr(ctx, (WakuCallBack) GoCallback, resp); + WAKU_CALL (waku_get_my_enr(ctx, (WakuCallBack) GoCallback, resp) ); } static void cGoWakuGetMyPeerId(void* ctx, void* resp) { - waku_get_my_peerid(ctx, (WakuCallBack) GoCallback, resp); + WAKU_CALL (waku_get_my_peerid(ctx, (WakuCallBack) GoCallback, resp) ); } static void cGoWakuPingPeer(void* ctx, char* peerAddr, int timeoutMs, void* resp) { - waku_ping_peer(ctx, peerAddr, timeoutMs, (WakuCallBack) GoCallback, resp); + WAKU_CALL (waku_ping_peer(ctx, peerAddr, timeoutMs, (WakuCallBack) GoCallback, resp) ); } static void cGoWakuGetNumPeersInMesh(void* ctx, char* pubSubTopic, void* resp) { - waku_relay_get_num_peers_in_mesh(ctx, pubSubTopic, (WakuCallBack) GoCallback, resp); + WAKU_CALL (waku_relay_get_num_peers_in_mesh(ctx, pubSubTopic, (WakuCallBack) GoCallback, resp) ); } static void cGoWakuGetNumConnectedRelayPeers(void* ctx, char* pubSubTopic, void* resp) { - waku_relay_get_num_connected_peers(ctx, pubSubTopic, (WakuCallBack) GoCallback, resp); + WAKU_CALL (waku_relay_get_num_connected_peers(ctx, pubSubTopic, (WakuCallBack) GoCallback, resp) ); } static void cGoWakuGetConnectedPeers(void* wakuCtx, void* resp) { - waku_get_connected_peers(wakuCtx, (WakuCallBack) GoCallback, resp); + WAKU_CALL (waku_get_connected_peers(wakuCtx, (WakuCallBack) GoCallback, resp) ); } static void cGoWakuGetPeerIdsFromPeerStore(void* wakuCtx, void* resp) { - waku_get_peerids_from_peerstore(wakuCtx, (WakuCallBack) GoCallback, resp); + WAKU_CALL (waku_get_peerids_from_peerstore(wakuCtx, (WakuCallBack) GoCallback, resp) ); } static void cGoWakuLightpushPublish(void* wakuCtx, @@ -244,11 +253,11 @@ package waku const char* jsonWakuMessage, void* resp) { - waku_lightpush_publish(wakuCtx, + WAKU_CALL (waku_lightpush_publish(wakuCtx, pubSubTopic, jsonWakuMessage, (WakuCallBack) GoCallback, - resp); + resp)); } static void cGoWakuStoreQuery(void* wakuCtx, @@ -257,32 +266,32 @@ package waku int timeoutMs, void* resp) { - waku_store_query(wakuCtx, - jsonQuery, - peerAddr, - timeoutMs, - (WakuCallBack) GoCallback, - resp); + WAKU_CALL (waku_store_query(wakuCtx, + jsonQuery, + peerAddr, + timeoutMs, + (WakuCallBack) GoCallback, + resp)); } static void cGoWakuPeerExchangeQuery(void* wakuCtx, uint64_t numPeers, void* resp) { - waku_peer_exchange_request(wakuCtx, + WAKU_CALL (waku_peer_exchange_request(wakuCtx, numPeers, (WakuCallBack) GoCallback, - resp); + resp)); } static void cGoWakuGetPeerIdsByProtocol(void* wakuCtx, const char* protocol, void* resp) { - waku_get_peerids_by_protocol(wakuCtx, + WAKU_CALL (waku_get_peerids_by_protocol(wakuCtx, protocol, (WakuCallBack) GoCallback, - resp); + resp)); } static void cGoWakuDnsDiscovery(void* wakuCtx, @@ -291,12 +300,12 @@ package waku int timeoutMs, void* resp) { - waku_dns_discovery(wakuCtx, - entTreeUrl, - nameDnsServer, - timeoutMs, - (WakuCallBack) GoCallback, - resp); + WAKU_CALL (waku_dns_discovery(wakuCtx, + entTreeUrl, + nameDnsServer, + timeoutMs, + (WakuCallBack) GoCallback, + resp)); } */ @@ -325,12 +334,89 @@ import ( "github.com/waku-org/go-waku/waku/v2/utils" "github.com/waku-org/waku-go-bindings/waku/common" "go.uber.org/zap" + "google.golang.org/protobuf/proto" ) const requestTimeout = 30 * time.Second -const MsgChanBufferSize = 1024 -const TopicHealthChanBufferSize = 1024 -const ConnectionChangeChanBufferSize = 1024 +const MsgChanBufferSize = 100 +const TopicHealthChanBufferSize = 100 +const ConnectionChangeChanBufferSize = 100 + +type WakuConfig struct { + Host string `json:"host,omitempty"` + Nodekey string `json:"nodekey,omitempty"` + Relay bool `json:"relay"` + Store bool `json:"store,omitempty"` + LegacyStore bool `json:"legacyStore"` + Storenode string `json:"storenode,omitempty"` + StoreMessageRetentionPolicy string `json:"storeMessageRetentionPolicy,omitempty"` + StoreMessageDbUrl string `json:"storeMessageDbUrl,omitempty"` + StoreMessageDbVacuum bool `json:"storeMessageDbVacuum,omitempty"` + StoreMaxNumDbConnections int `json:"storeMaxNumDbConnections,omitempty"` + StoreResume bool `json:"storeResume,omitempty"` + Filter bool `json:"filter,omitempty"` + Filternode string `json:"filternode,omitempty"` + FilterSubscriptionTimeout int64 `json:"filterSubscriptionTimeout,omitempty"` + FilterMaxPeersToServe uint32 `json:"filterMaxPeersToServe,omitempty"` + FilterMaxCriteria uint32 `json:"filterMaxCriteria,omitempty"` + Lightpush bool `json:"lightpush,omitempty"` + LightpushNode string `json:"lightpushnode,omitempty"` + LogLevel string `json:"logLevel,omitempty"` + DnsDiscovery bool `json:"dnsDiscovery,omitempty"` + DnsDiscoveryUrl string `json:"dnsDiscoveryUrl,omitempty"` + MaxMessageSize string `json:"maxMessageSize,omitempty"` + Staticnodes []string `json:"staticnodes,omitempty"` + Discv5BootstrapNodes []string `json:"discv5BootstrapNodes,omitempty"` + Discv5Discovery bool `json:"discv5Discovery,omitempty"` + Discv5UdpPort int `json:"discv5UdpPort,omitempty"` + ClusterID uint16 `json:"clusterId,omitempty"` + Shards []uint16 `json:"shards,omitempty"` + PeerExchange bool `json:"peerExchange,omitempty"` + PeerExchangeNode string `json:"peerExchangeNode,omitempty"` + TcpPort int `json:"tcpPort,omitempty"` + RateLimits RateLimitsConfig `json:"rateLimits,omitempty"` +} + +type RateLimitsConfig struct { + Filter *RateLimit `json:"-"` + Lightpush *RateLimit `json:"-"` + PeerExchange *RateLimit `json:"-"` +} + +func (rlc RateLimitsConfig) MarshalJSON() ([]byte, error) { + output := []string{} + if rlc.Filter != nil { + output = append(output, fmt.Sprintf("filter:%s", rlc.Filter.String())) + } + if rlc.Lightpush != nil { + output = append(output, fmt.Sprintf("lightpush:%s", rlc.Lightpush.String())) + } + if rlc.PeerExchange != nil { + output = append(output, fmt.Sprintf("px:%s", rlc.PeerExchange.String())) + } + return json.Marshal(output) +} + +type RateLimitTimeUnit string + +const Hour RateLimitTimeUnit = "h" +const Minute RateLimitTimeUnit = "m" +const Second RateLimitTimeUnit = "s" +const Millisecond RateLimitTimeUnit = "ms" + +type RateLimit struct { + Volume int // Number of allowed messages per period + Period int // Length of each rate-limit period (in TimeUnit) + TimeUnit RateLimitTimeUnit // Time unit of the period +} + +func (rl RateLimit) String() string { + return fmt.Sprintf("%d/%d%s", rl.Volume, rl.Period, rl.TimeUnit) +} + +func (rl RateLimit) MarshalJSON() ([]byte, error) { + return json.Marshal(rl.String()) +} //export GoCallback func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) { @@ -347,14 +433,14 @@ func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) { // WakuNode represents an instance of an nwaku node type WakuNode struct { wakuCtx unsafe.Pointer - config *common.WakuConfig + config *WakuConfig MsgChan chan common.Envelope TopicHealthChan chan topicHealth ConnectionChangeChan chan connectionChange nodeName string } -func NewWakuNode(config *common.WakuConfig, nodeName string) (*WakuNode, error) { +func NewWakuNode(config *WakuConfig, nodeName string) (*WakuNode, error) { Debug("Creating new WakuNode: %v", nodeName) n := &WakuNode{ config: config, @@ -571,7 +657,7 @@ func (n *WakuNode) GetConnectedPeers() (peer.IDSlice, error) { if C.getRet(resp) == C.RET_OK { peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) if peersStr == "" { - Debug("No connected peers found for " + n.nodeName) + Debug("No connected peers found for %s", n.nodeName) return nil, nil } @@ -598,10 +684,21 @@ func (n *WakuNode) GetConnectedPeers() (peer.IDSlice, error) { func (n *WakuNode) RelaySubscribe(pubsubTopic string) error { if pubsubTopic == "" { - return errors.New("pubsub topic is empty") + err := errors.New("pubsub topic is empty") + Error("Failed to subscribe to relay: %v", err) + return err + } + + if n.wakuCtx == nil { + err := errors.New("wakuCtx is nil") + Error("Failed to subscribe to relay on node %s: %v", n.nodeName, err) + return err } + Debug("Attempting to subscribe to relay on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic) + wg := sync.WaitGroup{} + wg.Add(1) var resp = C.allocResp(unsafe.Pointer(&wg)) var cPubsubTopic = C.CString(pubsubTopic) @@ -609,20 +706,20 @@ func (n *WakuNode) RelaySubscribe(pubsubTopic string) error { defer C.freeResp(resp) defer C.free(unsafe.Pointer(cPubsubTopic)) - if n.wakuCtx == nil { - return errors.New("wakuCtx is nil") - } - - wg.Add(1) + Debug("Calling cGoWakuRelaySubscribe on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic) C.cGoWakuRelaySubscribe(n.wakuCtx, cPubsubTopic, resp) - wg.Wait() + + Debug("Waiting for response from cGoWakuRelaySubscribe on node %s", n.nodeName) + wg.Wait() // Ensures the function completes before proceeding if C.getRet(resp) == C.RET_OK { + Debug("Successfully subscribed to relay on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic) return nil } - errMsg := "error WakuRelaySubscribe: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return errors.New(errMsg) + errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + Error("Failed to subscribe to relay on node %s, pubsubTopic: %s, error: %v", n.nodeName, pubsubTopic, errMsg) + return errors.New("error WakuRelaySubscribe: " + errMsg) } func (n *WakuNode) RelayAddProtectedShard(clusterId uint16, shardId uint16, pubkey *ecdsa.PublicKey) error { @@ -658,7 +755,9 @@ func (n *WakuNode) RelayAddProtectedShard(clusterId uint16, shardId uint16, pubk func (n *WakuNode) RelayUnsubscribe(pubsubTopic string) error { if pubsubTopic == "" { - return errors.New("pubsub topic is empty") + err := errors.New("pubsub topic is empty") + Error("Failed to unsubscribe from relay: %v", err) + return err } wg := sync.WaitGroup{} @@ -674,15 +773,19 @@ func (n *WakuNode) RelayUnsubscribe(pubsubTopic string) error { } wg.Add(1) + Debug("Attempting to unsubscribe from relay on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic) C.cGoWakuRelayUnsubscribe(n.wakuCtx, cPubsubTopic, resp) wg.Wait() if C.getRet(resp) == C.RET_OK { + + Debug("Successfully unsubscribed from relay on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic) return nil } - errMsg := "error WakuRelayUnsubscribe: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return errors.New(errMsg) + errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + Error("Failed to unsubscribe from relay on node %s, pubsubTopic: %s, error: %v", n.nodeName, pubsubTopic, errMsg) + return errors.New("error WakuRelayUnsubscribe: " + errMsg) } func (n *WakuNode) PeerExchangeRequest(numPeers uint64) (uint64, error) { @@ -698,18 +801,21 @@ func (n *WakuNode) PeerExchangeRequest(numPeers uint64) (uint64, error) { numRecvPeersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) numRecvPeers, err := strconv.ParseUint(numRecvPeersStr, 10, 64) if err != nil { + Error("Failed to parse number of received peers: %v", err) return 0, err } return numRecvPeers, nil } errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + Error("PeerExchangeRequest failed: %v", errMsg) return 0, errors.New(errMsg) } func (n *WakuNode) StartDiscV5() error { - wg := sync.WaitGroup{} + Debug("Starting DiscV5 for node: %s", n.nodeName) + wg := sync.WaitGroup{} var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) @@ -717,9 +823,11 @@ func (n *WakuNode) StartDiscV5() error { C.cGoWakuStartDiscV5(n.wakuCtx, resp) wg.Wait() if C.getRet(resp) == C.RET_OK { + Debug("Successfully started DiscV5 for node: %s", n.nodeName) return nil } errMsg := "error WakuStartDiscV5: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + Error("Failed to start DiscV5 for node %s: %v", n.nodeName, errMsg) return errors.New(errMsg) } @@ -734,9 +842,11 @@ func (n *WakuNode) StopDiscV5() error { wg.Wait() if C.getRet(resp) == C.RET_OK { + Debug("Successfully stopped DiscV5 for node: %s", n.nodeName) return nil } errMsg := "error WakuStopDiscV5: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + Error("Failed to stop DiscV5 for node %s: %v", n.nodeName, errMsg) return errors.New(errMsg) } @@ -752,11 +862,13 @@ func (n *WakuNode) Version() (string, error) { if C.getRet(resp) == C.RET_OK { var version = C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + Debug("Successfully fetched Waku version for node %s: %s", n.nodeName, version) return version, nil } errMsg := "error WakuVersion: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + Error("Failed to fetch Waku version for node %s: %v", n.nodeName, errMsg) return "", errors.New(errMsg) } @@ -833,6 +945,29 @@ func (n *WakuNode) RelayPublish(ctx context.Context, message *pb.WakuMessage, pu return common.MessageHash(""), errors.New(errMsg) } +func (n *WakuNode) RelayPublishNoCTX(pubsubTopic string, message *pb.WakuMessage) (common.MessageHash, error) { + if n == nil { + err := errors.New("cannot publish message; node is nil") + Error("Failed to publish message via relay: %v", err) + return "", err + } + + // Handling context internally with a timeout + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + Debug("Attempting to publish message via relay on node %s", n.nodeName) + + msgHash, err := n.RelayPublish(ctx, message, pubsubTopic) + if err != nil { + Error("Failed to publish message via relay on node %s: %v", n.nodeName, err) + return "", err + } + + Debug("Successfully published message via relay on node %s, messageHash: %s", n.nodeName, msgHash.String()) + return msgHash, nil +} + func (n *WakuNode) DnsDiscovery(ctx context.Context, enrTreeUrl string, nameDnsServer string) ([]multiaddr.Multiaddr, error) { wg := sync.WaitGroup{} @@ -957,7 +1092,7 @@ func (n *WakuNode) Destroy() error { wg.Wait() if C.getRet(resp) == C.RET_OK { - Debug("Successfully destroyed " + n.nodeName) + Debug("Successfully destroyed %s", n.nodeName) return nil } @@ -1274,17 +1409,27 @@ func GetFreePortIfNeeded(tcpPort int, discV5UDPPort int) (int, int, error) { } // Create & start node -func StartWakuNode(nodeName string, customCfg *common.WakuConfig) (*WakuNode, error) { +func StartWakuNode(nodeName string, customCfg *WakuConfig) (*WakuNode, error) { Debug("Initializing %s", nodeName) - var nodeCfg common.WakuConfig + var nodeCfg WakuConfig if customCfg == nil { nodeCfg = DefaultWakuConfig + } else { nodeCfg = *customCfg } + tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0) + if err != nil { + Error("Failed to allocate unique ports: %v", err) + tcpPort, udpPort = 0, 0 // Fallback to OS-assigned ports + } + + nodeCfg.TcpPort = tcpPort + nodeCfg.Discv5UdpPort = udpPort + Debug("Creating %s", nodeName) node, err := NewWakuNode(&nodeCfg, nodeName) if err != nil { @@ -1378,3 +1523,162 @@ func (n *WakuNode) DisconnectPeer(target *WakuNode) error { Debug("Successfully disconnected %s from %s", n.nodeName, target.nodeName) return nil } + +func ConnectAllPeers(nodes []*WakuNode) error { + if len(nodes) == 0 { + Error("Cannot connect peers: node list is empty") + return errors.New("node list is empty") + } + + timeout := time.Duration(len(nodes)*2) * time.Second + Debug("Connecting nodes in a relay chain with timeout: %v", timeout) + + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + for i := 0; i < len(nodes)-1; i++ { + Debug("Connecting node %d to node %d", i, i+1) + err := nodes[i].ConnectPeer(nodes[i+1]) + if err != nil { + Error("Failed to connect node %d to node %d: %v", i, i+1, err) + return err + } + } + + <-ctx.Done() + Debug("Connections stabilized") + return nil +} + +func (n *WakuNode) VerifyMessageReceived(expectedMessage *pb.WakuMessage, expectedHash common.MessageHash) error { + timeout := 3 * time.Second + Debug("Verifying if the message was received on node %s, timeout: %v", n.nodeName, timeout) + + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + select { + case envelope := <-n.MsgChan: + if envelope == nil { + Error("Received envelope is nil on node %s", n.nodeName) + return errors.New("received envelope is nil") + } + if string(expectedMessage.Payload) != string(envelope.Message().Payload) { + Error("Payload does not match on node %s", n.nodeName) + return errors.New("payload does not match") + } + if expectedMessage.ContentTopic != envelope.Message().ContentTopic { + Error("Content topic does not match on node %s", n.nodeName) + return errors.New("content topic does not match") + } + if expectedHash != envelope.Hash() { + Error("Message hash does not match on node %s", n.nodeName) + return errors.New("message hash does not match") + } + Debug("Message received and verified successfully on node %s, Message: %s", n.nodeName, string(envelope.Message().Payload)) + return nil + case <-ctx.Done(): + Error("Timeout: message not received within %v on node %s", timeout, n.nodeName) + return errors.New("timeout: message not received within the given duration") + } +} + +func (n *WakuNode) CreateMessage(customMessage ...*pb.WakuMessage) *pb.WakuMessage { + Debug("Creating a WakuMessage on node %s", n.nodeName) + + if len(customMessage) > 0 && customMessage[0] != nil { + Debug("Using provided custom message on node %s", n.nodeName) + return customMessage[0] + } + + Debug("Using default message format on node %s", n.nodeName) + defaultMessage := &pb.WakuMessage{ + Payload: []byte("This is a default Waku message payload"), + ContentTopic: "test-content-topic", + Version: proto.Uint32(0), + Timestamp: proto.Int64(time.Now().UnixNano()), + } + + Debug("Successfully created a default WakuMessage on node %s", n.nodeName) + return defaultMessage +} + +func WaitForAutoConnection(nodeList []*WakuNode) error { + Debug("Waiting for auto-connection of nodes...") + + var hardWait = 30 * time.Second + Debug("Applying hard wait of %v seconds before checking connections", hardWait.Seconds()) + time.Sleep(hardWait) + + for _, node := range nodeList { + peers, err := node.GetConnectedPeers() + if err != nil { + Error("Failed to get connected peers for node %s: %v", node.nodeName, err) + return err + } + + if len(peers) < 1 { + Error("Node %s has no connected peers, expected at least 1", node.nodeName) + return errors.New("expected at least one connected peer") + } + + Debug("Node %s has %d connected peers", node.nodeName, len(peers)) + } + + Debug("Auto-connection check completed successfully") + return nil +} + +func SubscribeNodesToTopic(nodes []*WakuNode, topic string) error { + for _, node := range nodes { + Debug("Subscribing node %s to topic %s", node.nodeName, topic) + err := RetryWithBackOff(func() error { + return node.RelaySubscribe(topic) + }) + if err != nil { + Error("Failed to subscribe node %s to topic %s: %v", node.nodeName, topic, err) + return err + } + Debug("Node %s successfully subscribed to topic %s", node.nodeName, topic) + } + return nil +} + +func (n *WakuNode) GetStoredMessages(storeNode *WakuNode, storeRequest *common.StoreQueryRequest) (*common.StoreQueryResponse, error) { + Debug("Starting store query request") + + if storeRequest == nil { + Debug("Using DefaultStoreQueryRequest") + storeRequest = &DefaultStoreQueryRequest + } + + storeMultiaddr, err := storeNode.ListenAddresses() + if err != nil { + Error("Failed to retrieve listen addresses for store node: %v", err) + return nil, err + } + + if len(storeMultiaddr) == 0 { + Error("Store node has no available listen addresses") + return nil, fmt.Errorf("store node has no available listen addresses") + } + + storeNodeAddrInfo, err := peer.AddrInfoFromString(storeMultiaddr[0].String()) + if err != nil { + Error("Failed to convert store node address to AddrInfo: %v", err) + return nil, err + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + Debug("Querying store node for messages") + res, err := n.StoreQuery(ctx, storeRequest, *storeNodeAddrInfo) + if err != nil { + Error("StoreQuery failed: %v", err) + return nil, err + } + + Debug("Store query successful, retrieved %d messages", len(*res.Messages)) + return res, nil +} diff --git a/waku/store_test.go b/waku/store_test.go new file mode 100644 index 0000000..3c2488e --- /dev/null +++ b/waku/store_test.go @@ -0,0 +1,83 @@ +package waku + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/waku-org/go-waku/waku/v2/protocol/pb" + + //"github.com/waku-org/waku-go-bindings/waku/common" + "google.golang.org/protobuf/proto" +) + +func TestStoreQueryFromPeer(t *testing.T) { + Debug("Starting test to verify store query from a peer using direct peer connections") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1 (Relay enabled)") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Store = true + + Debug("Creating Node2 (Relay & Store enabled)") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + node3Config := DefaultWakuConfig + node3Config.Relay = false + + Debug("Creating Node3 (Peer connected to Node2)") + node3, err := StartWakuNode("Node3", &node3Config) + require.NoError(t, err, "Failed to start Node3") + + defer func() { + Debug("Stopping and destroying all Waku nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + node3.StopAndDestroy() + }() + + Debug("Connecting Node1 to Node2") + err = node1.ConnectPeer(node2) + require.NoError(t, err, "Failed to connect Node1 to Node2") + + Debug("Connecting Node3 to Node2") + err = node3.ConnectPeer(node2) + require.NoError(t, err, "Failed to connect Node3 to Node2") + + Debug("Waiting for peer connections to stabilize") + time.Sleep(2 * time.Second) + + Debug("Publishing message from Node1 using RelayPublish") + message := node1.CreateMessage(&pb.WakuMessage{ + Payload: []byte("test-message"), + ContentTopic: "test-content-topic", + Timestamp: proto.Int64(time.Now().UnixNano()), + }) + + defaultPubsubTopic := DefaultPubsubTopic + msgHash, err := node1.RelayPublishNoCTX(defaultPubsubTopic, message) + require.NoError(t, err, "Failed to publish message from Node1") + + Debug("Waiting for message delivery to Node2") + time.Sleep(2 * time.Second) + + Debug("Verifying that Node2 received the message") + err = node2.VerifyMessageReceived(message, msgHash) + require.NoError(t, err, "Node2 should have received the message") + + Debug("Node3 querying stored messages from Node2") + res, err := node3.GetStoredMessages(node2, nil) + var storedMessages = *res.Messages + require.NoError(t, err, "Failed to retrieve stored messages from Node2") + require.NotEmpty(t, storedMessages, "Expected at least one stored message") + Debug("Verifying stored message matches the published message") + require.Equal(t, message.Payload, storedMessages[0].WakuMessage.Payload, "Stored message payload does not match") + Debug("Test successfully verified store query from a peer using direct peer connections") +} diff --git a/waku/test_data.go b/waku/test_data.go index 121e1f2..ff8859d 100644 --- a/waku/test_data.go +++ b/waku/test_data.go @@ -4,9 +4,11 @@ import ( "time" "github.com/waku-org/waku-go-bindings/waku/common" + "google.golang.org/protobuf/proto" ) -var DefaultWakuConfig common.WakuConfig +var DefaultWakuConfig WakuConfig +var DefaultStoreQueryRequest common.StoreQueryRequest func init() { @@ -17,7 +19,7 @@ func init() { Error("Failed to get free ports %v %v", err1, err2) } - DefaultWakuConfig = common.WakuConfig{ + DefaultWakuConfig = WakuConfig{ Relay: false, LogLevel: "DEBUG", Discv5Discovery: true, @@ -30,6 +32,14 @@ func init() { Discv5UdpPort: udpPort, TcpPort: tcpPort, } + + DefaultStoreQueryRequest = common.StoreQueryRequest{ + IncludeData: true, + ContentTopics: &[]string{"test-content-topic"}, + PaginationLimit: proto.Uint64(uint64(50)), + PaginationForward: true, + TimeStart: proto.Int64(time.Now().Add(-5 * time.Minute).UnixNano()), // 5 mins before now + } } const ConnectPeerTimeout = 10 * time.Second //default timeout for node to connect to another node From 26b6b34aa4815f8f380d78d04ae10a9f7be3943e Mon Sep 17 00:00:00 2001 From: aya Date: Tue, 18 Feb 2025 15:28:41 +0200 Subject: [PATCH 2/7] Add more store messages --- waku/store_test.go | 168 ++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 167 insertions(+), 1 deletion(-) diff --git a/waku/store_test.go b/waku/store_test.go index 3c2488e..65df8e7 100644 --- a/waku/store_test.go +++ b/waku/store_test.go @@ -1,13 +1,14 @@ package waku import ( + "fmt" "testing" "time" "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/waku/v2/protocol/pb" - //"github.com/waku-org/waku-go-bindings/waku/common" + "github.com/waku-org/waku-go-bindings/waku/common" "google.golang.org/protobuf/proto" ) @@ -81,3 +82,168 @@ func TestStoreQueryFromPeer(t *testing.T) { require.Equal(t, message.Payload, storedMessages[0].WakuMessage.Payload, "Stored message payload does not match") Debug("Test successfully verified store query from a peer using direct peer connections") } + +func TestStoreQueryMultipleMessages(t *testing.T) { + Debug("Starting test to verify store query with multiple messages") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1 (Relay enabled)") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Store = true + + Debug("Creating Node2 (Relay & Store enabled)") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + node3Config := DefaultWakuConfig + node3Config.Relay = false + + Debug("Creating Node3 (Peer connected to Node2)") + node3, err := StartWakuNode("Node3", &node3Config) + require.NoError(t, err, "Failed to start Node3") + + defer func() { + Debug("Stopping and destroying all Waku nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + node3.StopAndDestroy() + }() + + Debug("Connecting Node1 to Node2") + err = node1.ConnectPeer(node2) + require.NoError(t, err, "Failed to connect Node1 to Node2") + + Debug("Connecting Node3 to Node2") + err = node3.ConnectPeer(node2) + require.NoError(t, err, "Failed to connect Node3 to Node2") + + Debug("Waiting for peer connections to stabilize") + time.Sleep(2 * time.Second) + + numMessages := 50 + var sentHashes []common.MessageHash + defaultPubsubTopic := DefaultPubsubTopic + + Debug("Publishing %d messages from Node1 using RelayPublish", numMessages) + for i := 0; i < numMessages; i++ { + message := node1.CreateMessage(&pb.WakuMessage{ + Payload: []byte(fmt.Sprintf("message-%d", i)), + ContentTopic: "test-content-topic", + Timestamp: proto.Int64(time.Now().UnixNano()), + }) + + msgHash, err := node1.RelayPublishNoCTX(defaultPubsubTopic, message) + require.NoError(t, err, "Failed to publish message from Node1") + + sentHashes = append(sentHashes, msgHash) + } + + Debug("Waiting for message delivery to Node2") + time.Sleep(5 * time.Second) + + Debug("Node3 querying stored messages from Node2") + res, err := node3.GetStoredMessages(node2, nil) + require.NoError(t, err, "Failed to retrieve stored messages from Node2") + require.NotNil(t, res.Messages, "Expected stored messages but received nil") + + storedMessages := *res.Messages + + require.Len(t, storedMessages, numMessages, "Expected to retrieve exactly 50 messages") + + Debug("Verifying stored message hashes match sent message hashes") + var receivedHashes []common.MessageHash + for _, storedMsg := range storedMessages { + receivedHashes = append(receivedHashes, storedMsg.MessageHash) + } + + require.ElementsMatch(t, sentHashes, receivedHashes, "Sent and received message hashes do not match") + + Debug("Test successfully verified store query with multiple messages") +} + +func TestStoreQueryWith5Pagination(t *testing.T) { + Debug("Starting test to verify store query with pagination") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1 (Relay enabled)") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Store = true + + Debug("Creating Node2 (Relay & Store enabled)") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + node3Config := DefaultWakuConfig + node3Config.Relay = false + + Debug("Creating Node3 (Peer connected to Node2)") + node3, err := StartWakuNode("Node3", &node3Config) + require.NoError(t, err, "Failed to start Node3") + + defer func() { + Debug("Stopping and destroying all Waku nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + node3.StopAndDestroy() + }() + + Debug("Connecting Node1 to Node2") + err = node1.ConnectPeer(node2) + require.NoError(t, err, "Failed to connect Node1 to Node2") + + Debug("Connecting Node3 to Node2") + err = node3.ConnectPeer(node2) + require.NoError(t, err, "Failed to connect Node3 to Node2") + + Debug("Waiting for peer connections to stabilize") + time.Sleep(2 * time.Second) + + numMessages := 10 + defaultPubsubTopic := DefaultPubsubTopic + + Debug("Publishing %d messages from Node1 using RelayPublish", numMessages) + for i := 0; i < numMessages; i++ { + message := node1.CreateMessage(&pb.WakuMessage{ + Payload: []byte(fmt.Sprintf("message-%d", i)), + ContentTopic: "test-content-topic", + Timestamp: proto.Int64(time.Now().UnixNano()), + }) + + _, err := node1.RelayPublishNoCTX(defaultPubsubTopic, message) + require.NoError(t, err, "Failed to publish message from Node1") + + } + + Debug("Waiting for message delivery to Node2") + time.Sleep(5 * time.Second) + + Debug("Node3 querying stored messages from Node2 with PaginationLimit = 5") + storeRequest := common.StoreQueryRequest{ + IncludeData: true, + ContentTopics: &[]string{"test-content-topic"}, + PaginationLimit: proto.Uint64(uint64(5)), + PaginationForward: true, + } + + res, err := node3.GetStoredMessages(node2, &storeRequest) + require.NoError(t, err, "Failed to retrieve stored messages from Node2") + require.NotNil(t, res.Messages, "Expected stored messages but received nil") + + storedMessages := *res.Messages + + require.Len(t, storedMessages, 5, "Expected to retrieve exactly 5 messages due to pagination limit") + + Debug("Test successfully verified store query with pagination limit") +} From 56d90642a547a35367797eb4f50b45029000507b Mon Sep 17 00:00:00 2001 From: aya Date: Wed, 19 Feb 2025 06:49:18 +0200 Subject: [PATCH 3/7] Add page store test --- waku/store_test.go | 212 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 212 insertions(+) diff --git a/waku/store_test.go b/waku/store_test.go index 65df8e7..143c77d 100644 --- a/waku/store_test.go +++ b/waku/store_test.go @@ -247,3 +247,215 @@ func TestStoreQueryWith5Pagination(t *testing.T) { Debug("Test successfully verified store query with pagination limit") } + +func TestStoreQueryWithPaginationMultiplePages(t *testing.T) { + Debug("Starting test to verify store query with pagination across multiple pages") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1 (Relay enabled)") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Store = true + + Debug("Creating Node2 (Relay & Store enabled)") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + node3Config := DefaultWakuConfig + node3Config.Relay = false + + Debug("Creating Node3 (Peer connected to Node2)") + node3, err := StartWakuNode("Node3", &node3Config) + require.NoError(t, err, "Failed to start Node3") + + defer func() { + Debug("Stopping and destroying all Waku nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + node3.StopAndDestroy() + }() + + Debug("Connecting Node1 to Node2") + err = node1.ConnectPeer(node2) + require.NoError(t, err, "Failed to connect Node1 to Node2") + + Debug("Connecting Node3 to Node2") + err = node3.ConnectPeer(node2) + require.NoError(t, err, "Failed to connect Node3 to Node2") + + Debug("Waiting for peer connections to stabilize") + time.Sleep(2 * time.Second) + + numMessages := 8 + var sentHashes []common.MessageHash + defaultPubsubTopic := DefaultPubsubTopic + + Debug("Publishing %d messages from Node1 using RelayPublish", numMessages) + for i := 0; i < numMessages; i++ { + message := node1.CreateMessage(&pb.WakuMessage{ + Payload: []byte(fmt.Sprintf("message-%d", i)), + ContentTopic: "test-content-topic", + Timestamp: proto.Int64(time.Now().UnixNano()), + }) + + msgHash, err := node1.RelayPublishNoCTX(defaultPubsubTopic, message) + require.NoError(t, err, "Failed to publish message from Node1") + + sentHashes = append(sentHashes, msgHash) + } + + Debug("Waiting for message delivery to Node2") + time.Sleep(5 * time.Second) + + Debug("Node3 querying first page of stored messages from Node2") + storeRequest1 := common.StoreQueryRequest{ + IncludeData: true, + ContentTopics: &[]string{"test-content-topic"}, + PaginationLimit: proto.Uint64(5), + PaginationForward: true, + } + + res1, err := node3.GetStoredMessages(node2, &storeRequest1) + require.NoError(t, err, "Failed to retrieve first page of stored messages from Node2") + require.NotNil(t, res1.Messages, "Expected stored messages but received nil") + + storedMessages1 := *res1.Messages + require.Len(t, storedMessages1, 5, "Expected to retrieve exactly 5 messages from first query") + + for i := 0; i < 5; i++ { + require.Equal(t, sentHashes[i], storedMessages1[i].MessageHash, "Message order mismatch in first query") + } + + Debug("Node3 querying second page of stored messages from Node2") + storeRequest2 := common.StoreQueryRequest{ + IncludeData: true, + ContentTopics: &[]string{"test-content-topic"}, + PaginationLimit: proto.Uint64(5), + PaginationForward: true, + PaginationCursor: &res1.PaginationCursor, + } + + res2, err := node3.GetStoredMessages(node2, &storeRequest2) + require.NoError(t, err, "Failed to retrieve second page of stored messages from Node2") + require.NotNil(t, res2.Messages, "Expected stored messages but received nil") + + storedMessages2 := *res2.Messages + require.Len(t, storedMessages2, 3, "Expected to retrieve exactly 3 messages from second query") + + for i := 0; i < 3; i++ { + require.Equal(t, sentHashes[i+5], storedMessages2[i].MessageHash, "Message order mismatch in second query") + } + + Debug("Test successfully verified store query pagination across multiple pages") +} + +func TestStoreQueryWithPaginationReverseOrder(t *testing.T) { + Debug("Starting test to verify store query with pagination in reverse order") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1 (Relay enabled)") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Store = true + + Debug("Creating Node2 (Relay & Store enabled)") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + node3Config := DefaultWakuConfig + node3Config.Relay = false + + Debug("Creating Node3 (Peer connected to Node2)") + node3, err := StartWakuNode("Node3", &node3Config) + require.NoError(t, err, "Failed to start Node3") + + defer func() { + Debug("Stopping and destroying all Waku nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + node3.StopAndDestroy() + }() + + Debug("Connecting Node1 to Node2") + err = node1.ConnectPeer(node2) + require.NoError(t, err, "Failed to connect Node1 to Node2") + + Debug("Connecting Node3 to Node2") + err = node3.ConnectPeer(node2) + require.NoError(t, err, "Failed to connect Node3 to Node2") + + Debug("Waiting for peer connections to stabilize") + time.Sleep(2 * time.Second) + + numMessages := 8 + var sentHashes []common.MessageHash + defaultPubsubTopic := DefaultPubsubTopic + + Debug("Publishing %d messages from Node1 using RelayPublish", numMessages) + for i := 0; i < numMessages; i++ { + message := node1.CreateMessage(&pb.WakuMessage{ + Payload: []byte(fmt.Sprintf("message-%d", i)), + ContentTopic: "test-content-topic", + Timestamp: proto.Int64(time.Now().UnixNano()), + }) + + msgHash, err := node1.RelayPublishNoCTX(defaultPubsubTopic, message) + require.NoError(t, err, "Failed to publish message from Node1") + + sentHashes = append(sentHashes, msgHash) + } + + Debug("Waiting for message delivery to Node2") + time.Sleep(5 * time.Second) + + Debug("Node3 querying first page of stored messages from Node2 (Newest first)") + storeRequest1 := common.StoreQueryRequest{ + IncludeData: true, + ContentTopics: &[]string{"test-content-topic"}, + PaginationLimit: proto.Uint64(5), + PaginationForward: false, + } + + res1, err := node3.GetStoredMessages(node2, &storeRequest1) + require.NoError(t, err, "Failed to retrieve first page of stored messages from Node2") + require.NotNil(t, res1.Messages, "Expected stored messages but received nil") + + storedMessages1 := *res1.Messages + require.Len(t, storedMessages1, 5, "Expected to retrieve exactly 5 messages from first query") + + for i := 0; i < 5; i++ { + require.Equal(t, sentHashes[numMessages-1-i], storedMessages1[i].MessageHash, "Message order mismatch in first query") + } + + Debug("Node3 querying second page of stored messages from Node2") + storeRequest2 := common.StoreQueryRequest{ + IncludeData: true, + ContentTopics: &[]string{"test-content-topic"}, + PaginationLimit: proto.Uint64(5), + PaginationForward: false, + PaginationCursor: &res1.PaginationCursor, + } + + res2, err := node3.GetStoredMessages(node2, &storeRequest2) + require.NoError(t, err, "Failed to retrieve second page of stored messages from Node2") + require.NotNil(t, res2.Messages, "Expected stored messages but received nil") + + storedMessages2 := *res2.Messages + require.Len(t, storedMessages2, 3, "Expected to retrieve exactly 3 messages from second query") + + for i := 0; i < 3; i++ { + require.Equal(t, sentHashes[numMessages-6-i], storedMessages2[i].MessageHash, "Message order mismatch in second query") + } + + Debug("Test successfully verified store query pagination in reverse order") +} From 4f580615ef364374cecb4a73cb9fc361ef45b368 Mon Sep 17 00:00:00 2001 From: aya Date: Sat, 22 Feb 2025 20:09:18 +0200 Subject: [PATCH 4/7] Add API flags store tests --- waku/store_test.go | 350 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 350 insertions(+) diff --git a/waku/store_test.go b/waku/store_test.go index 143c77d..211785b 100644 --- a/waku/store_test.go +++ b/waku/store_test.go @@ -459,3 +459,353 @@ func TestStoreQueryWithPaginationReverseOrder(t *testing.T) { Debug("Test successfully verified store query pagination in reverse order") } + +func TestQueryFailwhenNoStorePeer(t *testing.T) { + Debug("Starting test to verify store query failure when node2 has no store") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1 with Relay enabled") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Store = false + + Debug("Creating Node2 with Relay enabled but Store disabled") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + node3Config := DefaultWakuConfig + node3Config.Relay = true + + Debug("Creating Node3") + node3, err := StartWakuNode("Node3", &node3Config) + require.NoError(t, err, "Failed to start Node3") + + defer func() { + Debug("Stopping and destroying all Waku nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + node3.StopAndDestroy() + }() + + Debug("Connecting Node2 to Node1") + err = node2.ConnectPeer(node1) + require.NoError(t, err, "Failed to connect Node2 to Node1") + + Debug("Connecting Node3 to Node2") + err = node3.ConnectPeer(node2) + require.NoError(t, err, "Failed to connect Node3 to Node2") + + Debug("Sender Node1 is publishing a message") + message := node1.CreateMessage() + msgHash, err := node1.RelayPublishNoCTX(DefaultPubsubTopic, message) + require.NoError(t, err) + require.NotEmpty(t, msgHash) + + Debug("Verifying that Node3 fails to retrieve stored messages since Node2 has store disabled") + storedMessages, err := node3.GetStoredMessages(node2, nil) + require.Error(t, err, "Expected Node3's store query to fail because Node2 has store disabled") + require.Empty(t, storedMessages, "Expected no messages in store for Node3") + + Debug("Test successfully verified that store query fails when Node2 does not store messages") +} + +func TestQueryFailWithIncorrectStaticNode(t *testing.T) { + Debug("Starting test to verify store query failure when Node3 has an incorrect static node address") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1 with Relay enabled") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + node1Address, err := node1.ListenAddresses() + require.NoError(t, err, "Failed to get listening address for Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Store = true + node2Config.Staticnodes = []string{node1Address[0].String()} // Node2 connects to Node1 + + Debug("Creating Node2 with Store enabled") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + node2Address, err := node2.ListenAddresses() + require.NoError(t, err, "Failed to get listening address for Node2") + + var incorrectAddress = node2Address[0].String()[:len(node2Address[0].String())-10] + node3Config := DefaultWakuConfig + node3Config.Relay = true + node3Config.Staticnodes = []string{incorrectAddress} + + Debug("Original Node2 Address: %s", node2Address[0].String()) + Debug("Modified Node2 Address: %s", incorrectAddress) + + Debug("Creating Node3 with an incorrect static node address") + node3, err := StartWakuNode("Node3", &node3Config) + require.NoError(t, err, "Failed to start Node3") + + defer func() { + Debug("Stopping and destroying all Waku nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + node3.StopAndDestroy() + }() + + Debug("Sender Node1 is publishing a message") + message := node1.CreateMessage() + msgHash, err := node1.RelayPublishNoCTX(DefaultPubsubTopic, message) + require.NoError(t, err) + require.NotEmpty(t, msgHash) + + Debug("Verifying that Node3 fails to retrieve stored messages due to incorrect static node") + //storeQueryRequest := &common.StoreQueryRequest{ + //TimeStart: proto.Int64(time.Now().UnixNano()), // Query messages published after this timestamp + // } + storedmsgs, err := node3.GetStoredMessages(node2, nil) + require.NoError(t, err, "Expected Node3's store query to fail due to incorrect static node") + //require.NoEmpty(t, storedMessages, "Expected no messages in store for Node3") + if (storedmsgs.Messages) != nil { + Debug("First Stored Message: Payload: %s", string((*storedmsgs.Messages)[0].WakuMessage.Payload)) + } + Debug("Test successfully verified store query failure due to incorrect static node configuration") +} + +func TestStoreQueryWithoutData(t *testing.T) { + Debug("Starting test to verify store query returns only message hashes when IncludeData is false") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1 with Relay enabled") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Store = true // Enable store on Node2 + + Debug("Creating Node2 with Store enabled") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + node3Config := DefaultWakuConfig + node3Config.Relay = true + + Debug("Creating Node3") + node3, err := StartWakuNode("Node3", &node3Config) + require.NoError(t, err, "Failed to start Node3") + + defer func() { + Debug("Stopping and destroying all Waku nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + node3.StopAndDestroy() + }() + + Debug("Connecting Node2 to Node1") + err = node2.ConnectPeer(node1) + require.NoError(t, err, "Failed to connect Node2 to Node1") + + Debug("Connecting Node3 to Node2") + err = node3.ConnectPeer(node2) + require.NoError(t, err, "Failed to connect Node3 to Node2") + + Debug("Sender Node1 is publishing a message") + message := node1.CreateMessage() + msgHash, err := node1.RelayPublishNoCTX(DefaultPubsubTopic, message) + require.NoError(t, err) + require.NotEmpty(t, msgHash) + + Debug("Querying stored messages from Node3 with IncludeData = false") + storeQueryRequest := &common.StoreQueryRequest{ + IncludeData: false, + } + + storedmsgs, err := node3.GetStoredMessages(node2, storeQueryRequest) + require.NoError(t, err, "Failed to query store messages from Node2") + require.NotNil(t, storedmsgs.Messages, "Expected store response to contain message hashes") + //require.Len(t, *storedmsgs.Messages, 1, "Expected exactly one stored message") + + // Access the first message + // firstMessage := (*storedmsgs.Messages)[0] + + firstMessage := (*storedmsgs.Messages)[0] + require.Nil(t, firstMessage.WakuMessage, "Expected message payload to be empty when IncludeData is false") + require.NotEmpty(t, (*storedmsgs.Messages)[0].MessageHash, "Expected message hash to be present") + Debug("Queried message hash: %s", (*storedmsgs.Messages)[0].MessageHash) + + Debug("Test successfully verified that store query returns only message hashes when IncludeData is false") +} + +func TestStoreQueryWithWrongContentTopic(t *testing.T) { + Debug("Starting test to verify store query fails when using an incorrect content topic and an old timestamp") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1 with Relay enabled") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Store = true + + Debug("Creating Node2 with Store enabled") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + node3Config := DefaultWakuConfig + node3Config.Relay = true + + Debug("Creating Node3") + node3, err := StartWakuNode("Node3", &node3Config) + require.NoError(t, err, "Failed to start Node3") + + defer func() { + Debug("Stopping and destroying all Waku nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + node3.StopAndDestroy() + }() + + Debug("Connecting Node2 to Node1") + err = node2.ConnectPeer(node1) + require.NoError(t, err, "Failed to connect Node2 to Node1") + + Debug("Connecting Node3 to Node2") + err = node3.ConnectPeer(node2) + require.NoError(t, err, "Failed to connect Node3 to Node2") + + Debug("Recording timestamp before message publication") + queryTimestamp := proto.Int64(time.Now().UnixNano()) + + Debug("Sender Node1 is publishing a message with a correct content topic") + message := node1.CreateMessage() + msgHash, err := node1.RelayPublishNoCTX(DefaultPubsubTopic, message) + require.NoError(t, err) + require.NotEmpty(t, msgHash) + + Debug("Querying stored messages from Node3 with an incorrect content topic and an old timestamp") + storeQueryRequest := &common.StoreQueryRequest{ + ContentTopics: &[]string{"incorrect-content-topic"}, + TimeStart: queryTimestamp, + } + + storedmsgs, _ := node3.GetStoredMessages(node2, storeQueryRequest) + //require.Error(t, err, "Expected error when querying with an incorrect content topic and old timestamp") + require.Nil(t, (*storedmsgs.Messages)[0], "Expected no messages to be returned for incorrect content topic and timestamp") + //Debug("Queried message hash: %s", (*storedmsgs.Messages)[0].MessageHash) + Debug("Test successfully verified that store query fails when using an incorrect content topic and an old timestamp") +} + +func TestCheckStoredMSGsEphemeralTrue(t *testing.T) { + Debug("Starting test to verify ephemeral messages are not stored") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1 with Relay enabled") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Store = true + + Debug("Creating Node2 with Store enabled") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + defer func() { + Debug("Stopping and destroying both nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + }() + + Debug("Connecting Node2 to Node1") + err = node2.ConnectPeer(node1) + require.NoError(t, err, "Failed to connect Node2 to Node1") + + Debug("Recording timestamp before message publication") + queryTimestamp := proto.Int64(time.Now().UnixNano()) + + Debug("Sender Node1 is publishing an ephemeral message") + message := node1.CreateMessage() + ephemeralTrue := true + message.Ephemeral = &ephemeralTrue + + msgHash, err := node1.RelayPublishNoCTX(DefaultPubsubTopic, message) + require.NoError(t, err) + require.NotEmpty(t, msgHash) + + Debug("Querying stored messages from Node2") + storeQueryRequest := &common.StoreQueryRequest{ + TimeStart: queryTimestamp, + } + + storedmsgs, err := node1.GetStoredMessages(node2, storeQueryRequest) + require.NoError(t, err, "Failed to query store messages from Node2") + require.Equal(t, 0, len(*storedmsgs.Messages), "Expected no stored messages for ephemeral messages") + + Debug("Test successfully verified that ephemeral messages are not stored") +} + +func TestCheckStoredMSGsEphemeralFalse(t *testing.T) { + Debug("Starting test to verify non-ephemeral messages are stored") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1 with Relay enabled") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Store = true + + Debug("Creating Node2 with Store enabled") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + defer func() { + Debug("Stopping and destroying both nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + }() + + Debug("Connecting Node2 to Node1") + err = node2.ConnectPeer(node1) + require.NoError(t, err, "Failed to connect Node2 to Node1") + + Debug("Recording timestamp before message publication") + queryTimestamp := proto.Int64(time.Now().UnixNano()) + + Debug("Sender Node1 is publishing a non-ephemeral message") + message := node1.CreateMessage() + ephemeralFalse := false + message.Ephemeral = &ephemeralFalse + + msgHash, err := node1.RelayPublishNoCTX(DefaultPubsubTopic, message) + require.NoError(t, err) + require.NotEmpty(t, msgHash) + + Debug("Querying stored messages from Node2") + storeQueryRequest := &common.StoreQueryRequest{ + TimeStart: queryTimestamp, + } + + storedmsgs, err := node1.GetStoredMessages(node2, storeQueryRequest) + require.NoError(t, err, "Failed to query store messages from Node2") + require.Equal(t, 1, len(*storedmsgs.Messages), "Expected exactly one stored message") + + Debug("Test successfully verified that exactly one non-ephemeral message is stored") +} From 42333217dc4ac06febcbc0afb05dfada605fff9a Mon Sep 17 00:00:00 2001 From: aya Date: Sun, 23 Feb 2025 13:19:11 +0200 Subject: [PATCH 5/7] Add more tests logic --- waku/store_test.go | 117 +++++++++++++++++++++++++++++++++++++++++++-- waku/test_data.go | 46 ++++++++++++++++++ 2 files changed, 158 insertions(+), 5 deletions(-) diff --git a/waku/store_test.go b/waku/store_test.go index 211785b..cd4771a 100644 --- a/waku/store_test.go +++ b/waku/store_test.go @@ -12,7 +12,7 @@ import ( "google.golang.org/protobuf/proto" ) -func TestStoreQueryFromPeer(t *testing.T) { +func TestStoreQuery3Nodes(t *testing.T) { Debug("Starting test to verify store query from a peer using direct peer connections") node1Config := DefaultWakuConfig @@ -62,8 +62,8 @@ func TestStoreQueryFromPeer(t *testing.T) { Timestamp: proto.Int64(time.Now().UnixNano()), }) - defaultPubsubTopic := DefaultPubsubTopic - msgHash, err := node1.RelayPublishNoCTX(defaultPubsubTopic, message) + queryTimestamp := proto.Int64(time.Now().UnixNano()) + msgHash, err := node1.RelayPublishNoCTX(DefaultPubsubTopic, message) require.NoError(t, err, "Failed to publish message from Node1") Debug("Waiting for message delivery to Node2") @@ -74,7 +74,10 @@ func TestStoreQueryFromPeer(t *testing.T) { require.NoError(t, err, "Node2 should have received the message") Debug("Node3 querying stored messages from Node2") - res, err := node3.GetStoredMessages(node2, nil) + storeQueryRequest := &common.StoreQueryRequest{ + TimeStart: queryTimestamp, + } + res, err := node3.GetStoredMessages(node2, storeQueryRequest) var storedMessages = *res.Messages require.NoError(t, err, "Failed to retrieve stored messages from Node2") require.NotEmpty(t, storedMessages, "Expected at least one stored message") @@ -807,5 +810,109 @@ func TestCheckStoredMSGsEphemeralFalse(t *testing.T) { require.NoError(t, err, "Failed to query store messages from Node2") require.Equal(t, 1, len(*storedmsgs.Messages), "Expected exactly one stored message") - Debug("Test successfully verified that exactly one non-ephemeral message is stored") + Debug("Test finished successfully ") +} + +func TestCheckLegacyStore(t *testing.T) { + Debug("Starting test ") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1 ") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Store = true + node2Config.LegacyStore = true + + Debug("Creating Node2") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + defer func() { + Debug("Stopping and destroying both nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + }() + + Debug("Connecting Node2 to Node1") + err = node2.ConnectPeer(node1) + require.NoError(t, err, "Failed to connect Node2 to Node1") + queryTimestamp := proto.Int64(time.Now().UnixNano()) + + Debug("Sender Node1 is publishing a message") + message := node1.CreateMessage() + msgHash, err := node1.RelayPublishNoCTX(DefaultPubsubTopic, message) + require.NoError(t, err) + require.NotEmpty(t, msgHash) + + Debug("Querying stored messages from Node2 using Node1") + storeQueryRequest := &common.StoreQueryRequest{ + TimeStart: queryTimestamp, + } + + storedmsgs, err := node1.GetStoredMessages(node2, storeQueryRequest) + require.NoError(t, err, "Failed to query store messages from Node2") + require.Equal(t, 1, len(*storedmsgs.Messages), "Expected exactly one stored message") + + Debug("Test finished successfully ") + +} + +func TestStoredMessagesWithVDifferentPayloads(t *testing.T) { + Debug("Starting test ") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Store = true + + Debug("Creating Node2 ") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + defer func() { + Debug("Stopping and destroying both nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + }() + + Debug("Connecting Node2 to Node1") + err = node2.ConnectPeer(node1) + require.NoError(t, err, "Failed to connect Node2 to Node1") + + for _, pLoad := range SAMPLE_INPUTS { + queryTimestamp := proto.Int64(time.Now().UnixNano()) + + Debug("Sender Node1 is publishing message with payload: %s", pLoad.Value) + message := node1.CreateMessage() + message.Payload = []byte(pLoad.Value) + + msgHash, err := node1.RelayPublishNoCTX(DefaultPubsubTopic, message) + require.NoError(t, err, "Failed to publish message") + require.NotEmpty(t, msgHash, "Message hash is empty") + + Debug("Querying stored messages from Node2 using Node1") + storeQueryRequest := &common.StoreQueryRequest{ + TimeStart: queryTimestamp, + IncludeData: true, + } + + storedmsgs, err := node1.GetStoredMessages(node2, storeQueryRequest) + require.NoError(t, err, "Failed to query store messages from Node2") + retrievedMessage := (*storedmsgs.Messages)[0] + require.Equal(t, pLoad.Value, string(retrievedMessage.WakuMessage.Payload), "Expected WakuMessage but got nil") + Debug("Payload matches expected %s", string(retrievedMessage.WakuMessage.Payload)) + } + + Debug("Test finished successfully ") } diff --git a/waku/test_data.go b/waku/test_data.go index ff8859d..23d34c8 100644 --- a/waku/test_data.go +++ b/waku/test_data.go @@ -49,3 +49,49 @@ var ( MinPort = 1024 // Minimum allowable port (exported) MaxPort = 65535 // Maximum allowable port (exported) ) + +var SAMPLE_INPUTS = []struct { + Description string + Value string +}{ + {"A simple string", "Hello World!"}, + {"An integer", "1234567890"}, + {"A dictionary", `{"key": "value"}`}, + {"Chinese characters", "这是一些中文"}, + {"Emojis", "🚀🌟✨"}, + {"Lorem ipsum text", "Lorem ipsum dolor sit amet"}, + {"HTML content", "Hello"}, + {"Cyrillic characters", "\u041f\u0440\u0438\u0432\u0435\u0442"}, + {"Base64 encoded string", "Base64==dGVzdA=="}, + {"Binary data", "d29ya2luZyB3aXRoIGJpbmFyeSBkYXRh: \x50\x51"}, + {"Special characters with whitespace", "\t\nSpecial\tCharacters\n"}, + {"Boolean false as a string", "False"}, + {"A float number", "3.1415926535"}, + {"A list", "[1, 2, 3, 4, 5]"}, + {"Hexadecimal number as a string", "0xDEADBEEF"}, + {"Email format", "user@example.com"}, + {"URL format", "http://example.com"}, + {"Date and time in ISO format", "2023-11-01T12:00:00Z"}, + {"String with escaped quotes", `"Escaped" \"quotes\"`}, + {"A regular expression", "Regular expression: ^[a-z0-9_-]{3,16}$"}, + {"A very long string", "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"}, + {"A JSON string", `{"name": "John", "age": 30, "city": "New York"}`}, + {"A Unix path", "/usr/local/bin"}, + {"A Windows path", "C:\\Windows\\System32"}, + {"An SQL query", "SELECT * FROM users WHERE id = 1;"}, + {"JavaScript code snippet", "function test() { console.log('Hello World'); }"}, + {"A CSS snippet", "body { background-color: #fff; }"}, + {"A Python one-liner", "print('Hello World')"}, + {"An IP address", "192.168.1.1"}, + {"A domain name", "www.example.com"}, + {"A user agent string", "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"}, + {"A credit card number", "1234-5678-9012-3456"}, + {"A phone number", "+1234567890"}, + {"A UUID", "123e4567-e89b-12d3-a456-426614174000"}, + {"A hashtag", "#helloWorld"}, + {"A Twitter handle", "@username"}, + {"A password", "P@ssw0rd!"}, + {"A date in common format", "01/11/2023"}, + {"A time string", "12:00:00"}, + {"A mathematical equation", "E = mc^2"}, +} From 3d03ccb7c3bfa6965dfbdeb360f29008903be76a Mon Sep 17 00:00:00 2001 From: aya Date: Tue, 25 Feb 2025 07:45:16 +0200 Subject: [PATCH 6/7] new set of tests --- waku/store_test.go | 128 +++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 119 insertions(+), 9 deletions(-) diff --git a/waku/store_test.go b/waku/store_test.go index cd4771a..4e3ce68 100644 --- a/waku/store_test.go +++ b/waku/store_test.go @@ -533,7 +533,7 @@ func TestQueryFailWithIncorrectStaticNode(t *testing.T) { node2Config := DefaultWakuConfig node2Config.Relay = true node2Config.Store = true - node2Config.Staticnodes = []string{node1Address[0].String()} // Node2 connects to Node1 + node2Config.Staticnodes = []string{node1Address[0].String()} Debug("Creating Node2 with Store enabled") node2, err := StartWakuNode("Node2", &node2Config) @@ -562,21 +562,20 @@ func TestQueryFailWithIncorrectStaticNode(t *testing.T) { }() Debug("Sender Node1 is publishing a message") + queryTimestamp := proto.Int64(time.Now().UnixNano()) message := node1.CreateMessage() msgHash, err := node1.RelayPublishNoCTX(DefaultPubsubTopic, message) require.NoError(t, err) require.NotEmpty(t, msgHash) Debug("Verifying that Node3 fails to retrieve stored messages due to incorrect static node") - //storeQueryRequest := &common.StoreQueryRequest{ - //TimeStart: proto.Int64(time.Now().UnixNano()), // Query messages published after this timestamp - // } - storedmsgs, err := node3.GetStoredMessages(node2, nil) - require.NoError(t, err, "Expected Node3's store query to fail due to incorrect static node") - //require.NoEmpty(t, storedMessages, "Expected no messages in store for Node3") - if (storedmsgs.Messages) != nil { - Debug("First Stored Message: Payload: %s", string((*storedmsgs.Messages)[0].WakuMessage.Payload)) + storeQueryRequest := &common.StoreQueryRequest{ + TimeStart: queryTimestamp, } + storedmsgs, err := node3.GetStoredMessages(node2, storeQueryRequest) + require.NoError(t, err, "Expected Node3's store query to fail due to incorrect static node") + require.Nil(t, (*storedmsgs.Messages)[0].WakuMessage, "Expected no messages in store for Node3") + Debug("Test successfully verified store query failure due to incorrect static node configuration") } @@ -916,3 +915,114 @@ func TestStoredMessagesWithVDifferentPayloads(t *testing.T) { Debug("Test finished successfully ") } + +func TestStoredMessagesWithDifferentContentTopics(t *testing.T) { + Debug("Starting test for different content topics") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Store = true + + Debug("Creating Node2") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + defer func() { + Debug("Stopping and destroying both nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + }() + + Debug("Connecting Node2 to Node1") + err = node2.ConnectPeer(node1) + require.NoError(t, err, "Failed to connect Node2 to Node1") + + for _, contentTopic := range CONTENT_TOPICS_DIFFERENT_SHARDS { + + Debug("Node1 is publishing message with content topic: %s", contentTopic) + queryTimestamp := proto.Int64(time.Now().UnixNano()) + message := node1.CreateMessage() + message.ContentTopic = contentTopic + + msgHash, err := node1.RelayPublishNoCTX(DefaultPubsubTopic, message) + require.NoError(t, err, "Failed to publish message") + require.NotEmpty(t, msgHash, "Message hash is empty") + + Debug("Querying stored messages from Node2 using Node1") + storeQueryRequest := &common.StoreQueryRequest{ + TimeStart: queryTimestamp, + IncludeData: true, + ContentTopics: &[]string{contentTopic}, + } + + storedmsgs, err := node1.GetStoredMessages(node2, storeQueryRequest) + require.NoError(t, err, "Failed to query store messages from Node2") + require.Greater(t, len(*storedmsgs.Messages), 0, "Expected at least one stored message") + require.Equal(t, contentTopic, (*storedmsgs.Messages)[0].WakuMessage.ContentTopic, "Stored message content topic does not match expected") + Debug("Veified content topic %s ", (*storedmsgs.Messages)[0].WakuMessage.ContentTopic) + } + + Debug("Test finished successfully") +} + +func TestStoredMessagesWithDifferentPubsubTopics(t *testing.T) { + Debug("Starting test for different pubsub topics") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Store = true + + Debug("Creating Node2") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + defer func() { + Debug("Stopping and destroying both nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + }() + + Debug("Connecting Node2 to Node1") + err = node2.ConnectPeer(node1) + require.NoError(t, err, "Failed to connect Node2 to Node1") + + for _, pubsubTopic := range PUBSUB_TOPICS_STORE { + + Debug("Node1 is publishing message on pubsub topic: %s", pubsubTopic) + node1.RelaySubscribe(pubsubTopic) + node2.RelaySubscribe(pubsubTopic) + queryTimestamp := proto.Int64(time.Now().UnixNano()) + var msg = node1.CreateMessage() + msgHash, err := node1.RelayPublishNoCTX(pubsubTopic, msg) + require.NoError(t, err, "Failed to publish message") + require.NotEmpty(t, msgHash, "Message hash is empty") + + Debug("Querying stored messages from Node2 using Node1") + storeQueryRequest := &common.StoreQueryRequest{ + TimeStart: queryTimestamp, + IncludeData: true, + PubsubTopic: pubsubTopic, + } + + storedmsgs, err := node1.GetStoredMessages(node2, storeQueryRequest) + require.NoError(t, err, "Failed to query store messages from Node2") + require.Greater(t, len(*storedmsgs.Messages), 0, "Expected at least one stored message") + require.Equal(t, pubsubTopic, (*storedmsgs.Messages)[0].PubsubTopic, "Stored message pubsub topic does not match expected") + } + + Debug("Test finished successfully") +} From 19c5fe02e9de1e8aa654679f585183956786800b Mon Sep 17 00:00:00 2001 From: aya Date: Tue, 25 Feb 2025 12:23:07 +0200 Subject: [PATCH 7/7] Add temp changes --- waku/nwaku.go | 20 ++++++++++---------- waku/nwaku_test.go | 38 +++++++++++++++++++------------------- waku/test_data.go | 40 +++++++++++++++++++++++++++++++++------- 3 files changed, 62 insertions(+), 36 deletions(-) diff --git a/waku/nwaku.go b/waku/nwaku.go index b77f990..a150ab3 100644 --- a/waku/nwaku.go +++ b/waku/nwaku.go @@ -325,6 +325,7 @@ import ( "unsafe" "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/libp2p/go-libp2p/core/peer" libp2pproto "github.com/libp2p/go-libp2p/core/protocol" @@ -512,12 +513,11 @@ func globalEventCallback(callerRet C.int, msg *C.char, len C.size_t, userData un node.OnEvent(eventStr) } } else { + errMsgField := zap.Skip() if len != 0 { - errMsg := C.GoStringN(msg, C.int(len)) - Error("globalEventCallback retCode not ok, retCode: %v: %v", callerRet, errMsg) - } else { - Error("globalEventCallback retCode not ok, retCode: %v", callerRet) + errMsgField = zap.String("error", C.GoStringN(msg, C.int(len))) } + log.Error("globalEventCallback retCode not ok", zap.Int("retCode", int(callerRet)), errMsgField) } } @@ -666,18 +666,18 @@ func (n *WakuNode) GetConnectedPeers() (peer.IDSlice, error) { for _, peerID := range peerIDs { id, err := peer.Decode(peerID) if err != nil { - Error("Failed to decode peer ID for %v: %v", n.nodeName, err) + Error("Failed to decode peer ID for "+n.nodeName, zap.Error(err)) return nil, err } peers = append(peers, id) } - Debug("Successfully fetched connected peers for %v, count: %v", n.nodeName, len(peers)) + Debug("Successfully fetched connected peers for "+n.nodeName, zap.Int("count", len(peers))) return peers, nil } errMsg := "error GetConnectedPeers: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - Error("Failed to get connected peers for %v: %v", n.nodeName, errMsg) + Error("Failed to get connected peers for "+n.nodeName, zap.String("error", errMsg)) return nil, errors.New(errMsg) } @@ -1097,7 +1097,7 @@ func (n *WakuNode) Destroy() error { } errMsg := "error WakuDestroy: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - Error("Failed to destroy %v: %v", n.nodeName, errMsg) + Error("Failed to destroy "+n.nodeName, zap.String("error", errMsg)) return errors.New(errMsg) } @@ -1341,7 +1341,7 @@ func (n *WakuNode) GetNumConnectedPeers() (int, error) { } numPeers := len(peers) - Debug("Successfully fetched number of connected peers for %v, count: %v", n.nodeName, numPeers) + Debug("Successfully fetched number of connected peers for "+n.nodeName, zap.Int("count", numPeers)) return numPeers, nil } @@ -1363,7 +1363,7 @@ func GetFreePortIfNeeded(tcpPort int, discV5UDPPort int) (int, int, error) { for i := 0; i < 10; i++ { tcpAddr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort("localhost", "0")) if err != nil { - Warn("unable to resolve tcp addr: %v", err) + Warn("unable to resolve tcp addr: %v", zap.Error(err)) continue } tcpListener, err := net.ListenTCP("tcp", tcpAddr) diff --git a/waku/nwaku_test.go b/waku/nwaku_test.go index 4c058d7..96450a7 100644 --- a/waku/nwaku_test.go +++ b/waku/nwaku_test.go @@ -37,7 +37,7 @@ func TestBasicWaku(t *testing.T) { // ctx := context.Background() - nwakuConfig := common.WakuConfig{ + nwakuConfig := WakuConfig{ Nodekey: "11d0dcea28e86f81937a3bd1163473c7fbc0a0db54fd72914849bc47bdf78710", Relay: true, LogLevel: "DEBUG", @@ -190,7 +190,7 @@ func TestPeerExchange(t *testing.T) { tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0) require.NoError(t, err) // start node that will be discovered by PeerExchange - discV5NodeWakuConfig := common.WakuConfig{ + discV5NodeWakuConfig := WakuConfig{ Relay: true, LogLevel: "DEBUG", Discv5Discovery: true, @@ -215,7 +215,7 @@ func TestPeerExchange(t *testing.T) { require.NoError(t, err) // start node which serves as PeerExchange server - pxServerWakuConfig := common.WakuConfig{ + pxServerWakuConfig := WakuConfig{ Relay: true, LogLevel: "DEBUG", Discv5Discovery: true, @@ -264,7 +264,7 @@ func TestPeerExchange(t *testing.T) { require.NoError(t, err) // start light node which uses PeerExchange to discover peers - pxClientWakuConfig := common.WakuConfig{ + pxClientWakuConfig := WakuConfig{ Relay: false, LogLevel: "DEBUG", Discv5Discovery: false, @@ -324,7 +324,7 @@ func TestDnsDiscover(t *testing.T) { require.NoError(t, err) nameserver := "8.8.8.8" - nodeWakuConfig := common.WakuConfig{ + nodeWakuConfig := WakuConfig{ Relay: true, LogLevel: "DEBUG", ClusterID: 16, @@ -353,7 +353,7 @@ func TestDial(t *testing.T) { require.NoError(t, err) // start node that will initiate the dial - dialerNodeWakuConfig := common.WakuConfig{ + dialerNodeWakuConfig := WakuConfig{ Relay: true, LogLevel: "DEBUG", Discv5Discovery: false, @@ -371,7 +371,7 @@ func TestDial(t *testing.T) { require.NoError(t, err) // start node that will receive the dial - receiverNodeWakuConfig := common.WakuConfig{ + receiverNodeWakuConfig := WakuConfig{ Relay: true, LogLevel: "DEBUG", Discv5Discovery: false, @@ -418,7 +418,7 @@ func TestRelay(t *testing.T) { require.NoError(t, err) // start node that will send the message - senderNodeWakuConfig := common.WakuConfig{ + senderNodeWakuConfig := WakuConfig{ Relay: true, LogLevel: "DEBUG", Discv5Discovery: false, @@ -436,7 +436,7 @@ func TestRelay(t *testing.T) { require.NoError(t, err) // start node that will receive the message - receiverNodeWakuConfig := common.WakuConfig{ + receiverNodeWakuConfig := WakuConfig{ Relay: true, LogLevel: "DEBUG", Discv5Discovery: false, @@ -502,7 +502,7 @@ func TestTopicHealth(t *testing.T) { require.NoError(t, err) // start node1 - wakuConfig1 := common.WakuConfig{ + wakuConfig1 := WakuConfig{ Relay: true, LogLevel: "DEBUG", Discv5Discovery: false, @@ -520,7 +520,7 @@ func TestTopicHealth(t *testing.T) { require.NoError(t, err) // start node2 - wakuConfig2 := common.WakuConfig{ + wakuConfig2 := WakuConfig{ Relay: true, LogLevel: "DEBUG", Discv5Discovery: false, @@ -575,7 +575,7 @@ func TestConnectionChange(t *testing.T) { require.NoError(t, err) // start node1 - wakuConfig1 := common.WakuConfig{ + wakuConfig1 := WakuConfig{ Relay: true, LogLevel: "DEBUG", Discv5Discovery: false, @@ -593,7 +593,7 @@ func TestConnectionChange(t *testing.T) { require.NoError(t, err) // start node2 - wakuConfig2 := common.WakuConfig{ + wakuConfig2 := WakuConfig{ Relay: true, LogLevel: "DEBUG", Discv5Discovery: false, @@ -662,7 +662,7 @@ func TestStore(t *testing.T) { require.NoError(t, err) // start node that will send the message - senderNodeWakuConfig := common.WakuConfig{ + senderNodeWakuConfig := WakuConfig{ Relay: true, Store: true, LogLevel: "DEBUG", @@ -682,7 +682,7 @@ func TestStore(t *testing.T) { require.NoError(t, err) // start node that will receive the message - receiverNodeWakuConfig := common.WakuConfig{ + receiverNodeWakuConfig := WakuConfig{ Relay: true, Store: true, LogLevel: "DEBUG", @@ -845,7 +845,7 @@ func TestParallelPings(t *testing.T) { require.NoError(t, err) // start node that will initiate the dial - dialerNodeWakuConfig := common.WakuConfig{ + dialerNodeWakuConfig := WakuConfig{ Relay: true, LogLevel: "DEBUG", Discv5Discovery: false, @@ -862,7 +862,7 @@ func TestParallelPings(t *testing.T) { tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0) require.NoError(t, err) - receiverNodeWakuConfig1 := common.WakuConfig{ + receiverNodeWakuConfig1 := WakuConfig{ Relay: true, LogLevel: "DEBUG", Discv5Discovery: false, @@ -883,7 +883,7 @@ func TestParallelPings(t *testing.T) { tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0) require.NoError(t, err) - receiverNodeWakuConfig2 := common.WakuConfig{ + receiverNodeWakuConfig2 := WakuConfig{ Relay: true, LogLevel: "DEBUG", Discv5Discovery: false, @@ -904,7 +904,7 @@ func TestParallelPings(t *testing.T) { tcpPort, udpPort, err = GetFreePortIfNeeded(0, 0) require.NoError(t, err) - receiverNodeWakuConfig3 := common.WakuConfig{ + receiverNodeWakuConfig3 := WakuConfig{ Relay: true, LogLevel: "DEBUG", Discv5Discovery: false, diff --git a/waku/test_data.go b/waku/test_data.go index 23d34c8..ae2f4a6 100644 --- a/waku/test_data.go +++ b/waku/test_data.go @@ -1,6 +1,7 @@ package waku import ( + "fmt" "time" "github.com/waku-org/waku-go-bindings/waku/common" @@ -9,6 +10,7 @@ import ( var DefaultWakuConfig WakuConfig var DefaultStoreQueryRequest common.StoreQueryRequest +var DEFAULT_CLUSTER_ID = 16 func init() { @@ -24,13 +26,13 @@ func init() { LogLevel: "DEBUG", Discv5Discovery: true, ClusterID: 16, - Shards: []uint16{64}, - PeerExchange: false, - Store: false, - Filter: false, - Lightpush: false, - Discv5UdpPort: udpPort, - TcpPort: tcpPort, + //Shards: []uint16{64}, + PeerExchange: false, + Store: false, + Filter: false, + Lightpush: false, + Discv5UdpPort: udpPort, + TcpPort: tcpPort, } DefaultStoreQueryRequest = common.StoreQueryRequest{ @@ -95,3 +97,27 @@ var SAMPLE_INPUTS = []struct { {"A time string", "12:00:00"}, {"A mathematical equation", "E = mc^2"}, } + +var PUBSUB_TOPICS_STORE = []string{ + + fmt.Sprintf("/waku/2/rs/%d/0", DEFAULT_CLUSTER_ID), + fmt.Sprintf("/waku/2/rs/%d/1", DEFAULT_CLUSTER_ID), + fmt.Sprintf("/waku/2/rs/%d/2", DEFAULT_CLUSTER_ID), + fmt.Sprintf("/waku/2/rs/%d/3", DEFAULT_CLUSTER_ID), + fmt.Sprintf("/waku/2/rs/%d/4", DEFAULT_CLUSTER_ID), + fmt.Sprintf("/waku/2/rs/%d/5", DEFAULT_CLUSTER_ID), + fmt.Sprintf("/waku/2/rs/%d/6", DEFAULT_CLUSTER_ID), + fmt.Sprintf("/waku/2/rs/%d/7", DEFAULT_CLUSTER_ID), + fmt.Sprintf("/waku/2/rs/%d/8", DEFAULT_CLUSTER_ID), +} + +var CONTENT_TOPICS_DIFFERENT_SHARDS = []string{ + "/myapp/1/latest/proto", + "/waku/2/content/test.js", + "/app/22/sometopic/someencoding", + "/toychat/2/huilong/proto", + "/statusim/1/community/cbor", + "/app/27/sometopic/someencoding", + "/app/29/sometopic/someencoding", + "/app/20/sometopic/someencoding", +}