Hi @Eric289745
That sounds like a really interesting project!
I’ve thrown together some code below that should at least get you started with what you’re looking to do, it will get the time via NTP, and automatically turn the valves on at 6am, with them turning off at 6:10am.
I’ve added in a physical button on the pico for a manual override, as well as a master override in the web interface and also individual overrides.
You can remove any or add onto the code to meet your needs.
import network
import socket
from machine import Pin, Timer
import utime
import ntptime
import ujson
import os
# === Config ===
SOLENOID_PINS = [10, 11, 12, 13, 14, 15, 16]
BUTTON_PIN = 9
OVERRIDE_TIMEOUT = 10 * 60 # 10 minutes
SCHEDULE_HOUR = 6
SCHEDULE_DURATION = 10 * 60 # 10 minutes
SSID = 'YourSSID'
PASSWORD = 'YourPassword'
# === State ===
solenoids = [Pin(pin, Pin.OUT) for pin in SOLENOID_PINS]
solenoid_states = [False] * len(solenoids)
manual_override = [False] * len(solenoids)
override_start_time = [0] * len(solenoids)
# === Physical Button State ===
physical_override = False
physical_override_start = 0
# === Wi-Fi & NTP ===
def connect_wifi():
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(SSID, PASSWORD)
while not wlan.isconnected():
utime.sleep(1)
print('Connected to Wi-Fi:', wlan.ifconfig()[0])
return wlan.ifconfig()[0]
def sync_time():
try:
ntptime.settime()
print("Time synced with NTP.")
except:
print("NTP sync failed.")
# === Solenoid Control ===
def set_solenoid(index, state):
solenoids[index].value(state)
solenoid_states[index] = state
def update_solenoid_state():
now = utime.time()
t = utime.localtime()
seconds_now = t[3] * 3600 + t[4] * 60 + t[5]
schedule_start = SCHEDULE_HOUR * 3600
schedule_end = schedule_start + SCHEDULE_DURATION
global physical_override
# Expire physical override
if physical_override and (now - physical_override_start > OVERRIDE_TIMEOUT):
physical_override = False
for i in range(len(solenoids)):
manual_override[i] = False
for i in range(len(solenoids)):
# Expire per-solenoid manual override
if manual_override[i] and (now - override_start_time[i] > OVERRIDE_TIMEOUT):
manual_override[i] = False
# Skip if manually overridden
if manual_override[i]:
continue
in_schedule = schedule_start <= seconds_now < schedule_end
set_solenoid(i, in_schedule)
# === Button Interrupt ===
button = Pin(BUTTON_PIN, Pin.IN, Pin.PULL_DOWN)
def button_handler(pin):
global physical_override, physical_override_start
physical_override = not physical_override
physical_override_start = utime.time()
new_state = not solenoid_states[0]
for i in range(len(solenoids)):
set_solenoid(i, new_state)
manual_override[i] = True
override_start_time[i] = utime.time()
save_overrides_to_flash()
button.irq(trigger=Pin.IRQ_RISING, handler=button_handler)
# === Timer ===
timer = Timer()
timer.init(period=10000, mode=Timer.PERIODIC, callback=lambda t: update_solenoid_state())
# === Save and Load Overrides ===
def save_overrides_to_flash():
try:
with open("overrides.json", "w") as f:
data = {
"manual_override": manual_override,
"override_start_time": override_start_time
}
ujson.dump(data, f)
print("Override states saved.")
except Exception as e:
print("Failed to save override state:", e)
def load_overrides_from_flash():
global manual_override, override_start_time
try:
if "overrides.json" in os.listdir():
with open("overrides.json", "r") as f:
data = ujson.load(f)
now = utime.time()
for i in range(len(solenoids)):
if data["manual_override"][i]:
elapsed = now - data["override_start_time"][i]
if elapsed < OVERRIDE_TIMEOUT:
manual_override[i] = True
override_start_time[i] = data["override_start_time"][i]
set_solenoid(i, True)
else:
manual_override[i] = False
set_solenoid(i, False)
print("Override states loaded from flash.")
except Exception as e:
print("Failed to load override state:", e)
# === Web Interface ===
def get_override_remaining(i):
if manual_override[i]:
remaining = OVERRIDE_TIMEOUT - (utime.time() - override_start_time[i])
return max(0, int(remaining))
return 0
def web_page():
solenoid_html = ""
for i, state in enumerate(solenoid_states):
mode = "Manual Override" if manual_override[i] else "Scheduled"
remaining = get_override_remaining(i)
override_info = f" | {remaining}s left" if manual_override[i] else ""
solenoid_html += f"""
<p>Solenoid {i+1}: <strong>{'ON' if state else 'OFF'}</strong> | Mode: <strong>{mode}</strong>{override_info}</p>
<form action="/toggle{i}" method="get">
<button type="submit">Toggle Solenoid {i+1}</button>
</form>
"""
html = f"""<html>
<head>
<title>Solenoid Control</title>
<meta http-equiv="refresh" content="10">
<style>button {{ margin-bottom: 10px; }}</style>
</head>
<body>
<h1>7-Zone Solenoid Controller</h1>
<form action="/master" method="get">
<button style="background-color:lightcoral;" type="submit"><strong>MASTER TOGGLE ALL</strong></button>
</form>
<hr>
{solenoid_html}
</body></html>"""
return html
def serve():
addr = socket.getaddrinfo('0.0.0.0', 80)[0][-1]
s = socket.socket()
s.bind(addr)
s.listen(1)
print('Web server running on port 80')
while True:
cl, addr = s.accept()
request = cl.recv(1024).decode()
# Handle master override toggle
if "/master" in request:
global physical_override, physical_override_start
physical_override = not physical_override
physical_override_start = utime.time()
new_state = not solenoid_states[0]
for i in range(len(solenoids)):
set_solenoid(i, new_state)
manual_override[i] = True
override_start_time[i] = utime.time()
save_overrides_to_flash()
# Handle individual solenoid toggle
for i in range(len(solenoids)):
if f"/toggle{i}" in request:
manual_override[i] = not manual_override[i]
override_start_time[i] = utime.time()
set_solenoid(i, not solenoid_states[i])
save_overrides_to_flash()
response = web_page()
cl.send('HTTP/1.0 200 OK\r\nContent-type: text/html\r\n\r\n')
cl.send(response)
cl.close()
# === Start System ===
ip = connect_wifi()
sync_time()
load_overrides_from_flash()
print(f"Access the web interface at http://{ip}")
serve()