Skip to content
Open

RLN #166

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ FROM python:3.10.16-alpine3.21

ADD ./traffic.py /app/traffic.py

RUN pip install requests argparse aiohttp
RUN pip install aiohttp dnspython
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ containers:
--msg-size-kbytes=1 \
--delay-seconds=1 \
--pubsub-topic="/waku/2/rs/2/" \
--protocols relay
--protocols relay \
--service-names zerotesting-service \
--log-level=info
```

### How It Works
Expand All @@ -60,7 +62,8 @@ Logs the response status, elapsed time, and success/failure rates.
Uses `asyncio.create_task` to make sure messages are sent at a constant rate.

### Changelog

- `v1.0.4`:
- Added service selection to arguments
- `v1.0.3`:
- Added extra debugging to `check_dns_time`
- Changed `send_to_lightpush` to use v3 api endpoint.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,2 +1,6 @@
docker build -t <your-registry>/publisher:v1.0.3 .
docker push <your-registry>/publisher:v1.0.3
docker buildx build \
--platform linux/amd64,linux/arm64 \
-t docker.io/<your_registry>/publisher:v1.0.4 \
--push \
--no-cache \
.
Original file line number Diff line number Diff line change
Expand Up @@ -8,53 +8,67 @@
import socket
import time
import urllib.parse
from typing import Tuple, Dict
import dns.resolver
from typing import Tuple, List

logging.basicConfig(level=logging.INFO)

async def load_service_endpoints(service: str, namespace: str = "default") -> List[Tuple[str, str, str]]:
    """Discover all pod endpoints behind a k8s headless service via a DNS SRV query.

    Args:
        service: Kubernetes service name (e.g. ``rln-nodes``).
        namespace: Kubernetes namespace the service lives in. Defaults to
            ``default`` so callers that omit it still work — TODO confirm the
            deployment namespace (the example hostname below uses ``rln-test``).

    Returns:
        A non-empty list of ``(ip, short_hostname, shard)`` tuples, one per pod.

    Raises:
        RuntimeError: if the SRV query fails, a pod hostname cannot be
            resolved, a hostname has an unexpected format, or no endpoints
            are discovered.
    """
    fqdn = f"{service}.{namespace}.svc.cluster.local"
    srv_query = f"_rest._tcp.{fqdn}"

    try:
        # NOTE(review): dns.resolver.resolve is a blocking call inside an
        # async function; acceptable at startup, but confirm it is only
        # called once (as main() does).
        answers = dns.resolver.resolve(srv_query, "SRV")
    except Exception as e:
        raise RuntimeError(f"Failed SRV query for service {service}: {e}") from e

    endpoints = []

    for ans in answers:
        hostname = str(ans.target).rstrip(".")
        try:
            ip = socket.gethostbyname(hostname)
        except Exception as e:
            raise RuntimeError(f"Failed resolving pod hostname {hostname}: {e}") from e

        # hostname = rln-0-1.rln-nodes.rln-test.svc.cluster.local
        short = hostname.split(".")[0]  # rln-0-1
        try:
            shard = short.split("-")[1]  # "0"
        except IndexError as e:
            # Guard against pods whose names don't follow the <name>-<shard>-<idx> scheme.
            raise RuntimeError(f"Unexpected pod hostname format: {short}") from e

        endpoints.append((ip, short, shard))

    if not endpoints:
        raise RuntimeError(f"No endpoints discovered for service {service}")

    logging.info(f"Discovered {len(endpoints)} endpoints for service {service}: {endpoints}")
    return endpoints


async def send_to_relay(args: argparse.Namespace) -> Tuple[str, Dict[str, str], Dict[str, str | int], str]:
node_address, node_hostname, node_shard = await check_dns_time('zerotesting-service')
topic = urllib.parse.quote(args.pubsub_topic + node_shard, safe='')
url = f'http://{node_address}:{args.port}/relay/v1/messages/{topic}'
async def send_to_relay(args: argparse.Namespace, endpoints):
node_ip, node_host, node_shard = random.choice(endpoints)

payload = base64.b64encode(os.urandom(args.msg_size_kbytes * 1000)).decode('ascii').rstrip("=")
headers = {'Content-Type': 'application/json'}
body = {'payload': payload, 'contentTopic': args.content_topic, 'version': 1}
topic = urllib.parse.quote(args.pubsub_topic + node_shard, safe="")
url = f"http://{node_ip}:{args.port}/relay/v1/messages/{topic}"

return url, headers, body, node_hostname
payload = base64.b64encode(os.urandom(args.msg_size_kbytes * 1000)).decode("ascii").rstrip("=")
headers = {"Content-Type": "application/json"}
body = {"payload": payload, "contentTopic": args.content_topic, "version": 1}

return url, headers, body, node_host

async def send_to_lightpush(args: argparse.Namespace) -> Tuple[str, Dict[str, str], Dict[str, dict[str, str | int]], str]:
node_address, node_hostname, shard = await check_dns_time('zerotesting-lightpush-client')
url = f'http://{node_address}:{args.port}/lightpush/v3/message'

payload = base64.b64encode(os.urandom(args.msg_size_kbytes * 1000)).decode('ascii').rstrip("=")
headers = {'Content-Type': 'application/json', 'Accept': 'application/json'}
body = {'pubsubTopic': args.pubsub_topic + shard,
'message': {'payload': payload, 'contentTopic': args.content_topic,
'version': 1}}
async def send_to_lightpush(args: argparse.Namespace, endpoints):
node_ip, node_host, shard = random.choice(endpoints)

return url, headers, body, node_hostname
url = f"http://{node_ip}:{args.port}/lightpush/v3/message"

payload = base64.b64encode(os.urandom(args.msg_size_kbytes * 1000)).decode("ascii").rstrip("=")
headers = {"Content-Type": "application/json", "Accept": "application/json"}
body = {
"pubsubTopic": args.pubsub_topic + shard,
"message": {"payload": payload, "contentTopic": args.content_topic, "version": 1},
}

return url, headers, body, node_host


service_dispatcher = {
Expand All @@ -63,11 +77,13 @@ async def send_to_lightpush(args: argparse.Namespace) -> Tuple[str, Dict[str, st
}


async def send_waku_msg(args: argparse.Namespace, stats: Dict[str, int], i: int):
protocol = random.choice(args.protocols)

async def send_waku_msg(args, stats, i, endpoints):
index = random.choice(range(len(args.protocols)))
protocol = args.protocols[index]
protocol_function = service_dispatcher[protocol]

url, headers, body, node_hostname = await protocol_function(args)
url, headers, body, node_hostname = await protocol_function(args, endpoints)

logging.info(f"Message {i + 1} sent at {time.strftime('%H:%M:%S')}")
start_time = time.time()
Expand All @@ -85,7 +101,7 @@ async def send_waku_msg(args: argparse.Namespace, stats: Dict[str, int], i: int)
log_line += f"Url: {url}, headers: {headers}, body: {body},\n"
stats['total'] += 1

success_rate = (stats['success'] / stats['total']) * 100 if stats['total'] > 0 else 0
success_rate = stats['success'] / stats['total'] * 100
logging.info(f"{log_line}"
f"Time: [{elapsed_time:.4f} ms], "
f"Success: {stats['success']}, Failure: {stats['failure']}, "
Expand All @@ -94,26 +110,30 @@ async def send_waku_msg(args: argparse.Namespace, stats: Dict[str, int], i: int)
elapsed_time = (time.time() - start_time) * 1000
stats['failure'] += 1
stats['total'] += 1
success_rate = (stats['success'] / stats['total']) * 100 if stats['total'] > 0 else 0
success_rate = stats['success'] / stats['total'] * 100
logging.info(
f"Exception during message {i} sent to {node_hostname} : {str(e)}, Time: [{elapsed_time:.4f} ms], "
f"Url: {url}, headers: {headers}, body: {body}"
f"Success: {stats['success']}, Failure: {stats['failure']}, "
f"Success Rate: {success_rate:.2f}%")
f"Exception during message {i} sent to {node_hostname}: {str(e)}, "
f"Time: [{elapsed_time:.4f} ms], Url: {url}, headers: {headers}, body: {body}, "
f"Success Rate: {success_rate:.2f}%"
)


async def inject_message(background_tasks: set, args: argparse.Namespace,
                         stats: dict, i: int, endpoints: list) -> None:
    """Schedule one send_waku_msg as a background task without awaiting it.

    Using create_task (rather than awaiting the send) keeps the injection
    cadence constant regardless of how long each HTTP request takes.

    Args:
        background_tasks: shared set holding strong references to in-flight tasks.
        args: parsed CLI arguments, forwarded to send_waku_msg.
        stats: shared success/failure/total counters, mutated by send_waku_msg.
        i: zero-based message index (used for logging).
        endpoints: discovered ``(ip, host, shard)`` tuples to send through.
    """
    task = asyncio.create_task(send_waku_msg(args, stats, i, endpoints))
    # Hold a strong reference until completion (the event loop only keeps weak
    # refs to tasks), then drop it so the set doesn't grow with message count.
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)


async def main(args: argparse.Namespace) -> None:
    """Discover service endpoints once, then inject messages at a fixed rate.

    Sends ``args.messages`` messages, one every ``args.delay_seconds`` seconds,
    and waits for all in-flight sends before returning.
    """
    logging.info("Loading service endpoints...")
    # NOTE(review): only the first --service-names entry is used, and the
    # namespace argument of load_service_endpoints is omitted (falls back to
    # its default) — confirm both against the deployment.
    endpoints = await load_service_endpoints(args.service_names[0])
    logging.info(f"Using endpoints: {endpoints}")

    background_tasks = set()
    stats = {'success': 0, 'failure': 0, 'total': 0}

    for i in range(args.messages):
        await inject_message(background_tasks, args, stats, i, endpoints)
        await asyncio.sleep(args.delay_seconds)

    # Drain any sends still in flight before the process exits.
    await asyncio.gather(*background_tasks)
Expand All @@ -122,7 +142,7 @@ async def main(args: argparse.Namespace):
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Waku message injector")
parser.add_argument('-pt', '--pubsub-topic', type=str, help='Pubsub topic',
default="/waku/2/rs/2/")
default='/waku/2/rs/2/')
parser.add_argument('-ct', '--content-topic', type=str, help='Content topic',
default="/my-app/1/dst/proto")
parser.add_argument('-s', '--msg-size-kbytes', type=int, help='Message size in kBytes',
Expand All @@ -133,12 +153,21 @@ def parse_args() -> argparse.Namespace:
default=10)
parser.add_argument('-ps', '--protocols', nargs='+', default=['relay'],
help='Protocols used inject messages')
parser.add_argument('-p', '--port', type=int, default=8645, help='Waku REST port')

parser.add_argument('-sn', '--service-names', help='K8s services used inject messages', nargs="+", default=['rln-nodes'], )
parser.add_argument('-p', '--port', help='Waku REST port', type=int, default=8645)
parser.add_argument('--log-level', default='info')
return parser.parse_args()


def configure_logging(level: str) -> None:
    """Configure root logging from a level name such as 'info' or 'DEBUG'.

    Unknown level names fall back to INFO rather than raising.
    """
    numeric = getattr(logging, level.upper(), logging.INFO)
    # force=True: basicConfig is otherwise a no-op when logging was already
    # configured at import time (module-level basicConfig above), which would
    # silently ignore --log-level.
    logging.basicConfig(level=numeric, format="%(asctime)s [%(levelname)s] %(message)s", force=True)


if __name__ == "__main__":
    args = parse_args()
    # Apply --log-level before emitting any further log output.
    configure_logging(args.log_level)
    # Log the full argument namespace once for reproducibility of a run.
    logging.info(f'{args}')
    asyncio.run(main(args))

# TODO: extract the IPs of all the services, because several may be used.
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Build image for deploying the RLN contracts: Foundry base plus Node/pnpm,
# with the contract repo cloned and pre-built so the runtime job is offline-capable.
FROM ghcr.io/foundry-rs/foundry:latest

# We need root to install packages
# NOTE(review): USER is never switched back, so the container also RUNS as
# root — confirm this is intended for the runtime job.
USER 0

# Install deps:
# - curl/jq/git/gnupg/ca-certificates so we can talk to Anvil and parse JSON
# - Node 18 (via NodeSource) so pnpm works
# - pnpm globally
RUN apt-get update -y \
 && apt-get install -y --no-install-recommends curl jq ca-certificates git gnupg bash \
 && curl -fsSL https://deb.nodesource.com/setup_18.x | bash - \
 && apt-get install -y --no-install-recommends nodejs \
 && npm install -g pnpm \
 && rm -rf /var/lib/apt/lists/*

# Prepare working dir for the RLN contracts
WORKDIR /rln

# Clone the repo WITH submodules so forge-std and other deps are present
# NOTE(review): cloning at build time pins nothing — the image content depends
# on the branch head at build; consider pinning a commit for reproducibility.
RUN git clone --recursive \
    --branch temp-with-imt-dep \
    https://github.com/logos-messaging/waku-rlnv2-contract.git /rln


# Install JS deps once at build-time, so the runtime job doesn't need network/npm
RUN pnpm install

# Pre-build with forge so solc/artifacts are already warmed
RUN forge build

# Copy runtime bootstrap script (this script does the actual deploy to Anvil)
COPY entrypoint.sh /entrypoint.sh
RUN chmod +x /entrypoint.sh

# Run that script by default
ENTRYPOINT ["/entrypoint.sh"]
Loading