-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapp.py
More file actions
153 lines (116 loc) · 3.83 KB
/
app.py
File metadata and controls
153 lines (116 loc) · 3.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
from flask import Flask
from flask_graphql import GraphQLView
from flask_cors import CORS
from waitress import serve
import graphene
import threading
import time
import json
import JoyStick
import Client
# import Client
joystick_instance = JoyStick.JoyStick()
client_instance = Client.UDPClient("192.168.1.101", 4567)
class JoystickField(graphene.ObjectType):
available = graphene.Boolean()
axes = graphene.List(graphene.Float)
buttons = graphene.List(graphene.Int)
hats = graphene.List(graphene.Int)
class AtitudeField(graphene.ObjectType):
available = graphene.Boolean()
pitch = graphene.Float()
roll = graphene.Float()
yaw = graphene.Float()
class JointField(graphene.ObjectType):
available = graphene.Boolean()
left_thruster_joint = graphene.Float()
right_thruster_joint = graphene.Float()
end_joint_2 = graphene.Float()
end_joint_1 = graphene.Float()
class Query(graphene.ObjectType):
joystick = graphene.Field(JoystickField)
altitude = graphene.Field(AtitudeField)
joint = graphene.Field(JointField)
depth = graphene.Float()
temperature = graphene.Float()
def resolve_joystick(self, info):
del info
global joystick_instance
if not joystick_instance.available:
joystick_instance.init()
return {"available": False, "axes": {}, "buttons": {}, "hats": {}}
data = joystick_instance.get()
return data
def resolve_altitude(self, info):
del info
global client_instance
data = client_instance.get_data(Client.Data.ATTITUDE)
available = data[1]
if not available:
return {"available": False, "pitch": 0, "roll": 0, "yaw": 0}
data = data[0]
return {"available": True, "pitch": data[1], "roll": data[0], "yaw": data[2]}
def resolve_joint(self, info):
del info
global client_instance
data = client_instance.get_data(Client.Data.SERVO)
available = data[1]
if not available:
return {
"available": False,
"left_thruster_joint": 0,
"right_thruster_joint": 0,
"end_joint_2": 0,
"end_joint_1": 0,
}
data = data[0]
return {
"available": True,
"left_thruster_joint": data[0],
"right_thruster_joint": data[1],
"end_joint_2": data[2],
"end_joint_1": data[3],
}
def resolve_depth(self, info):
del info
global client_instance
data = client_instance.get_data(Client.Data.DEPTH)
available = data[1]
if not available:
return 0
return data[0][0]
def resolve_temperature(self, info):
del info
global client_instance
data = client_instance.get_data(Client.Data.TEMPERATURE)
available = data[1]
if not available:
return 0
return data[0][0]
# 定义一个 GraphQL Schema
schema = graphene.Schema(query=Query)
app = Flask(__name__)
CORS(app)
# 添加 GraphQL 视图,指定 schema
app.add_url_rule(
"/graphql", view_func=GraphQLView.as_view("graphql", schema=schema, graphiql=True)
)
# def serialReceive():
# global client_instance
# while True:
# client_instance.receive_data()
# time.sleep(0.01)
if __name__ == "__main__":
app_thread = threading.Thread(
target=serve, kwargs={"app": app, "host": "0.0.0.0", "port": 5000}, daemon=True
)
app_thread.start()
def on_recv(self, data, addr):
print(f"Received from {addr}: {data}")
type = json.loads(data.decode("utf-8")).get("type", "str")
self.data[type] = data
client_instance.start(on_recv)
while True:
time.sleep(0.01)
joystick_instance.update()
joystick_data = joystick_instance.get()