-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathgcode_server.py
More file actions
181 lines (140 loc) · 5.16 KB
/
gcode_server.py
File metadata and controls
181 lines (140 loc) · 5.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
import pathlib
import socketserver
import time
import click
import threading
import socket
from flask import Flask, jsonify
# Queue of GCode jobs waiting to be sent. get_new_gcode appends to the end,
# GCodeRequestHandler pops from (and re-inserts at) the front, and the
# /gcode/available endpoint checks whether it is empty.
gcode_buffer = []
# Held around every access to gcode_buffer, which is shared between the
# directory-scanning thread, the Flask thread and the TCP request handler.
buffer_lock = threading.Lock()
class GCode:
    """A G-code job that is handed out to a client a few lines at a time.

    Attributes:
        gcode_file: Path of the file the job was read from, or ``None`` when
            the job was built from an in-memory list of lines.
        gcode_lines: The lines of the job, each ending in a newline.
        line_number: Index of the next line that has not been handed out yet.
    """

    def __init__(self, gcode_file, lines=None):
        """Create a job from ``lines`` if given, otherwise from ``gcode_file``.

        Args:
            gcode_file: Path of the G-code file. Only read when ``lines`` is
                ``None``; may itself be ``None`` when ``lines`` is supplied.
            lines: Optional ready-made list of lines. It is used as-is, with
                no trailer appended.

        Raises:
            OSError: If ``gcode_file`` has to be read and cannot be opened.
        """
        self.gcode_file = gcode_file
        if lines is not None:
            self.gcode_lines = lines
        else:
            with open(gcode_file) as f:
                self.gcode_lines = f.readlines()
            # readlines() leaves the last line unterminated when the file has
            # no trailing newline; terminate it so that joining the lines
            # does not fuse it with the "G28" trailer appended below.
            if self.gcode_lines and not self.gcode_lines[-1].endswith("\n"):
                self.gcode_lines[-1] += "\n"
            # Every job read from a file ends with G28 followed by an END
            # marker line.
            self.gcode_lines.append("G28\n")
            self.gcode_lines.append("END\n")
        self.line_number = 0

    def get_gcode(self, num_lines):
        """Return the next ``num_lines`` lines as one string and advance.

        Fewer lines are returned when the job is nearly exhausted. A
        non-positive ``num_lines`` returns an empty string and leaves the
        position unchanged instead of moving it backwards.
        """
        num_lines = max(num_lines, 0)
        start = self.line_number
        self.line_number = start + num_lines
        return "".join(self.gcode_lines[start:self.line_number])

    def complete(self):
        """Return True once every line has been handed out."""
        return self.line_number >= len(self.gcode_lines)

    def delete_file(self):
        """Remove the backing file, if the job has one.

        A file that has already disappeared is not an error: the goal (the
        file no longer exists) is already met.
        """
        if self.gcode_file is None:
            return
        try:
            self.gcode_file.unlink()
        except FileNotFoundError:
            pass

    def reset(self):
        """Rewind the job so the next get_gcode() starts from the first line."""
        self.line_number = 0
class EmptyGCode(GCode):
    """Placeholder job with no backing file whose only line is ``END``."""

    def __init__(self):
        # No file to read: hand the base class a ready-made one-line job.
        super().__init__(gcode_file=None, lines=["END\n"])
class GCodeRequestHandler(socketserver.StreamRequestHandler):
    """Stream one queued G-code job to a connected client.

    The client repeatedly sends ``GET /gcode?lines=<n>`` and receives the
    next ``n`` lines of the job in reply until the job is complete. After a
    complete transfer the job's file is deleted. If the client times out or
    disconnects first, the job is rewound and put back at the front of the
    queue so the next connection retries it from the start.
    """

    start_line = 0

    def handle(self):
        """Serve the job at the front of the queue to this connection."""
        print("Connected by", self.client_address)
        with buffer_lock:
            if gcode_buffer:
                gcode = gcode_buffer.pop(0)
            else:
                print("No GCode in buffer. Sending empty GCode.")
                gcode = EmptyGCode()

        try:
            # Give up on a client that stays silent for 5 seconds.
            self.request.settimeout(5)
            while not gcode.complete():
                print("Waiting for data...")
                raw = self.request.recv(1024)
                if not raw:
                    # recv() returns b"" once the peer has closed the
                    # connection; without this check the loop would spin
                    # forever on an endless stream of empty reads.
                    raise ConnectionResetError("client closed the connection")
                # Undecodable bytes become replacement characters so the
                # request is simply ignored instead of raising and losing
                # the job that was popped from the queue.
                data = raw.strip().decode(errors="replace")
                if not data.startswith("GET /gcode?lines="):
                    continue
                try:
                    num_lines_requested = int(data.split("=")[1].split(" ")[0])
                    if num_lines_requested <= 0:
                        # Zero or negative counts would never advance the job.
                        raise ValueError("line count must be positive")
                except (ValueError, IndexError):
                    self.request.sendall(b"Invalid request format.\n")
                    continue
                # Send the G-code lines
                gcode_block = gcode.get_gcode(num_lines_requested)
                print("Sending response")
                self.request.sendall(gcode_block.encode())
                print("Response sent")
            # The whole job was delivered, so its file is no longer needed.
            gcode.delete_file()
        except socket.timeout:
            print("Connection timed out. Will retry this gcode next time")
            self._requeue(gcode)
        except ConnectionError:
            print("Connection closed by client. Will retry this gcode next time")
            self._requeue(gcode)

        # Close the socket and connection
        print("G-code transmission ended. Closing connection.")
        self.request.close()

    def _requeue(self, gcode):
        """Rewind ``gcode`` and put it back at the front of the queue.

        The EmptyGCode placeholder is dropped instead: queueing it would make
        the buffer look non-empty although no real job is waiting.
        """
        if isinstance(gcode, EmptyGCode):
            return
        gcode.reset()
        with buffer_lock:
            gcode_buffer.insert(0, gcode)
def get_new_gcode(scan_directory: pathlib.Path) -> None:
    """Poll ``scan_directory`` forever and queue every new ``*.gcode`` file.

    Runs as a background thread and never returns. Every two seconds the
    directory is listed, and each file not seen before is read into a GCode
    job and appended to ``gcode_buffer``.

    Args:
        scan_directory: Directory to watch for ``*.gcode`` files.
    """
    print("Scanning directory:", scan_directory)
    processed_files = set()
    while True:
        found_files = list(scan_directory.glob("*.gcode"))
        # Forget files that no longer exist. This keeps the set from growing
        # without bound and lets a later file that reuses the same name be
        # picked up again.
        processed_files.intersection_update(found_files)

        for gcode_file in found_files:
            if gcode_file in processed_files:
                continue
            print("Reading new GCode file:", gcode_file)
            try:
                # Read the file before taking the lock so slow disk I/O does
                # not block the other threads that use the buffer.
                job = GCode(gcode_file)
            except OSError as err:
                # For example, the file vanished between listing and opening.
                # Skip it rather than let the exception end this thread; it
                # is retried on the next scan if it still exists.
                print(f"Could not read {gcode_file}: {err}")
                continue
            with buffer_lock:
                gcode_buffer.append(job)
            processed_files.add(gcode_file)

        # Sleep for a while
        time.sleep(2)
# REST endpoint that lets the etch-a-sketch check whether G-code is
# available. main() serves this Flask app on port 5001 in its own thread.
app = Flask(__name__)
@app.route("/gcode/available")
def gcode_available():
    """Tell the caller whether at least one G-code job is queued.

    Responds with an empty body and status 200 when the buffer holds a job;
    otherwise logs a message and responds with an empty body and status 204.
    """
    with buffer_lock:
        if not gcode_buffer:
            print("Returng 204")
            return "", 204
        return "", 200
# Add click argument for gcode directory
@click.command()
@click.option(
    "--gcode_dir",
    default="gcode_outputs",
    help="Directory containing GCode files",
    type=click.Path(exists=True),
)
def main(gcode_dir: pathlib.Path):
    """Serve G-code files from GCODE_DIR over TCP (port 5000) and REST (port 5001)."""
    # click passes the option as a string; the scanner needs a Path.
    gcode_dir = pathlib.Path(gcode_dir)

    # Both helper threads loop forever. They are daemon threads so that the
    # process exits when the TCP server in the main thread stops (Ctrl-C, or
    # a failure to bind the port) instead of hanging on them.

    # Start the GCode reading thread
    gcode_thread = threading.Thread(
        target=get_new_gcode, args=(gcode_dir,), daemon=True
    )
    gcode_thread.start()

    # Start the Flask app to host the rest endpoint
    flask_thread = threading.Thread(
        target=app.run, kwargs={"host": "0.0.0.0", "port": 5001}, daemon=True
    )
    flask_thread.start()

    host, port = "0.0.0.0", 5000  # Your host and port here
    with socketserver.TCPServer((host, port), GCodeRequestHandler) as server:
        print("Server listening...")
        server.serve_forever()


if __name__ == "__main__":
    main()