OK, some photos and code.
- main1.py (copy to main.py when you want it to boot)
- WAP.py Wireless Access Point
- audio.py Makes beeping noise when going up in lift (A joy to a pilot!)
- sensors.py Sets up and read the pressure sensors Barometer (Altitude and Vario), and ASI
- gpsuart.py Sets up uart and manages the GPS data
- lxnav.py LXNAV protocol sentence generator, returns the assembled string.
If I write a class or even just a bunch of code in a file, I try and leave an āExample of usageā at the end of the file, this can be used to demo/modify/develop the code in the file above it. But donāt forget to comment it out again.
(Also I find it good to help refresh my fading memory!)
There is also a lot of hardcoded setup in this these would be best moved to passed args in the startup of main1.py. But I have left it this way for clarity.
main1.py
## XCsoar WiFi Data Dongle
## main file.
## This starts up all the parts
## and then does the final part of sending data to XCSoar
## Written specifically for the RP PICO-W
## could run on an ESP32 or similar,
## but the audio tone generator will need to be ported to an ISR,
## as this uses the PICO PIO module
## sensors run in a _thread on core1 (PICO specific as it has 2 cores)
## but _thread works on the ESP32 as well
## If changing hardware arrangement,
## remember to update sensors and gps to any new pins used
## Code has been written to be as simple as possible
## Low level imports
import machine
from machine import Pin, Timer
import time
import sys
## Application specific imports
import sensors
import lxnav
import gpsuart
import WAP
import audio
## some links that may be useful
#https://forums.raspberrypi.com/viewtopic.php?t=310062
#https://hackspace.raspberrypi.com/articles/raspberry-pi-picos-pio-for-mere-mortals-part-3-sound
##############################################
## for development, this will try and #######
## end the process neatly #######
button = Pin(0, Pin.IN, Pin.PULL_UP)
global loop
loop = True
timer = Timer(-1)
def checkButton(t):
global loop
if(button.value() == 0):
loop = False
timer.init(period=100, mode=Timer.PERIODIC, callback=checkButton)
##############################################
# Should exit a booting "main.py", on power up, hold the "button"
time.sleep(0.5)
if(loop == False):
print("Exit")
sys.exit()
## define the parameters that setup the network and hardware
# Wireless Access Point WAP
ssid = "W-ap"
password = "123456789"
## startup the various parts
print("Starting Systems")
aud = audio.audio()
aud.doStartBeep()
ap = WAP.wap(ssid, password)
gps = gpsuart.gpsUart()
sensors.startSensors()
lx = lxnav.LXNAV() ## the protocol format in which to send the data to XCsoar
print("All started")
message_to_xcsoar_udp = ''
time.sleep(2.0)
aud.doStartTune()
a = 0.0
v = 0.0
asi= 0.0
while(loop):
gps.readUart()
if(gps.messages() > 0):
while(gps.messages() > 0):
message_to_xcsoar_udp = gps.getNMEA() # get one awaiting NMEA sentences
ap.udpSend(message_to_xcsoar_udp) # and send it
# try:
# print(message_to_xcsoar_udp.decode('UTF-8'))
# except Exception as e:
# print(e)
aud.do(bario.readVARIO())
a = sensors.readALTI()
v = sensors.readVARIO()
asi =sensors.readASI() ## make sure there is not data for ASI below just''
message_to_xcsoar_udp = lx.get('{:.2f}'.format(asi), '{:.2f}'.format(a), '{:.2f}'.format(v))
ap.udpSend(message_to_xcsoar_udp)
print(message_to_xcsoar_udp.decode('UTF-8'))
aud.do(v)
time.sleep(0.1)
## if button is low loop exits and preforms a shutdown
print("shutting down")
aud.doEndTune()
print("1")
gps.end()
print("2")
ap.end()
print("3")
aud.end()
print("4")
rp2.PIO(0).remove_program()
print("5")
timer.deinit()
print("6")
sensors.endCore1() # this is not returning ???
print("shutdown")
sys.exit()
WAP.py
import network
import socket
import machine
import time
######################################################
class wap:
def __init__(self, ssid, password):
print("Starting WAP")
self.ssid = ssid
self.password = password
self.ap = network.WLAN(network.AP_IF)
self.ap.config(essid=ssid, password=password)
self.ap.active(True)
# print(self.ap.config("mac"))
# print(self.ap.config("essid"))
# print(self.ap.ifconfig())
conf = self.ap.ifconfig()
UDP_IP = str(conf[0])
print(UDP_IP)
UDP_IP_list = UDP_IP.split('.')[0:3] + ['255']
self.UDP_IP = '.'.join(UDP_IP_list)
# self.UDP_IP = '192.168.4.16'
print(self.UDP_IP)
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
def udpSend(self, msg):
XCSOAR_UDP_PORT = 4353
self.sock.sendto(msg, (self.UDP_IP, XCSOAR_UDP_PORT))
self.sock.sendto(msg, (self.UDP_IP, 2000))
def end(self):
self.ap.active(False)
def rex(self, p):
UDP_PORT = p
self.sock.settimeout(0.10)
try:
data, address = self.sock.recvfrom(1024)
print(address, data.decode())
except:
pass
###########################################################
#
# ap = wap("W-ap", "123456789")
# x =100
#
# while(x):
# # ap.udpSend(MESSAGE)
# ap.rex(4353)
# print('.')
# time.sleep(1.0)
# x -= 1
###########################################################
###########################################################
###########################################################
###########################################################
class station:
def __init__(self, ssid, password):
print("Starting joining wifi WAP")
self.ssid = ssid
self.password = password
self.wlan = network.WLAN(network.STA_IF)
# self.wlan.config(essid=self.ssid, password=self.password)
self.wlan.active(True)
res =self.wlan.scan()
for each in res:
print(each)
# self.wlan.connect(self.ssid, self.password)
# conf = self.wlan.ifconfig()
# UDP_IP = str(conf[0])
# UDP_IP_list = UDP_IP.split('.')[0:3] + ['255']
# self.UDP_IP = '.'.join(UDP_IP_list)
# print(self.UDP_IP)
#
# self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
def udpSend(self, msg):
XCSOAR_UDP_PORT = 4353
self.sock.sendto(msg, (self.UDP_IP, XCSOAR_UDP_PORT))
def end(self):
self.wlan.active(False)
###########################################################
#
# w = station("Galaxy A71B468", "jcye9075")
# x = 10
# while(x):
# print(x)
# time.sleep(1.0)
# x-=1
# w.end()
# print("end")
audio.py
import machine
from machine import Pin
from rp2 import PIO, StateMachine, asm_pio
#from machine import Timer
import time
import utime
# to fix ENOMEM error
#rp2.PIO(0).remove_program()
#############################################################################
# PIO code from https://hackspace.raspberrypi.com/articles/raspberry-pi-picos-pio-for-mere-mortals-part-3-sound
#############################################################################
## importing this will load and prep the PIO code.
## The audio class below will activate it as required for the tone to sound
#############################################################################
@asm_pio(sideset_init=PIO.OUT_LOW)
def square_prog():
label("restart")
pull(noblock) .side(0)
mov(x, osr)
mov(y, isr)
#start loop
#here, the pin is low, and it will count down y
#until y=x, then put the pin high and jump to the next secion
label("uploop")
jmp(x_not_y, "skip_up")
nop() .side(1)
jmp("down")
label("skip_up")
jmp(y_dec, "uploop")
#mirror the above loop, but with the pin high to form the second
#half of the square wave
label("down")
mov(y, isr)
label("down_loop")
jmp(x_not_y, "skip_down")
nop() .side(0)
jmp("restart")
label("skip_down")
jmp(y_dec, "down_loop")
max_count = 5000
freq = 1000000
#square_sm = StateMachine(0, square_prog, freq=freq, sideset_base=Pin(0))
square_sm = StateMachine(0, square_prog, freq=freq, sideset_base=Pin(28))
#pre-load the isr with the value of max_count
square_sm.put(max_count)
square_sm.exec("pull()")
square_sm.exec("mov(isr, osr)")
#############################################################################
class audio:
def __init__(self):
''' usinf a POI state machine for the signal generator, calculate the freq, on time and off time of the audio vario '''
print("Starting Audio")
self.freq = 4000
self.freqLast = 4000
self.pause = 10
self.beep = 200
self.beepOntime = time.ticks_ms() + self.pause
self.beepOfftime = time.ticks_ms() + self.beep
self.vario = 0.0
self.tune = True
self.beat = 0
square_sm.put(4000)
## setup and start the time that will call the do function
# self.timer = Timer(-1)
# self.timer.init(period=100, mode=Timer.PERIODIC, callback=self.do)
## once setup simply update the vario variable. Usage show below
def setV(self, v):
self.tune = True
# print(v)
# print(v, self.freq, self.beep, self.pause)
def setTune(self):
self.tune = True
self.beat = 0
# def do(self, tm):
def do(self, v):
# self.tune = True
self.vario = v
ticks = time.ticks_ms()
baseF = 4000
liftPosThres = 0.2
sinkNegThres = -1.1
if(self.vario >= liftPosThres):
self.freq = int(baseF + int(self.vario * 100.0))
self.pause= 75
self.beep = 1000 - int(self.vario * 100)
elif(self.vario > sinkNegThres):
self.beep = 0
self.freq = 500
elif(self.vario < sinkNegThres):
self.freq = int(baseF/2 + int(self.vario * 100.0)) # v is negative
self.pause = 50
self.beep = 3000
else:
print("error v out of range")
if(self.beep <= 0): self.beep = 0
if(self.freq != self.freqLast): # not pulling from the buffer
square_sm.put(self.freq) # when we put there
if(square_sm.tx_fifo() > 2): # requires a
square_sm.exec("pull()") # pull to stop it blocking up
self.freqLast = self.freq
if(self.beep > 0): # should beep be working
if(square_sm.active()): # is it on now
if(utime.ticks_diff(ticks, self.beepOfftime) >= 0):
square_sm.active(0) # turn off and calculate next on time
self.beepOntime = utime.ticks_add(ticks, self.pause)
else:
if(utime.ticks_diff(ticks, self.beepOntime) >= 0):
square_sm.active(1) # turn onand calculate next off time
self.beepOfftime = utime.ticks_add(ticks, self.beep)
else:
square_sm.active(0)
def doTune(self):
if(self.beat < 5):
square_sm.put(4000)
square_sm.active(1)
self.beat += 1
elif(self.beat < 10):
square_sm.put(3000)
self.beat += 1
elif(self.beat < 20):
square_sm.put(4900)
self.beat += 1
elif(self.beat < 40):
square_sm.put(4000)
self.beat += 1
elif(self.beat < 50):
square_sm.active(0)
self.beat = 0
self.tune = False
def doStartBeep(self):
square_sm.put(3000)
square_sm.active(1)
time.sleep(0.2)
square_sm.active(0)
time.sleep(0.1)
def doStartTune(self):
square_sm.put(3500)
square_sm.active(1)
time.sleep(0.05)
square_sm.active(0)
time.sleep(0.1)
square_sm.active(1)
time.sleep(0.1)
square_sm.active(0)
time.sleep(0.1)
square_sm.put(4500)
square_sm.active(1)
time.sleep(0.2)
square_sm.active(0)
def doEndTune(self):
square_sm.put(4500)
square_sm.active(1)
time.sleep(0.05)
square_sm.active(0)
time.sleep(0.1)
square_sm.active(1)
time.sleep(0.1)
square_sm.active(0)
time.sleep(0.1)
square_sm.put(3500)
square_sm.active(1)
time.sleep(0.2)
square_sm.active(0)
def end(self):
# self.timer.deinit()
square_sm.active(0)
#############################################################################
## usage example
#import random
# aud = audio()
# var = 0.0
#
# x=5
# while(x):
# aud.doStartTune()
# time.sleep(1.0)
#
#
# # var += 0.00 + float(random.randrange(-10,10)/100.0)
# # aud.setV(0.2)
# # time.sleep(0.1)
# # aud.setV(0.0)
# # time.sleep(0.1)
# # aud.setV(-2.0)
# # time.sleep(0.1)
# x-=1
# print(x)
# # to stop the noise!!! run this only
# square_sm.active(0)
# aud.end()
# rp2.PIO(0).remove_program()
sensors.py
#import machine
from machine import Pin, I2C
import _thread
import utime
import time
import math
## Starting a new thread on the PICO, should put it in Core1
## use a loop to get/process the ASI pressure sensor and the Altimeter/Vario sensor
global ASIfinal
ASIfinal = 0.0
global ALTIfinal
ALTIfinal = 0.0
global VARIOfinal
VARIOfinal = 0.0
global TIMEsample
TIMEsample = utime.ticks_ms()
global PRESSfinal
PRESSfinal = 0.0
global TEMPfinal
TEMPfinal = 0.0
global core_one_run
core_one_run = True
def readASI():
global ASIfinal
return ASIfinal
def readALTI():
global ALTIfinal
return ALTIfinal
def readVARIO():
global VARIOfinal
return VARIOfinal
def endCore1():
global core_one_run
print("c1")
core_one_run = False
print("c2")
time.sleep(0.5)
print("c3")
_thread.exit()
print("c4")
def sensors():
global ASIfinal
global ALTIfinal
global VARIOfinal
global TIMEsample
global core_one_run
# hardware setup hardcoded for now
i2cAsix = 0
i2cAsiAddr = 0x21
i2cAVx = 0
i2cAVAddr = 0x77
# setup ASI pressure readings
IAS = 0.0
i2cAsi = I2C(i2cAsix, scl=Pin(21), sda=Pin(20), freq=100000)
diffP = 0.0
temp = 0.0
i2cAsi.writeto(i2cAsiAddr, bytearray([0x36, 0x03]), 2)
# setup Barometric pressure sensor for Alti and Vario
i2cAV = I2C(i2cAVx, scl=Pin(21), sda=Pin(20), freq=100000)
i2cAV.writeto(i2cAVAddr, bytearray([0x1E])) # reset_cmd()
utime.sleep_ms(200)
data = i2cAV.readfrom_mem(i2cAVAddr, 0xA2, 2)
C1 = data[0] * 256 + data[1]
data = i2cAV.readfrom_mem(i2cAVAddr, 0xA4, 2)
C2 = data[0] * 256 + data[1]
data = i2cAV.readfrom_mem(i2cAVAddr, 0xA6, 2)
C3 = data[0] * 256 + data[1]
data = i2cAV.readfrom_mem(i2cAVAddr, 0xA8, 2)
C4 = data[0] * 256 + data[1]
data = i2cAV.readfrom_mem(i2cAVAddr, 0xAA, 2)
C5 = data[0] * 256 + data[1]
data = i2cAV.readfrom_mem(i2cAVAddr, 0xAC, 2)
C6 = data[0] * 256 + data[1]
## setupthe kalman filter
ALTIfinal = 0.0 # x_abs
VARIOfinal = 0.0 # x_vel
t = utime.ticks_ms()
dt = 0.03 # sample interval
cz = 7000.0 # p_abs_abs
cv = 0.0 # p_abs_vel
cdv = 0.0 # p_vel_vel
vz = 0.1 # var_z_abs
# setup done now loop doing the read/conversions/processing
while(core_one_run):
# do ASI sensor read and process
def checkCRC(data, size, csum): # this used only for ASI chip
crc = 0xff
for i in range(0, size, 1):
crc ^= data[i]
crc &= 0xff
for bit in range(8, 0, -1):
if(crc & 0x80):
crc = (crc << 1) ^ 0x31
crc &= 0xff
else:
crc = crc << 1
crc &= 0xff
return (crc == csum)
######################
data = i2cAsi.readfrom(i2cAsiAddr, 3)
p = data[0:2]
pcrc = data[2]
IAS = 0.0
press = 0
if checkCRC(data[0:2], 2, data[2]):
press = p[0] *256 + p[1]
if press & 0x8000: ## handle the signed int
press -= 0x10000
press /= 60.0
else:
print("err")
if(press > 0.0):
IAS = math.sqrt(press * 1.568135)
if IAS < 1.0:
ISA = 0.0
ASIfinal = IAS
# print(ASIfinal)
# do Bari sensor reading and process
i2cAV.writeto(i2cAVAddr, bytearray([0x48])) #pres_conversion()
utime.sleep_ms(10)
value = i2cAV.readfrom_mem(i2cAVAddr, 0x00, 3) # read_pressure()
D1 = value[0] * 65536 + value[1] * 256 + value[2]
i2cAV.writeto(i2cAVAddr, bytearray([0x58])) # temp_conversion()
utime.sleep_ms(10)
value = i2cAV.readfrom_mem(i2cAVAddr, 0x00, 3) # read_temp()
D2 = value[0] * 65536 + value[1] * 256 + value[2]
"""Conversion of the read data to get the final output"""
dT = D2 - (C5 * 256)
TEMP = 2000 + ((dT * C6) / 8388608)
OFF = C2 * 65536 + (C4 * dT) / 128
SENS = C1 * 32768 + (C3 * dT ) / 256
T2 = 0
OFF2 = 0
SENS2 = 0
if TEMP >= 2000 :
T2 = 0
OFF2 = 0
SENS2 = 0
elif TEMP < 2000 :
T2 = (dT * dT) / 2147483648
OFF2 = 5 * ((TEMP - 2000) * (TEMP - 2000)) / 2
SENS2 = 5 * ((TEMP - 2000) * (TEMP - 2000)) / 4
if TEMP < -1500 :
OFF2 = OFF2 + 7 * ((TEMP + 1500) * (TEMP + 1500))
SENS2 = SENS2 + 11 * ((TEMP + 1500) * (TEMP + 1500)) / 2
TEMP = TEMP - T2
OFF = OFF - OFF2
SENS = SENS - SENS2
PRESSfinal = ((((D1 * SENS) / 2097152) - OFF) / 32768.0) / 100.0
TEMPfinal = TEMP / 100.0
# print(PRESSfinal, TEMPfinal)
QNH = 1013.25
alti = 44330.0 * (1 - (PRESSfinal / QNH)** 0.190295)
# print(alti)
# avkalman(alti) now inline
tn = utime.ticks_ms() # Manage the time between samples dt
dt = float(utime.ticks_diff(tn, TIMEsample))/1000.0
TIMEsample = tn
# print("dt ", dt)
var_accel = 1.0 # a constant, never get changed?
ALTIfinal += VARIOfinal * dt ## expected Alti
# update state covariance, calculated from last samples variables
cz += (2.0 * dt * cv) + (dt**2 * cdv) + (var_accel * dt**4) /4.0 # static local
cv += dt * cdv + (var_accel * dt**3)/2.0 # static local
cdv += var_accel * dt**2 # static local
# update
y = alti - ALTIfinal ## error, difference between actual and expected Alti
s_inv = 1.0 / (cz + vz) # local
k_abs = cz * s_inv # Kalman gain od alti # local
k_vel = cv * s_inv # Kalman gain od vario # local
# update state estimate ## no need to check lock as this is the only locking
ALTIfinal += k_abs * y ## Alti and
VARIOfinal += k_vel * y ## vario
# update state covariance
cdv -= cv * k_vel ## as above these are static value to be used to test adjust next sample
cv -= cv * k_abs
cz -= cz * k_abs
# print(ALTIfinal, VARIOfinal)
time.sleep(0.1)
print("Endcore1")
def startSensors():
print("Starting Sensors _thread")
try:
_thread.start_new_thread(sensors, ())
except Exception as e:
print("err ", e)
## usage example
#startSensors()
# x=50
# while(x):
# time.sleep(0.05)
# print(readASI(), readALTI(), readVARIO())
# x-=1
# endCore1()
# print("end")
gpsuart.py
from machine import Pin
from machine import UART
#from machine import Timer
import time
import utime
######################################################
class gpsUart:
def __init__(self):
print("Starting gpsUart")
# self.prefixes = ['$GNGGA','$GPGSA','$GLGSA','$GNRMC','$GNVTG']
self.msgs = []
self.rxData = bytes()
#### Start Uart for the GPS ####
self.uart = UART(0, baudrate=9600, tx=Pin(16), rx=Pin(17)) ## changed from original for veroboard unit (PL)
# self.timer = Timer(-1)
# self.timer.init(period=50, mode=Timer.PERIODIC, callback=self.readUart)
def messages(self):
return len(self.msgs)
def getNMEA(self):
if(len(self.msgs) > 0):
return self.msgs.pop(0)
def readUart(self):
# rxData = bytes()
while self.uart.any() > 0:
b = self.uart.read(1)
self.rxData += b
if (b == '\n'.encode('UTF8')):
self.msgs.append(self.rxData)
self.rxData = bytes()
while(len(self.msgs) > 10):
self.msgs.pop(0)
print("gpsUart msg buffer > 10, popping")
# utime.sleep_ms(1)
def dummy(self, x): ## call with x = 0 to 4
def gpstimeStr():
return (str(time.gmtime()[3]) + str(time.gmtime()[4]) + str(time.gmtime()[5]))
if x == 0:
# generate GNGGA sentence
ggaSuffix = ",3731.575481,S,14525.516744,E,1,4,2.07,374.105,M,-2.272,M,,"
gga = "GNGGA," + gpstimeStr() + ggaSuffix
crcgga = self.crcCalc(gga)
ggaStr = "$" + gga + "*" + '{0:x}'.format(crcgga)+ "\r\n"
return ggaStr
# copy GNPSA and GLSA sentences
elif x == 1:
pgsa = "$GPGSA,A,3,25,05,15,29,,,,,,,,,2.29,2.07,0.98*02" + "\r\n"
return pgsa
elif x == 2:
lgsa = "$GLGSA,A,3,,,,,,,,,,,,,2.29,2.07,0.98*13" + "\r\n"
return lgsa
elif x == 3:
# generate GNRMC sentence
rmcSuffix = ",A,3731.575481,S,14525.516744,E,1.21,209.33,300822,,,A"
rmc = "GNRMC," + gpstimeStr() + rmcSuffix
crcrmc = self.crcCalc(rmc)
rmcStr = "$" +rmc + "*" + '{0:x}'.format(crcrmc)+ "\r\n"
return rmcStr
elif x == 4:
vtg = "$GNVTG,209.33,T,,M,1.21,N,2.23,K,A*29" + "\r\n"
return vtg
else:
print("Error in dummy gps number")
return None
def crcCalc(self, sentence): # sentence in bytearray form
byCRC = 0
if type(sentence) != bytes:
sentence = bytearray(sentence.encode('UTF8'))
for each in sentence:
byCRC ^= each
return byCRC
def end(self):
# self.timer.deinit()
time.sleep(0.1)
## Usage example
# gps = gpsUart()
# x = 50
# while(x):
# gps.readUart()
# res = gps.getNMEA()
# if res != None:
# try:
# print(res.decode('UTF8'))
# except:
# pass
# x-=1
# time.sleep(0.1)
#
# gps.end()
# x = 0
# for each in gps.msgs:
# print(x)
# print(each.decode('UTF8'))
# x+=1
lxnav.py
######################################################
class LXNAV:
def __init__(self):
''' create the LXNAV LXZP0 sentence for XCSoar '''
# eg usage string to bytearray sbytes = bytes(string, 'UTF8')
# crc = crcCalc(sbytes)
def crcCalc(self, sentence): # sentence in bytearray form
byCRC = 0
if type(sentence) != bytes:
sentence = bytearray(sentence.encode('UTF8'))
for each in sentence:
byCRC ^= each
return byCRC
def get(self, asi, alti, vario):
# use a list of data to fill in the sentence, end with CRC
# output format $LXWP0,N,'tas km/h','altutude m', v0 m/s, v1 m/s, v2 m/s, v3 m/s, v4 m/s, v5 m/s, heading, wDir, wSpeed]
# eg $LXWP0,Y,119.4,1717.6,0.02,0.02,0.02,0.02,0.02,0.02,,000,107.2*5b
opstr = "LXWP0,N," + asi + ',' + alti + ',' + vario + ',,,,,,,,' ## initial opstr has to $ nor crc
sb = bytes(opstr, 'UTF8') ## make it a bytearray for the CRC
crc = self.crcCalc(sb)
## assemble and return
return bytearray(('$' + opstr + '*' + '{0:x}'.format(crc)+ "\r\n").encode('UTF8'))
Photos.
This is the latest desktop development arrangement. The code above was tested on this a few minutes ago.
This is how it started;
This is the Epaper version, lots to do yetā¦
Behind the display.
Having an ASI pitot on my harness or wrist is not going to be useful. So as the Kobo can still be wired up wth a serial port or USB-Serial adapter, I plan to mount the Pitot and interface on the back of the Kobo, facing the airflow. This is using a Seed Studios XIAO-RP2040. But lots of other options. I did try using a second Pico-w and connect to the first WAP to send the ASI data onto the Kobo via the same LXNAV sentence but I could not get 2 Clients to connect to the Pioc-W??
Have fun!
PeteL.