11import requests
22import time
3- import datetime
43import os
54import base64
65import urllib .parse
7- import requests
86import argparse
97import re
8+ import logging
9+
10+ logging .basicConfig (level = logging .INFO , format = '[%(asctime)s.%(msecs)03d] %(message)s' , datefmt = '%Y-%m-%d %H:%M:%S' )
1011
1112def send_waku_msg (node_address , kbytes , pubsub_topic , content_topic ):
1213 # TODO dirty trick .replace("=", "")
1314 base64_payload = (base64 .b64encode (os .urandom (kbytes * 1000 )).decode ('ascii' )).replace ("=" , "" )
14- print ("size message kBytes" , len (base64_payload ) * (3 / 4 )/ 1000 , "KBytes" )
15+ logging . info ("size message kBytes %.3f KBytes " , len (base64_payload ) * (3 / 4 )/ 1000 )
1516 body = {
1617 "payload" : base64_payload ,
1718 "contentTopic" : content_topic ,
1819 "version" : 1 , # You can adjust the version as needed
19- "timestamp" : int (time .time ())
20+ "timestamp" : int (time .time () * 1_000_000_000 ) # use nanoseconds to match nwaku node time unit
2021 }
2122
2223 encoded_pubsub_topic = urllib .parse .quote (pubsub_topic , safe = '' )
2324
2425 url = f"{ node_address } /relay/v1/messages/{ encoded_pubsub_topic } "
2526 headers = {'content-type' : 'application/json' }
2627
27- readable_time = datetime .datetime .utcnow ().strftime ('%Y-%m-%d %H:%M:%S.%f' )[:- 3 ]
28- print ('[%s] Waku REST API: %s PubSubTopic: %s, ContentTopic: %s' % (readable_time , url , pubsub_topic , content_topic ))
28+ logging .info ('Waku REST API: %s PubSubTopic: %s, ContentTopic: %s' , url , pubsub_topic , content_topic )
2929 s_time = time .time ()
3030
3131 response = None
32- readable_time = datetime .datetime .utcnow ().strftime ('%Y-%m-%d %H:%M:%S.%f' )[:- 3 ]
3332 try :
34- print ( '[%s] Sending request' % readable_time )
33+ logging . info ( ' Sending request' )
3534 response = requests .post (url , json = body , headers = headers )
3635 except Exception as e :
37- print ( f "Error sending request: { e } " )
36+ logging . error ( "Error sending request: %s" , e )
3837
3938 if (response != None ):
4039 elapsed_ms = (time .time () - s_time ) * 1000
41- readable_time = datetime .datetime .utcnow ().strftime ('%Y-%m-%d %H:%M:%S.%f' )[:- 3 ]
42- print ('[%s] Response from %s: status:%s content:%s [%.4f ms.]' % (readable_time , node_address , \
43- response .status_code , response .text , elapsed_ms ))
40+ logging .info ('Response from %s: status:%s content:%s [%.4f ms.]' , node_address ,
41+ response .status_code , response .text , elapsed_ms )
4442
4543parser = argparse .ArgumentParser (description = '' )
4644
@@ -53,13 +51,13 @@ def send_waku_msg(node_address, kbytes, pubsub_topic, content_topic):
5351parser .add_argument ('-c' , '--content-topic' , type = str , help = 'content topic' , default = "my-ctopic" )
5452parser .add_argument ('-p' , '--pubsub-topic' , type = str , help = 'pubsub topic' , default = "/waku/2/rs/66/0" )
5553parser .add_argument ('-s' , '--msg-size-kbytes' , type = int , help = 'message size in kBytes' , default = 10 )
56- parser .add_argument ('-d' , '--delay-seconds' , type = int , help = 'delay in second between messages' , required = 15 )
54+ parser .add_argument ('-d' , '--delay-seconds' , type = int , help = 'delay in seconds between messages' , default = 15 )
5755args = parser .parse_args ()
5856
59- print ( args )
57+ logging . info ( "Arguments: %s" , args )
6058
6159if args .single_node != None :
62- print ("Injecting traffic to single node REST API:" , args .single_node )
60+ logging . info ("Injecting traffic to single node REST API: %s " , args .single_node )
6361
6462# this simply converts from http://url_[1..5]:port to
6563# [http://url_1:port or from http://url-[1..5]:port to
@@ -69,27 +67,38 @@ def send_waku_msg(node_address, kbytes, pubsub_topic, content_topic):
6967 start , end = (int (x ) for x in re .search (r"\[(\d+)\.\.(\d+)\]" , args .multiple_nodes ).groups ())
7068
7169 if start is None or end is None :
72- print ("Could not parse range of multiple_nodes argument" )
70+ logging . error ("Could not parse range of multiple_nodes argument" )
7371 exit
7472
75- print ("Injecting traffic to multiple nodes REST APIs" )
73+ logging . info ("Injecting traffic to multiple nodes REST APIs" )
7674 for i in range (end , start - 1 , - 1 ):
7775 nodes .append (re .sub (r"\[\d+\.\.\d+\]" , str (i ), args .multiple_nodes ))
7876
7977for node in nodes :
80- print (node )
78+ logging .info ("Node: %s" , node )
79+
8180
8281while True :
8382 # calls are blocking
8483 # limited by the time it takes the REST API to reply
85-
84+ send_start_time = time . time ()
8685 if args .single_node != None :
8786 send_waku_msg (args .single_node , args .msg_size_kbytes , args .pubsub_topic , args .content_topic )
8887
8988 if args .multiple_nodes != None :
9089 for node in nodes :
9190 send_waku_msg (node , args .msg_size_kbytes , args .pubsub_topic , args .content_topic )
9291
93- readable_time = datetime .datetime .utcnow ().strftime ('%Y-%m-%d %H:%M:%S.%f' )[:- 3 ]
94- print ('[%s] sleeping: %s seconds' % (readable_time , args .delay_seconds ))
95- time .sleep (args .delay_seconds )
92+ send_elapsed = time .time () - send_start_time
93+ if args .multiple_nodes != None :
94+ logging .info ("Time taken to send to all nodes: %.4f seconds" , send_elapsed )
95+ else :
96+ logging .info ("Time taken to send to single node: %.4f seconds" , send_elapsed )
97+
98+ time_to_sleep = args .delay_seconds - send_elapsed
99+ if time_to_sleep > 0 :
100+ logging .info ("Sleeping %.4f seconds to maintain delay of %d seconds between rounds" , time_to_sleep , args .delay_seconds )
101+ else :
102+ logging .info ("No sleep needed to maintain delay of %d seconds between rounds" , args .delay_seconds )
103+
104+ time .sleep (max (0 , time_to_sleep ))
0 commit comments