-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathirrigation_controller.py
More file actions
107 lines (82 loc) · 2.81 KB
/
irrigation_controller.py
File metadata and controls
107 lines (82 loc) · 2.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import time
from machine import Pin
# Cap for a manually started run; presumably seconds (3600 = 1 hour), matching
# the `duration` that activate_zone() adds to time.time().
# NOTE(review): not enforced anywhere in this module - confirm callers apply it.
MANUAL_MAX_DURATION = 3600
# Output pin driving each irrigation zone, keyed by zone name.
zone_pins = {
    "zone_1": Pin(16, Pin.OUT),
    "zone_2": Pin(17, Pin.OUT),
    "zone_3": Pin(18, Pin.OUT),
    "zone_4": Pin(25, Pin.OUT),
    "zone_5": Pin(26, Pin.OUT),
    "zone_6": Pin(27, Pin.OUT),
    "zone_7": Pin(32, Pin.OUT),
    "zone_8": Pin(33, Pin.OUT),
}
# The set of zone names that have a pin in zone_pins.
VALID_ZONES = set(zone_pins.keys())
# Output pin for the main valve: switched on before a zone pin is switched on,
# and switched off after all zone pins have been switched off.
main_valve = Pin(19, Pin.OUT)
# Float switch inputs; their raw values are reported by get_float_switches().
# NOTE(review): configured as plain Pin.IN with no pull - presumably external
# pull resistors are fitted; confirm against the wiring.
float_switch_1 = Pin(23, Pin.IN)
float_switch_2 = Pin(34, Pin.IN)
float_switch_3 = Pin(35, Pin.IN)
# --- Runtime state, mutated through `global` by the functions in this module ---
# Name of the zone last started with activate_zone(), or None when idle.
active_zone = None
# The `is_manual` flag passed to activate_zone() for the active zone.
manual_override = False
zone_end_time = None # time.time() of scheduled end for the active zone
# The `program_id` passed to activate_zone() for the active zone, or None.
active_program_id = None
# Auto program interrupted by a manual zone: {"id": int, "zone": str, "window_end": float}
# Only reset to None in this module (by deactivate_all_zones()).
paused_program = None
# Program explicitly paused by the user via command: {"id": int, "zone": str, "window_end": float}
# Only reset to None in this module (by deactivate_all_zones()).
user_paused_program = None
def _activate_pins(zone_name: str) -> None:
    """Switch on the main valve, then the pin for ``zone_name``.

    The zone pin is resolved before any output is driven, so an unknown
    zone name fails without leaving the main valve switched on with no
    zone selected.

    Args:
        zone_name: Key into ``zone_pins`` identifying the zone to switch on.

    Raises:
        KeyError: If ``zone_name`` is not a key of ``zone_pins``.
    """
    # Look the pin up first: a bad name must raise before hardware is touched.
    zone_pin = zone_pins[zone_name]
    main_valve.on()
    # 200 ms pause between switching the main valve and the zone pin.
    time.sleep_ms(200)
    zone_pin.on()
def _deactivate_pins() -> None:
    """Switch off every zone pin, pause 200 ms, then switch off the main valve."""
    for zone_pin in zone_pins.values():
        zone_pin.off()

    # Zone pins go off first; the main valve follows after a short pause.
    time.sleep_ms(200)
    main_valve.off()
def activate_zone(
    zone_name: str, duration: int, is_manual: bool = False, program_id: int = None
) -> None:
    """Switch a zone on and record it as the active zone.

    Args:
        zone_name: Key into ``zone_pins`` for the zone to start.
        duration: Amount added to ``time.time()`` to compute the scheduled end.
        is_manual: Stored as ``manual_override`` for the active zone.
        program_id: Stored as ``active_program_id`` for the active zone.
    """
    global active_zone, manual_override, zone_end_time, active_program_id

    # Drive the hardware first; state is only recorded once that succeeded.
    _activate_pins(zone_name)

    active_zone, manual_override = zone_name, is_manual
    # The clock is sampled after the pins are switched.
    zone_end_time = time.time() + duration
    active_program_id = program_id
def deactivate_active_zone() -> dict:
    """Switch all pins off, clear the active-zone state, and return what it was.

    Returns:
        A dict with keys ``"zone"``, ``"was_manual"`` and ``"program_id"``
        holding the values that were current before the reset.
    """
    global active_zone, manual_override, zone_end_time, active_program_id

    # Capture the state before it is wiped below.
    snapshot = {
        "zone": active_zone,
        "was_manual": manual_override,
        "program_id": active_program_id,
    }

    _deactivate_pins()

    active_zone = zone_end_time = active_program_id = None
    manual_override = False
    return snapshot
def deactivate_all_zones() -> None:
    """Emergency stop: switch every pin off and reset all module state.

    Unlike ``deactivate_active_zone()``, this also discards both paused-program
    records and returns nothing.
    """
    global active_zone, manual_override, zone_end_time, active_program_id, paused_program, user_paused_program

    _deactivate_pins()

    # Active-zone state.
    manual_override = False
    active_zone = zone_end_time = active_program_id = None
    # Paused-program bookkeeping.
    paused_program = user_paused_program = None
def is_zone_timeout() -> bool:
    """Return True when a zone is active and its scheduled end time has been reached.

    Returns False when no zone is active or no end time is recorded.
    """
    has_deadline = active_zone is not None and zone_end_time is not None
    return has_deadline and time.time() >= zone_end_time
def get_remaining_seconds() -> int:
    """Return the whole seconds left until the active zone's scheduled end.

    Returns 0 when no zone is active, no end time is recorded, or the end
    time has already passed.
    """
    if active_zone is not None and zone_end_time is not None:
        seconds_left = int(zone_end_time - time.time())
        # Never report a negative remainder once the deadline has passed.
        return seconds_left if seconds_left > 0 else 0
    return 0
def get_float_switches() -> dict:
    """Read the three float switch pins and return their raw values by name."""
    switches = (
        ("float_switch_1", float_switch_1),
        ("float_switch_2", float_switch_2),
        ("float_switch_3", float_switch_3),
    )
    return {label: switch.value() for label, switch in switches}