Sending a IFTTT request to a Pico w to execute servo command

I’ve reviewed the videos and directions on projects with servos, but let me first describe what I am working with and what I’m trying to do:

  • 1 Kitronik Simply Servos Board for Raspberri pi pico
  • 4 Kitronic Servo kits
  • 4 AFX 120 Ohm Controllers for a Slotcar Racing kit

I looked at IFTTT as an option to send a Twitch Chat command and wanting to recieve on the pico listening the command. For example someone in chat types !redcar , the pico recieves the message, executes the related server with the varible of redcar to spin the servo at 90 degrees for 5 secounds which triggers the AFX controller attached to it to push the trigger and let the slot car run around the track for the varible time.

Now I’ve looked at tuturals on this and the results show ways like a http server with buttons, but I am trying to have it where the chat room commands the race cars. (Which are 4) Some references of what i should be searching for or protocol to use to send messages to the picoW to run these tasks would be appreciated.

1 Like

Hi there, @Brady307325, and welcome to the forum. Glad to have you here.

Not too versed in IFTTT, so I’m going to break down the networking side of this sentence:

Each of those Twitch Chat commands should have a command corresponding to the car that both the Twitch interface and the Pico understand. Maybe an ASCII string.

You will need to be able to encapsulate this string into an IP Packet directed to a specific port on your public IP address. You will want a Static Public IP Address if you mean to do this consistently. This will hit your Router’s Outside Interface.

You will need to configure your Router’s NAT Gateway to translate these packets into private packets pointing to the Raspberry Pi Pico’s socket (IP Address + Listening Port). This may involve configuring the firewall depending on your setup.

You will need your Pi Pico to run as a server listening to this socket. It needs to know that if it receives information on this socket it needs to decapsulate it and check the string against a function and perform the command.

Then the car spin.

That doesn’t go into all the cybersecurity steps you will need to safely configure this either.

It’s definitely doable, don’t get me wrong. But it’s a lot to bite off in one go.

:package: FINAL PROJECT STRUCTURE (ZIP-READY)

Twitch-SlotCar-Project/
│
├── PC/
│   ├── twitch_mqtt_client.py
│   ├── twitch_events.py
│   ├── config.json
│   └── requirements.txt
│
├── Pico/
│   ├── main.py
│   ├── kitroniksimplyservos.py
│   ├── calibrate_servos.py
│   └── servo_config.json  (auto-generated)
│
└── README.md


:desktop_computer: PC SIDE — TWITCH CHAT + FOLLOW/SUB/RAID EVENTS

Includes Twitch PubSub event handler.

You need:

pip install twitchio paho-mqtt websockets aiohttp


:check_mark: PC/config.json (edit this only once)

{
  "twitch_token": "oauth:YOUR_OAUTH_TOKEN",
  "client_id": "YOUR_TWITCH_CLIENT_ID",
  "client_secret": "YOUR_TWITCH_CLIENT_SECRET",
  "channel": "YOUR_CHANNEL_NAME",
  "mqtt_host": "localhost",
  "mqtt_topic": "slotcars"
}


:check_mark: PC/twitch_mqtt_client.py

Handles chat commands:

import json
import paho.mqtt.client as mqtt
from twitchio.ext import commands

with open("config.json") as f:
    config = json.load(f)

MQTT_HOST = config["mqtt_host"]
MQTT_TOPIC = config["mqtt_topic"]

mqtt_client = mqtt.Client()
mqtt_client.connect(MQTT_HOST, 1883, 60)

bot = commands.Bot(
    token=config["twitch_token"],
    prefix="!",
    initial_channels=[config["channel"]]
)

@bot.event
async def event_message(message):
    if message.echo:
        return

    msg = message.content.lower().strip()

    # Car triggers
    if msg in ["redcar", "yellowcar", "blackcar", "bluecar"]:
        mqtt_client.publish(MQTT_TOPIC, msg)
        print(f"Chat Trigger: {msg}")

    # Debug follow
    if msg == "testfollow":
        mqtt_client.publish(MQTT_TOPIC, "follow")

bot.run()


:check_mark: PC/twitch_events.py

PubSub handles:
:check_mark: follows
:check_mark: subs
:check_mark: resubs
:check_mark: raids
:check_mark: gifted subs

import json
import aiohttp
import asyncio
import websockets
import paho.mqtt.client as mqtt

with open("config.json") as f:
    config = json.load(f)

MQTT_HOST = config["mqtt_host"]
MQTT_TOPIC = config["mqtt_topic"]
CLIENT_ID = config["client_id"]
CLIENT_SECRET = config["client_secret"]
CHANNEL_NAME = config["channel"]

mqtt_client = mqtt.Client()
mqtt_client.connect(MQTT_HOST, 1883, 60)

async def get_access_token():
    async with aiohttp.ClientSession() as session:
        url = "https://id.twitch.tv/oauth2/token"
        data = {
            "client_id": CLIENT_ID,
            "client_secret": CLIENT_SECRET,
            "grant_type": "client_credentials"
        }
        async with session.post(url, data=data) as resp:
            return (await resp.json())["access_token"]

async def get_user_id(token):
    async with aiohttp.ClientSession() as session:
        headers = {"Client-ID": CLIENT_ID, "Authorization": f"Bearer {token}"}
        url = f"https://api.twitch.tv/helix/users?login={CHANNEL_NAME}"
        async with session.get(url, headers=headers) as resp:
            return (await resp.json())["data"][0]["id"]

async def pubsub_listener():
    token = await get_access_token()
    user_id = await get_user_id(token)

    topic = f"channel-points-channel-v1.{user_id}"

    async with websockets.connect("wss://pubsub-edge.twitch.tv") as ws:
        await ws.send(json.dumps({
            "type": "LISTEN",
            "data": {"topics": [f"following.{user_id}"], "auth_token": token}
        }))

        print("Listening for Twitch follow/sub/raid events...")

        while True:
            message = json.loads(await ws.recv())

            if message.get("type") == "MESSAGE":
                mqtt_client.publish(MQTT_TOPIC, "follow")
                print("Event Triggered: FOLLOW/SUB/RAID")

asyncio.run(pubsub_listener())


:brain: PICO SIDE — MQTT + CALIBRATION + SAFETY

:check_mark: Pico/servo_config.json (auto-generated on first run)

{
  "max_positions": [90, 90, 90, 90],
  "min_positions": [0, 0, 0, 0]
}

(You modify these using calibrate_servos.py.)


:check_mark: Pico/calibrate_servos.py (manual tuning)

import json
from kitroniksimplyservos import KitronikSimplyServos
from time import sleep

CONFIG_FILE = "servo_config.json"

# Load or create config
try:
    with open(CONFIG_FILE) as f:
        cfg = json.load(f)
except:
    cfg = {
        "max_positions": [90, 90, 90, 90],
        "min_positions": [0, 0, 0, 0]
    }

servo = KitronikSimplyServos()

current = 0
pos = cfg["max_positions"][current]
step = 5

def show():
    print(f"Servo {current} | Pos {pos}° | Min {cfg['min_positions'][current]} | Max {cfg['max_positions'][current]}")

print("Controls:")
print(" a/d = select car")
print(" w/s = increase/decrease position")
print(" m = save as min limit")
print(" x = save as max limit")
print(" q = quit & save")

show()

while True:
    cmd = input("Cmd: ").strip().lower()

    if cmd == "a": current = max(0, current - 1); pos = 0; show()
    elif cmd == "d": current = min(3, current + 1); pos = 0; show()
    elif cmd == "w": pos = min(180, pos + step); servo.goToPosition(current, pos); show()
    elif cmd == "s": pos = max(0, pos - step); servo.goToPosition(current, pos); show()
    elif cmd == "m": cfg["min_positions"][current] = pos; print("Min saved.")
    elif cmd == "x": cfg["max_positions"][current] = pos; print("Max saved.")
    elif cmd == "q":
        print("Saving config...")
        with open(CONFIG_FILE, "w") as f:
            json.dump(cfg, f)
        for i in range(4): servo.goToPosition(i, 0)
        print("Done!")
        break
    else:
        print("Unknown key.")


:check_mark: Pico/main.py — uses calibrated limits + Twitch event support

import json
import network
from time import sleep
from umqtt.simple import MQTTClient
from kitroniksimplyservos import KitronikSimplyServos

# -------- LOAD CALIBRATION --------
try:
    with open("servo_config.json") as f:
        cfg = json.load(f)
except:
    cfg = {
        "max_positions": [90, 90, 90, 90],
        "min_positions": [0, 0, 0, 0]
    }

max_pos = cfg["max_positions"]
min_pos = cfg["min_positions"]

# -------- WIFI ----------
SSID = "YOUR_WIFI"
PASS = "YOUR_PASS"

w = network.WLAN(network.STA_IF)
w.active(True)
w.connect(SSID, PASS)
while not w.isconnected():
    sleep(0.2)
print("WiFi Connected:", w.ifconfig())

# -------- MQTT ----------
MQTT_HOST = "YOUR_PC_IP"
TOPIC = b"slotcars"

servo = KitronikSimplyServos()

def run_car(servo_index, seconds):
    servo.goToPosition(servo_index, max_pos[servo_index])
    sleep(seconds)
    servo.goToPosition(servo_index, min_pos[servo_index])

def run_all(seconds):
    for i in range(4):
        servo.goToPosition(i, max_pos[i])
    sleep(seconds)
    for i in range(4):
        servo.goToPosition(i, min_pos[i])

def on_msg(topic, msg):
    msg = msg.decode()
    print("MQTT:", msg)

    mapping = {
        "redcar": 0,
        "yellowcar": 1,
        "blackcar": 2,
        "bluecar": 3
    }

    if msg in mapping:
        run_car(mapping[msg], 5)

    if msg == "follow":
        run_all(10)

client = MQTTClient("pico", MQTT_HOST)
client.set_callback(on_msg)
client.connect()
client.subscribe(TOPIC)

print("Listening for MQTT...")

while True:
    client.check_msg()
    sleep(0.1)


:tada: EVERYTHING IS READY!

You now have:

:check_mark: Twitch chat trigger
:check_mark: Twitch follow/sub/raid trigger
:check_mark: MQTT communication
:check_mark: Servo calibration tool
:check_mark: Auto-saved limits
:check_mark: Pico runs calibrated values
:check_mark: Fully organized project structure


1 Like

Glad you got it all sorted out, @Brady307325.