-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathxiaomi_yi.py
133 lines (102 loc) · 3.61 KB
/
xiaomi_yi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
#!/usr/bin/python3
import json
import socket
from time import sleep, perf_counter
REST = 3
class NotConnected(Exception):
def __init__(self, error):
self.error = error
class XiaomiYi:
"""
# Make XiaomiYi object with default values.
# You can override these like :
# camera = XiaomiYi(ip="192.168.42.1", port=7878, timeout=5)
camera = XiaomiYi()
# Make connection to the camera.
camera.connect()
# Take single photo.
camera.take_photo()
# You can start recording.
camera.start_video()
sleep(5)
# And stop it manually.
camera.stop_video()
# Or record for a desired time (in seconds).
camera.start_video(10)
# Take photo every 5 seconds, for 30 sec.
# If second parameter is ommited, take photos forever.
camera.seq_photos(5, 30)
# Enable streaming for X seconds, or forever if ommited.
# Connect to stream on http://192.168.42.1/live with VLC or something.
# camera.stream(30)
# Send custom commands.
# You can find list of some common in commands.txt
cmd = { "msg_id": 2,
"token": camera.token(),
"type": "video_quality",
"param": "S.Fine"
}
camera.send(cmd)
# Close connection.
camera.close()
"""
def __init__(self, ip="192.168.1.1", port=7878, timeout=5):
self._ip = ip
self._port = port
self._timeout = timeout
self.__token = None
self.__control = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
def __repr__(self):
return "XiaomiYi('{}', {}, {})".format(self._ip, self._port, self._timeout)
def send(self, data, connect=False):
"""
There needs to be little delay after every command,
except while getting token,
otherwise Yi will ignore following commands.
"""
if self.__token or connect:
self.__control.send(bytes(json.dumps(data), 'UTF-8'))
if not connect:
sleep(REST)
else:
raise NotConnected("Make connection with object.connect() first.")
def token(self):
# print("Your token is: ", __token)
return self.__token
def connect(self):
self.__control.settimeout(self._timeout)
self.__control.connect((self._ip, self._port))
self.send({"msg_id": 257, "token": 0}, True)
data = self.__control.recv(512).decode("utf-8")
if not "rval" in data:
data = self.__control.recv(512).decode("utf-8")
self.__token = json.loads(data)["param"]
def take_photo(self):
self.send({"msg_id": 769, "token": self.__token})
def start_video(self, duration=False):
self.send({"msg_id": 513, "token": self.__token})
if duration:
sleep(duration)
self.send({"msg_id": 514, "token": self.__token})
def stop_video(self):
self.send({"msg_id": 514, "token": self.__token})
def seq_photos(self, every, until=False):
# "every" needs to be at least "REST" seconds.
if every < REST:
every = REST
begin = perf_counter()
while True:
self.take_photo()
if until and (until < perf_counter() - begin):
break
sleep(every - REST)
def stream(self, until=False):
begin = perf_counter()
self.send({"msg_id": 259, "token": self.__token, "param": "none_force"})
while True:
if until and (until < perf_counter() - begin):
break
sleep(REST)
return json(self.__control.recv(512).decode("utf-8"))
def close(self):
self.__control.close()