Pi Pico W to Pico W - Simple Communications

Following success with my latest project, whose main aim was to develop a simple way of getting two Pico Ws to communicate, I thought I would share the Python code I developed.

One Pico W sets itself up as a WiFi Access Point and listens for a connection from the other Pico W.
The other Pico W connects to the Access Point, collects data and sends it.
The Access Point responds with ‘Success’ if the data is received OK.
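
The exchange described above can be tried on a desktop before any hardware is involved. This is only a rough sketch in ordinary desktop Python (not MicroPython), using the loopback address instead of the Access Point; the names serve_once, send_reading and demo and the payload b'T=21.5' are made up for illustration. Only the b'Success' reply comes from the code in this post.

```python
import socket
import threading

ACK = b'Success'   # reply sent by the Access Point code in this post

def serve_once(listener):
    """Accept one client, read its data and acknowledge it."""
    conn, _addr = listener.accept()
    with conn:
        data = conn.recv(512)
        if data:
            conn.sendall(ACK)

def send_reading(host, port, payload):
    """Send payload and return True if the server acknowledged it."""
    with socket.socket() as s:
        s.settimeout(5)
        s.connect((host, port))
        s.sendall(payload)
        return s.recv(512).find(ACK) != -1

def demo():
    """Run one send/acknowledge round trip over the loopback interface."""
    listener = socket.socket()
    listener.bind(('127.0.0.1', 0))   # port 0: let the OS pick a free port
    listener.listen(1)
    port = listener.getsockname()[1]
    t = threading.Thread(target=serve_once, args=(listener,))
    t.start()
    ok = send_reading('127.0.0.1', port, b'T=21.5')   # hypothetical payload
    t.join()
    listener.close()
    return ok

print('Acknowledged:', demo())
```

On the Picos the same roles are played by the asyncio server in the Access Point file and the plain socket in the Client file.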

The code here covers only the basic communications; how the data is collected and what is done with it are not shown. Don’t expect the files below to run properly as they are; they need additional code to read a sensor or similar and to process the received sensor data. My project used a temperature sensor and an LCD display to show the data.
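
As one possible way of filling the "read a sensor" gap, here is a minimal sketch that turns a raw ADC reading into a short message for the client to send. It assumes the RP2040's onboard temperature sensor, which is not necessarily the sensor used in the project, and the 'T=xx.x' message format and the function names are hypothetical.

```python
def adc_to_celsius(raw_u16, vref=3.3):
    """Convert a 16-bit ADC reading from the RP2040 temperature sensor to deg C."""
    volts = raw_u16 * vref / 65535
    # RP2040 datasheet formula: 27 deg C at 0.706 V, slope -1.721 mV per deg C
    return 27 - (volts - 0.706) / 0.001721

def make_payload(temp_c):
    """Format a temperature as a short bytes message (hypothetical format)."""
    return 'T={:.1f}'.format(temp_c).encode()

# On the Pico W the raw value could come from the onboard sensor
# (assumption, not shown in the original files):
#   from machine import ADC
#   raw = ADC(4).read_u16()
raw = 14021   # example raw value, roughly 0.706 V
data = make_payload(adc_to_celsius(raw))
print(data)
```

The resulting bytes object can be passed straight to s.send() in the Client file.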

Cheers
Jim

Client File

############################################################################
#    Pi Pico W Client
#
# Connects to Access Point to send data
############################################################################
import os
import network
import socket
import time
import urequests
from machine import Pin
from SSID_AP import SSID_AP
############################################################################
'''
SSID_AP.py is a file in the same folder as the running program
It contains SSID and Password information

SSID_AP File: SSID_AP.py
SSID_AP = {
    'ssid': 'any name you want to call the access point',
    'pw': 'whatever password you decide to use'
    }

'''
############################################################################

host =    '192.168.10.2'
server =  '192.168.10.1'
gateway = '192.168.10.1'
port = 5263

ssid = SSID_AP['ssid']
pw = SSID_AP['pw']
wlan = network.WLAN(network.STA_IF)
wlan.active(True)

Onboard_LED = Pin("LED", Pin.OUT, value=0)

############################################################################
# Check for Server active
############################################################################
def check_SSID():
    print('Checking for Network ...',ssid)
    networks = wlan.scan()
    found = False
    for w in networks:
        if w[0].decode() == ssid:
            found = True
    return found

############################################################################
# Display Connect Error
############################################################################
def connect_Fail(s):
    Error = 'Connect Fail: '
    if s == 0:
        Error += 'Link Down'
    elif s == 1:
        Error += 'Link Join'
    elif s == 2:
        Error += 'Link NoIP'
    elif s == -1:
        Error += 'Link Fail'
    elif s == -2:
        Error += 'NoNet'
    elif s == -3:
        Error += 'BadAuth'
    else:
        Error += 'Unknown'
    print(Error)
    return

############################################################################
# Try to Connect to WiFi, returns WLAN status
############################################################################
def connect_to_Network():
    print('Connecting to Network ...')
    wlan.active(True)
    wlan.config(pm = 0xa11140)     # Disable power-save mode
    wlan.connect(ssid, pw)
    max_wait = 10                  # wait 10 seconds to connect
    while max_wait > 0:
        if wlan.status() < 0 or wlan.status() >= 3:
            break
        max_wait -= 1
        time.sleep(1)
    status = wlan.status()
    if status == 3:
        wlan.ifconfig((host, '255.255.255.0', gateway, '8.8.8.8'))
        status = wlan.ifconfig()
        print('ip = ' + status[0])
        return 1
    else:
        connect_Fail(status)
        return 0

############################################################################
# Main
############################################################################
Onboard_LED.on()
try:
    if check_SSID():
        if connect_to_Network() == 1:
            count = 0                  # failed attempts so far
            Exit = False
            s = socket.socket()
            print('Connecting to {}:{}'.format(server,port))
            s.connect((server, port))
            while not Exit:
                # get some data to send, read sensor etc
                data = b'some data'    # placeholder payload
                s.send(data)
                msg = s.recv(512)
                if msg.find(b'Success') == -1:
                    count += 1         # no acknowledgement, try again
                    if count == 5:
                        Exit = True    # give up after 5 failed attempts
                else:
                    print('Success')
                    Exit = True        # data acknowledged, finished
            s.close()
        else:
            print('Cannot connect ... ')
    else:
        print('Network NOT found ... ')
except Exception as e:
    print('Error occurred : {}'.format(e))

wlan.disconnect()
time.sleep(0.1)
wlan.active(False)
time.sleep(0.1)
print('Disconnect from Network.')
print('')
Onboard_LED.off()
time.sleep(120)

############################################################################

Access Point File

############################################################################
#        Pi Pico W Access Point
#
#  Connects to Local WiFi to get Date Time if available
#  Setup Pico as Access Point
#  Listen for client communications
#  Reply 'success' if data received ok
#
############################################################################
import network
import socket
import time
import ntptime
import uasyncio as asyncio
from machine import Pin, RTC
from SSID import SSID
from SSID_AP import SSID_AP
############################################################################
'''
SSID.py & SSID_AP.py are two files in the same folder as the running program
They contain SSID and Password information

SSID File: SSID.py
SSID = {
    'ssid': 'Local WiFi Name',
    'pw': 'Local Password'
    }

SSID_AP File: SSID_AP.py
SSID_AP = {
    'ssid': 'any name you want to call the access point',
    'pw': 'whatever password you decide to use'
    }

'''
############################################################################
# Local WiFi
server = '192.168.1.2'
gateway = '192.168.1.1'
port = 80
ssid = SSID['ssid']
pw = SSID['pw']

# HotSpot
serverAP = '192.168.10.1'
gatewayAP = '192.168.10.1'
portAP = 5263
ssidAP = SSID_AP['ssid']
pwAP = SSID_AP['pw']

############################################################################
# Setup
############################################################################
Onboard_LED = Pin("LED", Pin.OUT, value=0)
dt = RTC()

############################################################################
# Check for Server active
############################################################################
def check_SSID(wlan):
    print('Checking for Network ...',ssid)
    networks = wlan.scan()
    found = False
    for w in networks:
        if w[0].decode() == ssid:
            found = True
    return found

############################################################################
# Display Connect Error
############################################################################
def connect_Fail(s):
    Error = 'Connect Fail: '
    if s == 0:
        Error += 'Link Down'
    elif s == 1:
        Error += 'Link Join'
    elif s == 2:
        Error += 'Link NoIP'
    elif s == -1:
        Error += 'Link Fail'
    elif s == -2:
        Error += 'NoNet'
    elif s == -3:
        Error += 'BadAuth'
    else:
        Error += 'Unknown'
    print(Error)
    return

############################################################################
# Try to Connect to WiFi, returns WLAN status
############################################################################
def connect_to_Network(wlan):
    print('Connecting to Network ...')
    wlan.active(True)
    wlan.config(pm = 0xa11140)     # Disable power-save mode
    wlan.connect(ssid, pw)
    max_wait = 10                  # wait 10 seconds to connect
    while max_wait > 0:
        if wlan.status() < 0 or wlan.status() >= 3:
            break
        max_wait -= 1
        time.sleep(1)
    status = wlan.status()
    if status == 3:
        wlan.ifconfig((server, '255.255.255.0', gateway, '8.8.8.8'))
        status = wlan.ifconfig()
        print('ip = ' + status[0])
        return 1
    else:
        connect_Fail(status)
        return 0

############################################################################
# Get time from NTP Server and adjust for daylight saving
#      Maybe there is a better way to do this.
############################################################################
def Get_NTPTime():
    try:
        print('Getting NTPTime ...')
        wlan = network.WLAN(network.STA_IF)
        wlan.active(True)
        time.sleep(2)
        if check_SSID(wlan):
            if connect_to_Network(wlan) == 1:
                ntptime.settime()
                # adjust for daylight saving; remove if daylight saving does not apply
                s = time.time()
                if dt.datetime()[1] in [10,11,12,1,2,3]:
                    s += 39600
                else:
                    s += 36000
                x = time.localtime(s)
                dt.datetime((x[0],x[1],x[2],x[6],x[3],x[4],x[5],0))
                time.sleep(0.2)
                t = '{:02d}/{:02d}/{} {:02d}:{:02d}'.format(dt.datetime()[2],dt.datetime()[1],dt.datetime()[0],
                                                              dt.datetime()[4],dt.datetime()[5])
                print(t)
                time.sleep(2)
            else:
                print('Network NOT found ... ')
        else:
            print('SSID NOT found ... ')
        wlan.disconnect()
        time.sleep(0.1)
        wlan.active(False)
        time.sleep(4)
        # lcd.clear()   # LCD handling is not included in this listing
    except Exception as e:
        print('Error occurred : {}'.format(e))

'''
    time.localtime() 
        0     1      2     3     4       5       6        7
     (year, month, day, hour, minute, second, weekday, yearday)
     
     dt.datetime()
        0     1      2     3       4       5       6        7
     (year, month, day, weekday, hour, minute, second, subsecond)
     
    Note: different location of weekday
     
''' 
############################################################################
# Get Remote data, runs independent of Main Loop
############################################################################
async def SensorData(reader, writer):
    
    print("Client connected")
    Sensor = await reader.read(512)   # returns the raw bytes sent by the client
    Sensor = Sensor.decode()          # convert bytes object to str object
    # Do something with data received
    writer.write(b'Success')          # send msg back
    await writer.drain()
    writer.close()
    await writer.wait_closed()
    print("Client disconnected")

#############################################################################
# Main Loop
#############################################################################
async def main():
    # start server monitor task
    asyncio.create_task(asyncio.start_server(SensorData, "0.0.0.0", portAP))
    print('Server listening on {}:{}'.format(serverAP,portAP))

    # Main loop
    while True:
        # Do stuff
        await asyncio.sleep(5)           # need to wait some time to allow server task to function
        
############################################################################
# Startup 
############################################################################
Get_NTPTime()      # try to set the Pico time to correct local time
try:
    print('Setting up AP ...')
    time.sleep(2)
    wlanAP = network.WLAN(network.AP_IF)
    wlanAP.config(ssid=ssidAP, password=pwAP)
    wlanAP.active(True)
    print('ssid = ',ssidAP)
    time.sleep(1)
    wlanAP.ifconfig((serverAP, '255.255.255.0', gatewayAP, '8.8.8.8'))
    time.sleep(0.1)
    print('Config = ',wlanAP.ifconfig())
    time.sleep(2)

    asyncio.run(main())

except Exception as e:
    print('Error occurred : {}'.format(e))
finally:
    asyncio.new_event_loop()
    wlanAP.disconnect()
    time.sleep(0.1)
    wlanAP.active(False)
    time.sleep(0.1)
    Onboard_LED.off()

############################################################################
############################################################################
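
On the "maybe there is a better way" comment in Get_NTPTime: the month test above switches on 1 October and 1 April, while the actual changeover days fall on Sundays. Below is a rough sketch of a date-based check. It assumes the Australian eastern rule (daylight saving from the first Sunday in October to the first Sunday in April), which is inferred from the 10 and 11 hour offsets in the code, and it ignores the hour of the changeover. All function names are made up for illustration.

```python
def day_of_week(year, month, day):
    """Return 0 for Sunday ... 6 for Saturday (Sakamoto's method)."""
    offsets = [0, 3, 2, 5, 0, 3, 5, 1, 4, 6, 2, 4]
    if month < 3:
        year -= 1
    return (year + year // 4 - year // 100 + year // 400
            + offsets[month - 1] + day) % 7

def first_sunday(year, month):
    """Day of the month (1..7) on which the first Sunday falls."""
    for day in range(1, 8):
        if day_of_week(year, month, day) == 0:
            return day

def is_daylight_saving(year, month, day):
    """Date-level check; assumes first Sunday of October to first Sunday of April."""
    if month in (11, 12, 1, 2, 3):
        return True
    if month == 10:
        return day >= first_sunday(year, 10)
    if month == 4:
        return day < first_sunday(year, 4)
    return False

def utc_offset_seconds(year, month, day):
    """39600 s (11 h) during daylight saving, otherwise 36000 s (10 h)."""
    return 39600 if is_daylight_saving(year, month, day) else 36000

print(utc_offset_seconds(2024, 1, 15))
print(utc_offset_seconds(2024, 6, 15))
```

In Get_NTPTime the year, month and day could be taken from dt.datetime() after ntptime.settime(), with the result added to time.time() in place of the fixed month test. Because the RTC holds UTC at that point, the switch could still be off by a few hours around the changeover.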

Awesome project Jim!!

I was waiting for someone to write p2p comms for the Pico W and this is siiiiiiiiiick. It’s a shame a lot is hidden behind the wireless MCU firmware.

Very keen to see if something like ESP-NOW is developed!

Hi Jim,

Awesome project!! I’m impressed with the depth of your error handling, looks like the user will always get helpful error info, which really improves the experience.

I’ll definitely keep this in mind if I need P2P communication between Picos. Do you have any words on how your solution compares to our new PiicoDev Transceiver module?

I developed this before the PiicoDev TXRX was available, before I even knew it was being developed. I have run the two Picos for almost two months on the bench to test the life of the battery.
1100mAh LiPo lasts about 42 days. Wakes every 15 minutes, reads sensor, sends via WiFi, goes back to sleep. MakerVerse Nano Timer switches the power.
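
For anyone wanting to size a battery from those figures, here is a back-of-envelope sketch. It assumes the full rated 1100 mAh was usable and that the timer's own draw is lumped into the average, so treat the results as rough estimates only.

```python
capacity_mah = 1100          # LiPo capacity quoted above
runtime_days = 42            # observed runtime
wake_interval_min = 15       # wake period

runtime_hours = runtime_days * 24
avg_current_ma = capacity_mah / runtime_hours            # average draw over the whole run
wake_cycles = runtime_hours * 60 // wake_interval_min    # number of wake/send cycles
mah_per_cycle = capacity_mah / wake_cycles               # upper bound on charge per cycle

print('Average current:  {:.2f} mA'.format(avg_current_ma))
print('Wake cycles:      {}'.format(wake_cycles))
print('Charge per cycle: {:.3f} mAh'.format(mah_per_cycle))
```

That works out to roughly 1.09 mA average over about 4000 wake cycles, or about 0.27 mAh per wake, read and send.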

In my opinion the PiicoDev TXRX would be a much better solution. It removes the complexity of WiFi and, for simple collection of sensor readings, would be ideal. The ability to use groups and channels enhances its features. Also, Michael’s video shows it has quite a good range. The Pi Pico W WiFi would allow internet access to the data if needed. I think the PiicoDev TXRX is a winner and the library makes it so easy to use.

Cheers
Jim
