Skip to content

Commit 7b27d9c

Browse files
committed
Add file ops e2e test
1 parent 2ad6d74 commit 7b27d9c

File tree

2 files changed

+159
-1
lines changed

2 files changed

+159
-1
lines changed

mqterm/terminal.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ async def stream_job_output(self, job):
104104
self.logger.debug(f"Streaming {bytes_read} bytes")
105105
await self.mqtt_client.publish(
106106
self.out_topic,
107-
self.out_view[:bytes_read].tobytes(),
107+
self.out_view[:bytes_read],
108108
qos=1,
109109
properties={
110110
self.PROP_CORR: job.client_id.encode("utf-8"),

tests/e2e/e2e_file_ops.py

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
#!/usr/bin/env micropython
2+
3+
"""End-to-end tests for copying files to and from the device."""
4+
5+
import asyncio
6+
import logging
7+
import os
8+
import sys
9+
from io import BytesIO
10+
11+
from amqc.client import MQTTClient, config
12+
13+
from mqterm.terminal import MqttTerminal
14+
15+
# Set up logging; pass LOG_LEVEL=DEBUG if needed for local testing
16+
logger = logging.getLogger()
17+
logger.setLevel(getattr(logging, os.getenv("LOG_LEVEL", "WARNING").upper()))
18+
formatter = logging.Formatter(
19+
"%(asctime)s.%(msecs)d - %(levelname)s - %(name)s - %(message)s"
20+
)
21+
handler = logging.StreamHandler(sys.stdout)
22+
handler.setFormatter(formatter)
23+
logger.handlers = []
24+
logger.addHandler(handler)
25+
device_logger = logging.getLogger("device")
26+
control_logger = logging.getLogger("control")
27+
28+
29+
# MQTT Client config
30+
config["server"] = "localhost"
31+
config["queue_len"] = 1 # use event queue
32+
device_config = config.copy()
33+
control_config = config.copy()
34+
device_config["client_id"] = "device"
35+
control_config["client_id"] = "server"
36+
37+
# Set up MQTT clients
38+
device_client = MQTTClient(device_config, logger=device_logger)
39+
control_client = MQTTClient(control_config, logger=control_logger)
40+
41+
# Set up the terminal
42+
term = MqttTerminal(device_client, topic_prefix="/test")
43+
44+
45+
def create_props(seq: int, client_id: str) -> dict:
46+
"""Create MQTTv5 properties with a seq number and client ID."""
47+
return {
48+
MqttTerminal.PROP_CORR: client_id.encode("utf-8"),
49+
MqttTerminal.PROP_USER: {"seq": str(seq)},
50+
}
51+
52+
53+
async def send_file(buffer: BytesIO):
54+
"""Send a file to the terminal."""
55+
# Send the first message that will create the job
56+
seq = 0
57+
props = create_props(seq, "tty0")
58+
await control_client.publish(
59+
"/test/tty/in", "cp test.txt test.txt".encode("utf-8"), properties=props
60+
)
61+
62+
# Send the file in 4-byte chunks; close when done
63+
fp = BytesIO(b"Hello world!")
64+
while True:
65+
await asyncio.sleep(0.5)
66+
chunk = fp.read(4)
67+
if chunk:
68+
seq += 1
69+
else:
70+
seq = -1
71+
props = create_props(seq, "tty0")
72+
logger.debug(f"Sending chunk {seq} of size {len(chunk)}: {chunk!r}")
73+
await control_client.publish("/test/tty/in", chunk, properties=props)
74+
if seq == -1:
75+
break
76+
77+
# Wait until the received buffer gets populated with the response
78+
await asyncio.sleep(1)
79+
80+
# Return the bytes received and empty the output buffer
81+
buffer.seek(0)
82+
output = buffer.read()
83+
logger.debug(f"Buffer contents: {output!r}")
84+
buffer.flush()
85+
buffer.seek(0)
86+
return output
87+
88+
89+
async def get_file(buffer: BytesIO):
90+
"""Send a file to the terminal and read it back."""
91+
# Send the request for the file
92+
seq = 0
93+
props = create_props(seq, "tty0")
94+
await control_client.publish(
95+
"/test/tty/in", "cat test.txt".encode(), properties=props
96+
)
97+
98+
# Wait until the received buffer gets populated with the response
99+
await asyncio.sleep(1)
100+
101+
# Return the bytes received and empty the output buffer
102+
buffer.seek(0)
103+
output = buffer.read()
104+
buffer.flush()
105+
return output
106+
107+
108+
# Handler for device messages that passes them to the terminal
109+
async def device_handler():
110+
async for topic, payload, _retained, properties in device_client.queue:
111+
device_logger.debug(f"Device received {len(payload)} bytes on topic '{topic}'")
112+
await term.handle_msg(topic, payload, properties)
113+
114+
115+
# Handler for control messages that logs and stores them
116+
async def control_handler(buffer):
117+
async for topic, payload, _retained, properties in control_client.queue:
118+
if topic == "/test/tty/err":
119+
logger.error(f"Control received error: {payload.decode('utf-8')}")
120+
else:
121+
buffer.write(payload) # Don't decode yet
122+
logger.debug(f"Control received {len(payload)} bytes on topic '{topic}'")
123+
124+
125+
# Main test function
126+
async def main():
127+
# Connect all clients and the terminal
128+
await control_client.connect(True)
129+
await control_client.subscribe("/test/tty/out")
130+
await control_client.subscribe("/test/tty/err")
131+
await device_client.connect(True)
132+
await term.connect()
133+
134+
# Run handlers in the background and test task in the foreground
135+
buffer = BytesIO() # buffer for received bytes
136+
asyncio.create_task(control_handler(buffer))
137+
asyncio.create_task(device_handler())
138+
139+
bytes_sent = await send_file(buffer)
140+
logger.debug(f"Sent {bytes_sent.decode()} bytes to the device")
141+
bytes_received = await get_file(buffer)
142+
logger.debug(f"Received {len(bytes_received)} bytes from the device")
143+
144+
# Disconnect and clean up
145+
await term.disconnect()
146+
await device_client.disconnect()
147+
await control_client.disconnect()
148+
149+
# Read out the received file
150+
received_str = bytes_received.decode("utf-8")
151+
assert (
152+
received_str == "Hello world!"
153+
), f"Expected 'Hello world!', got '{received_str}'"
154+
155+
156+
if __name__ == "__main__":
157+
asyncio.run(main())
158+
print("\033[1m\tOK\033[0m")

0 commit comments

Comments
 (0)