FINAL PROJECT STRUCTURE (ZIP-READY)
Twitch-SlotCar-Project/
│
├── PC/
│ ├── twitch_mqtt_client.py
│ ├── twitch_events.py
│ ├── config.json
│ └── requirements.txt
│
├── Pico/
│ ├── main.py
│ ├── kitroniksimplyservos.py
│ ├── calibrate_servos.py
│ └── servo_config.json (auto-generated)
│
└── README.md
PC SIDE — TWITCH CHAT + FOLLOW/SUB/RAID EVENTS
Includes Twitch PubSub event handler.
You need:
pip install twitchio paho-mqtt websockets aiohttp
PC/config.json (edit this only once)
{
"twitch_token": "oauth:YOUR_OAUTH_TOKEN",
"client_id": "YOUR_TWITCH_CLIENT_ID",
"client_secret": "YOUR_TWITCH_CLIENT_SECRET",
"channel": "YOUR_CHANNEL_NAME",
"mqtt_host": "localhost",
"mqtt_topic": "slotcars"
}
PC/twitch_mqtt_client.py
Handles chat commands:
import json
import paho.mqtt.client as mqtt
from twitchio.ext import commands
with open("config.json") as f:
config = json.load(f)
MQTT_HOST = config["mqtt_host"]
MQTT_TOPIC = config["mqtt_topic"]
mqtt_client = mqtt.Client()
mqtt_client.connect(MQTT_HOST, 1883, 60)
bot = commands.Bot(
token=config["twitch_token"],
prefix="!",
initial_channels=[config["channel"]]
)
@bot.event
async def event_message(message):
if message.echo:
return
msg = message.content.lower().strip()
# Car triggers
if msg in ["redcar", "yellowcar", "blackcar", "bluecar"]:
mqtt_client.publish(MQTT_TOPIC, msg)
print(f"Chat Trigger: {msg}")
# Debug follow
if msg == "testfollow":
mqtt_client.publish(MQTT_TOPIC, "follow")
bot.run()
PC/twitch_events.py
PubSub handles:
follows
subs
resubs
raids
gifted subs
import json
import aiohttp
import asyncio
import websockets
import paho.mqtt.client as mqtt
with open("config.json") as f:
config = json.load(f)
MQTT_HOST = config["mqtt_host"]
MQTT_TOPIC = config["mqtt_topic"]
CLIENT_ID = config["client_id"]
CLIENT_SECRET = config["client_secret"]
CHANNEL_NAME = config["channel"]
mqtt_client = mqtt.Client()
mqtt_client.connect(MQTT_HOST, 1883, 60)
async def get_access_token():
async with aiohttp.ClientSession() as session:
url = "https://id.twitch.tv/oauth2/token"
data = {
"client_id": CLIENT_ID,
"client_secret": CLIENT_SECRET,
"grant_type": "client_credentials"
}
async with session.post(url, data=data) as resp:
return (await resp.json())["access_token"]
async def get_user_id(token):
async with aiohttp.ClientSession() as session:
headers = {"Client-ID": CLIENT_ID, "Authorization": f"Bearer {token}"}
url = f"https://api.twitch.tv/helix/users?login={CHANNEL_NAME}"
async with session.get(url, headers=headers) as resp:
return (await resp.json())["data"][0]["id"]
async def pubsub_listener():
token = await get_access_token()
user_id = await get_user_id(token)
topic = f"channel-points-channel-v1.{user_id}"
async with websockets.connect("wss://pubsub-edge.twitch.tv") as ws:
await ws.send(json.dumps({
"type": "LISTEN",
"data": {"topics": [f"following.{user_id}"], "auth_token": token}
}))
print("Listening for Twitch follow/sub/raid events...")
while True:
message = json.loads(await ws.recv())
if message.get("type") == "MESSAGE":
mqtt_client.publish(MQTT_TOPIC, "follow")
print("Event Triggered: FOLLOW/SUB/RAID")
asyncio.run(pubsub_listener())
PICO SIDE — MQTT + CALIBRATION + SAFETY
Pico/servo_config.json (auto-generated on first run)
{
"max_positions": [90, 90, 90, 90],
"min_positions": [0, 0, 0, 0]
}
(You modify these using calibrate_servos.py.)
Pico/calibrate_servos.py (manual tuning)
import json
from kitroniksimplyservos import KitronikSimplyServos
from time import sleep
CONFIG_FILE = "servo_config.json"
# Load or create config
try:
with open(CONFIG_FILE) as f:
cfg = json.load(f)
except:
cfg = {
"max_positions": [90, 90, 90, 90],
"min_positions": [0, 0, 0, 0]
}
servo = KitronikSimplyServos()
current = 0
pos = cfg["max_positions"][current]
step = 5
def show():
print(f"Servo {current} | Pos {pos}° | Min {cfg['min_positions'][current]} | Max {cfg['max_positions'][current]}")
print("Controls:")
print(" a/d = select car")
print(" w/s = increase/decrease position")
print(" m = save as min limit")
print(" x = save as max limit")
print(" q = quit & save")
show()
while True:
cmd = input("Cmd: ").strip().lower()
if cmd == "a": current = max(0, current - 1); pos = 0; show()
elif cmd == "d": current = min(3, current + 1); pos = 0; show()
elif cmd == "w": pos = min(180, pos + step); servo.goToPosition(current, pos); show()
elif cmd == "s": pos = max(0, pos - step); servo.goToPosition(current, pos); show()
elif cmd == "m": cfg["min_positions"][current] = pos; print("Min saved.")
elif cmd == "x": cfg["max_positions"][current] = pos; print("Max saved.")
elif cmd == "q":
print("Saving config...")
with open(CONFIG_FILE, "w") as f:
json.dump(cfg, f)
for i in range(4): servo.goToPosition(i, 0)
print("Done!")
break
else:
print("Unknown key.")
Pico/main.py — uses calibrated limits + Twitch event support
import json
import network
from time import sleep
from umqtt.simple import MQTTClient
from kitroniksimplyservos import KitronikSimplyServos
# -------- LOAD CALIBRATION --------
try:
with open("servo_config.json") as f:
cfg = json.load(f)
except:
cfg = {
"max_positions": [90, 90, 90, 90],
"min_positions": [0, 0, 0, 0]
}
max_pos = cfg["max_positions"]
min_pos = cfg["min_positions"]
# -------- WIFI ----------
SSID = "YOUR_WIFI"
PASS = "YOUR_PASS"
w = network.WLAN(network.STA_IF)
w.active(True)
w.connect(SSID, PASS)
while not w.isconnected():
sleep(0.2)
print("WiFi Connected:", w.ifconfig())
# -------- MQTT ----------
MQTT_HOST = "YOUR_PC_IP"
TOPIC = b"slotcars"
servo = KitronikSimplyServos()
def run_car(servo_index, seconds):
servo.goToPosition(servo_index, max_pos[servo_index])
sleep(seconds)
servo.goToPosition(servo_index, min_pos[servo_index])
def run_all(seconds):
for i in range(4):
servo.goToPosition(i, max_pos[i])
sleep(seconds)
for i in range(4):
servo.goToPosition(i, min_pos[i])
def on_msg(topic, msg):
msg = msg.decode()
print("MQTT:", msg)
mapping = {
"redcar": 0,
"yellowcar": 1,
"blackcar": 2,
"bluecar": 3
}
if msg in mapping:
run_car(mapping[msg], 5)
if msg == "follow":
run_all(10)
client = MQTTClient("pico", MQTT_HOST)
client.set_callback(on_msg)
client.connect()
client.subscribe(TOPIC)
print("Listening for MQTT...")
while True:
client.check_msg()
sleep(0.1)
EVERYTHING IS READY!
You now have:
Twitch chat trigger
Twitch follow/sub/raid trigger
MQTT communication
Servo calibration tool
Auto-saved limits
Pico runs calibrated values
Fully organized project structure