-
Notifications
You must be signed in to change notification settings - Fork 22
/
Copy pathmodbus_system_monitor.py
81 lines (64 loc) · 2.69 KB
/
modbus_system_monitor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
#!/usr/bin/env python
# -*- coding: utf_8 -*-
"""
Modbus TestKit: Implementation of Modbus protocol in python
(C)2009 - Luc Jean - [email protected]
(C)2009 - Apidev - http://www.apidev.fr
This is distributed under GNU LGPL license, see license.txt
This example shows how to create a modbus server in charge of monitoring
the CPU consumption of the machine
"""
from modbus_tk.simulator import *
from modbus_tk.simulator_rpc_client import SimulatorRpcClient
from modbus_tk.utils import WorkerThread
from modbus_tk.defines import *
from modbus_tk.modbus_tcp import TcpServer
import time
class SystemDataCollector:
    """Read the CPU load of the machine and push it to the simulator.

    collect() is designed to be called repeatedly: every call sleeps for
    0.1 second, and one call out of (refresh_rate_in_sec * 10) reads the load
    of each processor through WMI and writes it, with an RPC command, into the
    "Cpu" block of slave 1.
    """

    # Upper bound on the number of processors that are reported. It matches
    # the size of the "Cpu" block created by the script part of this file
    # (10 values starting at address 0).
    _MAX_CPU_COUNT = 10

    def __init__(self, refresh_rate_in_sec):
        """Constructor

        refresh_rate_in_sec: number of seconds between two readings of the
        CPU load
        """
        self._simu = SimulatorRpcClient()
        # collect() sleeps 0.1 second per call, hence 10 calls per second
        self._max_count = refresh_rate_in_sec * 10
        # start one step before the threshold so that the very first call
        # to collect() performs a reading
        self._count = self._max_count - 1

    def collect(self):
        """Run one 0.1 second step and refresh the CPU load when it is due

        Any exception raised while reading or publishing the load (including
        a failing import of win32com) is logged at debug level and swallowed,
        so that the caller can keep calling this method.
        """
        try:
            self._count += 1
            if self._count >= self._max_count:
                self._count = 0
                # WMI gets the load percentage of the machine.
                # The import stays local: win32com is only needed when a
                # reading is actually performed.
                from win32com.client import GetObject
                wmi = GetObject('winmgmts:')
                processors = wmi.InstancesOf('Win32_Processor')
                # zip() stops at the shortest input, so at most
                # _MAX_CPU_COUNT processors are reported
                for index, processor in zip(range(self._MAX_CPU_COUNT),
                                            processors):
                    value = processor.Properties_('LoadPercentage').Value
                    # a falsy value (e.g. None) is reported as 0
                    cpu_usage = int(str(value)) if value else 0
                    # execute a RPC command for changing the value
                    self._simu.set_values(1, "Cpu", index, (cpu_usage, ))
        except Exception as excpt:
            LOGGER.debug("SystemDataCollector error: %s", str(excpt))
        # pace the calls: one step lasts 0.1 second
        time.sleep(0.1)
if __name__ == "__main__":
    # create the object for getting CPU data (one reading every 5 seconds)
    data_collector = SystemDataCollector(5)
    # create the thread in charge of calling the data collector
    system_monitor = WorkerThread(data_collector.collect)

    # create the modbus TCP simulator and one slave
    # and one block of analog inputs
    simu = Simulator(TcpServer())
    slave = simu.server.add_slave(1)
    slave.add_block("Cpu", ANALOG_INPUTS, 0, 10)

    try:
        LOGGER.info("'quit' for closing the server")

        # start the data collect
        system_monitor.start()

        # start the simulator! will block until quit command is received
        simu.start()

    except Exception as excpt:
        print(excpt)

    finally:
        try:
            # close the simulator
            simu.close()
        finally:
            # stop the data collect, even when closing the simulator fails
            system_monitor.stop()