Hi all.
As a learning exercise I’m building a weather station using a Pico W + BME280 + SSD1306 that is readable from an Anvil page and locally on the OLED with the press of a button.
I’ve used threading to break out the push button functionality but doing this seems to lock out or stop the Anvil code (or at least its connection to Anvil). I suspect my dramas are due to (A) poor coding and (B) Anvil’s Pico build of MPython. I’ve tried locks and sleeps in the PB function which seem to work sometimes but not others (timing?).
So as a last resort I have the PB function reset the Pico. Crude I know but it does work. The Anvil page has to be refreshed but that’s ok. I don’t see the local PB being used much as it’s really just there for when I’m doing maintenance on the Stevenson Screen and Pico within.
Any constructive comments on the code would be appreciated.
from PiicoDev_BME280 import PiicoDev_BME280
from PiicoDev_Unified import sleep_ms
from machine import Pin
import ntptime
import time
import machine
import socket
import math
from PiicoDev_SSD1306 import * # The standard code for the OLED module.
import _thread # Required for the button function.
import anvil.pico
import uasyncio as a
UPLINK_KEY = "server_M2HQFAERLMD4F5HOS2IVUYRH-CVVXOGGKNNRG3HNY"
from do_connect import *
do_connect() # Connect to wifi and print the connection details to the shell.
print("------------------------------------------------------------") # Make the shell more readable.
from do_connect import signal_strength
display = create_PiicoDev_SSD1306() # Defines the OLED display.
Picoled = Pin("LED", Pin.OUT) # Define the Pico LED.
global button_pressed # Defines the variable for use in the thread.
button_pressed = machine.Pin(14, machine.Pin.IN) # Define the PB for the button _thread.
ntptime.settime() # Sync the RTC time using NTP server
rtc = machine.RTC() # Initialise
def heat_index():
# print("heat_index called") for testing.
R = humRH
T = (tempC * 1.8) + 32 # °F.
c1 = -42.379
c2 = 2.04901523
c3 = 10.14333127
c4 = -0.22475541
c5 = -6.83783 * (10**-3) # where ** is the exponential operator. This being 10 to the 3rd.
c6 = -5.481717 * (10**-2)
c7 = 1.22874 * (10**-3)
c8 = 8.5282 * (10**-4)
c9 = -1.99 * (10**-6)
HIF = c1 + (c2 * T) + (c3 * R) + (c4 * T * R) + (c5 * T**2) + (c6 * R**2) + (c7 * T**2 * R) + (c8 * T * R**2) + (c9 * T**2 * R**2)
HIC = (HIF - 32) / 1.8 # Convert to °C.
return HIC
def bme280_sensor():
# print("bme280_sensor called") for testing.
sensor = PiicoDev_BME280(iir=3) # initialise "sensor" as the atmos driver code with noise filtering (iir=3)
global tempC, presPa, humRH, preshPa # Make the variables available outside this function.
tempC, presPa, humRH = sensor.values() # Read all data from the sensor.
preshPa = presPa/100 # Convert Pa to hPa.
def battery_voltage():
# print("battery_voltage called") for testing.
voltage = machine.ADC(28) # Assigns the value of GP28 (pin 34) to the variable "Voltage". Jumpered to 3.3v pin 36.
voltbits=voltage.read_u16() # Read the value in bits.
battvolts = voltbits / 65535 * 3.3 * 2 # Convert the AO to volts. Note "x 2" due to the voltage divider.
return battvolts
def dewpoint(): # Calculate dewpoint
# print("dewpoint called") for testing.
bme280_sensor()
dp = tempC-((100-humRH)/5)
return dp
def pressure_msg(): # Forecast based on hPa reading.
#print("pressure_msg called") for testing.
bme280_sensor()
if preshPa > 1012:
pres_message = "Stable conditions, little rain, clear and calm weather"
if preshPa < 1013:
pres_message = "Unsettled conditions, possibe rain or storms"
return pres_message
def current_time(): # Get the current time from the Pico. rtc.datetime((yyyy, mm, ddd, hh, mm, s, s, s))
#print("current_time called") for testing.
current_time = rtc.datetime()
hours = (current_time[4] + 10) % 24 # Adds 10 to the hours for UTC+10. "% 24" sets the 24h format. Setting it as 12 means the +10 fails.
minutes = current_time[5]
seconds = current_time[6]
ctime = "{:02d}:{:02d}:{:02d}".format(hours, minutes, seconds) # Formats each to 2 digits.
return ctime
def oled (var_to_oled): # Display whatever variable called from the function oled_display.
#print("oled called") for testing.
display.fill(0) # Clears the display.
display.text(str(var_to_oled), 0,0, 1) # Display the variable at pixals X & Y on line 1.
display.show()
sleep_ms(1000)
display.fill(0) # Clear the display
display.show() # Must "show" the cleared display.
def oled_display(): # Function called at startup from below.
# Each line "oled(xxxx)" below calla the oled function to display the variable.
bme280_sensor() # Get the values from the atmos sensor for the calcs and ole calls below.
oled(current_time())
temp = str('%.1f' % tempC + " C")
oled(temp)
HIc = str("Feels like "'%.1f' % heat_index() + " C")
oled(HIc)
rh = str('%.0f' % humRH + " % RH")
oled(rh)
pres = str('%.0f' % preshPa + " hPa")
oled(pres)
dp = str('%.1f' % dewpoint() + " C Dewpoint")
bvolts = str('%.1f' % battery_voltage() + " volts")
oled(bvolts)
sigs = str('%.0f' % signal_strength() + " dBm")
oled(sigs)
def button_reader(): # Runs as a separate thread to allow the local PB to operate the OLED at any time.
print("button_reader called") # for testing.
button_pressed = machine.Pin(14, machine.Pin.IN) # Define the PB for the button _thread.
while True:
if button_pressed.value() == 1: # If the button is pressed, do...."
print("in button_reader, button_presses is True")
oled_display()
print("PB Pressed") # Print for testing.
sleep_ms(1000)
print("button_reader, machine reset") # for testing.
machine.reset() # Resets restarts the Pico to re-establish connection to Anvil. Anvil page must be refreshed.
@anvil.pico.callable(is_async=True) # This decorator makes the function callable by Anvil.
async def pico_fn(n):
#print("pico_fn called") for testing.
# Output will go to the Pico W serial port
print(f"Called local function with argument: {n}")
# Blink the LED and then double the argument and return it.
for i in range(10):
Picoled.toggle()
await a.sleep_ms(50)
return n * 2
@anvil.pico.callable(is_async=True) # This decorator makes the function callable by Anvil.
async def publish_data():
#print("publish_data called") for testing.
# Blink the LED to indicate an update.
Picoled.toggle()
sleep_ms(100)
Picoled.toggle()
sleep_ms(100)
Picoled.toggle()
sleep_ms(100)
Picoled.value(0)
sleep_ms(500)
# Get the sensor data, format decimal places and add units, then write to variables.
bme280_sensor()
ctime = current_time()
temp = str('%.1f' % tempC + " °C")
rh = str('%.0f' % humRH + " % RH")
pres = str('%.0f' % preshPa + " hPa")
dp = str('%.1f' % dewpoint() + " °C Dewpoint")
pmsg = pressure_msg()
bvolts = str('%.1f' % battery_voltage() + " volts")
sigs = str('%.0f' % signal_strength() + " dBm")
HIc = str('%.1f' % heat_index() + " °C")
return (ctime, temp, rh, pres, dp, pmsg, bvolts, sigs, HIc)
_thread.start_new_thread(button_reader, ()) # Start a separate thread for the PB to trigger the OLED.
Picoled.value(0) # Turn off the Pico LED.
oled_display() # Cycle through the variables on the OLED at startup.
print("anvil.pico.connect called") # for testing.
anvil.pico.connect(UPLINK_KEY) # Connect to Anvil via the Anvil connect function.