Python in IoT and Robotics

Backend Solutions for Connected Systems

Featured image

Python in IoT and Robotics

Pyconf Hyderabad 2025 Logo

Conference Talk Link: https://2025.pyconfhyd.org/speakers/harsh-mittal

Github Link: https://github.com/bytesByHarsh/PyConfHyd2025

Talk Slides: Drive Link

Introduction

Imagine a world where your toaster chats with your coffee machine about the perfect brew temperature, or where a fleet of drones gossip about their battery levels and GPS coordinates as they fly around. Welcome to the realm of IoT and robotics, where devices are not only smart—they’re downright social! In my recent talk, “Python in IoT & Robotics: Scalable Backend Solutions for Connected Systems,” I dove into how Python serves as the central nervous system for orchestrating these connected systems. Here’s a deeper look at the ideas and architectures we explored.

The Need for a Connected Backend

1) Data Silos

Individual robots or IoT devices operate in isolation, generating and storing data locally. This creates data silos, preventing valuable insights from being shared and utilized across the entire system.

2) Communication Issues

Direct device-to-device communication can be unreliable, leading to delays, dropped messages, and inconsistencies. This is especially true in environments with poor network connectivity.

3) Scalability Challenges

Solutions designed for single devices or small deployments often fail to scale to larger systems. Managing a large number of devices becomes increasingly complex and resource-intensive.

4) Multiple Tech Stack

Robots and IoT devices often use diverse hardware and software platforms, leading to a fragmented ecosystem. This makes integration and interoperability difficult.

Possible Solution: A Centralized Brain

Central Brain Solution

What can go wrong? Let’s understand that:

Challenge 1: High Concurrency & Message Throughput

Challenge 2: Network Latency & Reliability

Challenge 3: Data Management & Storage

Challenge 4: Device Management & Orchestration

Challenge 5: Edge Computing & Processing

Decentralized Processing

Add Local Compute in between

Local Compute Expanded

Different Communication Protocols

These are different protocols that will be used for communicating between devices or to the server.

Wireless Wired
- Wifi
- Bluetooth
- Zigbee
- LPWAN
- Ethernet
- USB
- SPI
- I2C
- RS-232, RS485
- UART

Local Communication Protocols

Local Communication Protocols

Considerations for Communication Protocol

Zenoh Vs MQTT Vs DDS Vs Kafka

Source: Zenoh Blog

Single Machine msg/sec vs payload:

Single Machine msg/sec vs payload

Multiple Machine msg/sec vs payload:

Multiple Machine msg/sec vs payload

Single Machine bits/sec vs payload:

Single Machine bits/sec vs payload

Multiple Machine bits/sec vs payload:

Multiple Machine bits/sec vs payload

Latency data in µs (microseconds)

Target Single-machine Multiple-machine
Kafka 73 81
MQTT 27 45
Cyclone DDS 8 37
Zenoh brokered 21 41
Zenoh P2P 10 16
Zenoh-pico 5 13
ping 1 7

How To Leverage Python?

MQTT Python

Publisher Code

import paho.mqtt.client as mqtt
import time

def on_connect(client, userdata, flags, rc):
    print("Connected with result code "+str(rc))

def on_publish(client, userdata, mid):
    print("Message Published")

client = mqtt.Client()
client.on_connect = on_connect
client.on_publish = on_publish

client.connect("localhost", 1883, 60) # Replace with your broker address

client.loop_start()  # Start the loop in a separate thread

try:
    while True:
        message = input("Enter message to send: ")
        client.publish("my/topic", message) # Replace with your topic
        time.sleep(1) # Wait for publish confirmation
except KeyboardInterrupt:
    client.loop_stop() # Stop the loop
    client.disconnect() # Disconnect from the broker
    print("Client Disconnected")

Subscriber Code

import paho.mqtt.client as mqtt

def on_connect(client, userdata, flags, rc):
    print("Connected with result code "+str(rc))
    client.subscribe("my/topic") # Subscribe to the topic

def on_message(client, userdata, msg):
    print(msg.topic+" "+str(msg.payload.decode())) # Decode the payload

client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message

client.connect("localhost", 1883, 60) # Replace with your broker address

client.loop_forever()

Zenoh

Publisher Code

import zenoh, random, time

random.seed()

def read_temp():
    return random.randint(15, 30)

if __name__ == "__main__":
    with zenoh.open(zenoh.Config()) as session:
        key = 'myhome/kitchen/temp'
        pub = session.declare_publisher(key)
        while True:
            t = read_temp()
            buf = f"{t}"
            print(f"Putting Data ('{key}': '{buf}')...")
            pub.put(buf)
            time.sleep(1)

Subscriber Code

import zenoh, time

def listener(sample):
    print(f"Received {sample.kind} ('{sample.key_expr}': '{sample.payload.to_string()}')")

if __name__ == "__main__":
    with zenoh.open(zenoh.Config()) as session:
        sub = session.declare_subscriber('myhome/kitchen/temp', listener)
        time.sleep(60)

ROS2: Zenoh

Publisher Code

import rclpy
from rclpy.node import Node
from std_msgs.msg import String

import zenoh  # Import zenoh

class Talker(Node):

    def __init__(self):
        super().__init__('talker')
        self.publisher_ = self.create_publisher(String, 'chatter', 10)
        timer_period = 0.5  # seconds
        self.timer = self.create_timer(timer_period, self.timer_callback)
        self.i = 0

        # Initialize Zenoh session (replace with your locator if needed)
        self.zenoh_session = zenoh.open()

    def timer_callback(self):
        msg = String()
        msg.data = 'Hello, Zenoh ROS2! %d' % self.i
        self.publisher_.publish(msg)
        self.get_logger().info('Publishing: "%s"' % msg.data)

        self.zenoh_session.put("ros2/chatter", msg.data.encode())

        self.i += 1

    def __del__(self): # Close Zenoh session on node destruction
        self.zenoh_session.close()

rclpy.init(args=args)
talker = Talker()
rclpy.spin(talker)
talker.destroy_node()
rclpy.shutdown()

Subscriber Code

import rclpy
from rclpy.node import Node
from std_msgs.msg import String

import zenoh

class Listener(Node):

    def __init__(self):
        super().__init__('listener')
        self.subscription = self.create_subscription(
            String,
            'chatter',
            self.listener_callback,
            10)

        # Initialize Zenoh session
        self.zenoh_session = zenoh.open()

        # Zenoh subscriber
        self.zenoh_subscriber = self.zenoh_session.subscribe("ros2/chatter", self.zenoh_callback)

    def listener_callback(self, msg):
        self.get_logger().info('I heard (ROS2): "%s"' % msg.data)

    def zenoh_callback(self, change): # Callback for Zenoh subscription
        if change.kind == zenoh.ChangeKind.Put:
            received_message = change.value.decode('utf-8')  # Decode from bytes
            self.get_logger().info('I heard (Zenoh): "%s"' % received_message)

    def __del__(self):
        self.zenoh_session.close()

rclpy.init(args=args)
listener = Listener()
rclpy.spin(listener)
listener.destroy_node()
rclpy.shutdown()
import pymavlink
import time

# Establish a connection to the MAVLink device (replace with your connection string)
# Examples:
#   * Serial:  'udpin:0.0.0.0:14550'  (for UDP)
#   * UDP:      '/dev/ttyS0'          (for serial port)
#   * TCP:      'tcp:192.168.1.1:5760' (for TCP)
try:
    master = pymavlink.mavutil.mavlink_connection('udpin:0.0.0.0:14550') # Replace with your connection string
except Exception as e:
    print(f"Error connecting: {e}")
    exit()

# Wait for a heartbeat message to ensure connection is established
master.wait_heartbeat()
print("Connected to MAVLink device!")

# Request data streams (adjust rates as needed)
master.mav.request_data_stream(master.target_system, master.target_component,
                                   pymavlink.mavutil.mavlink_stream_type.ALL, # Request all streams
                                   10,  # 10 Hz update rate
                                   1)   # Enable streaming

while True:
    try:
        # Receive messages (non-blocking)
        msg = master.recv_match(timeout=0.1)  # 100ms timeout

        if msg:
            if msg.get_type() == 'GLOBAL_POSITION_INT':  # GPS
                lat = msg.lat / 1e7  # Latitude in degrees
                lon = msg.lon / 1e7  # Longitude in degrees
                alt = msg.alt / 1000  # Altitude in meters
                print(f"GPS: Lat={lat}, Lon={lon}, Alt={alt}")

            elif msg.get_type() == 'IMU':  # IMU (raw data)
                xacc = msg.xacc / 1000  # Acceleration in g
                yacc = msg.yacc / 1000
                zacc = msg.zacc / 1000
                xgyro = msg.xgyro / 1000 # Angular rate in rad/s
                ygyro = msg.ygyro / 1000
                zgyro = msg.zgyro / 1000
                print(f"IMU (Raw): Accel=({xacc},{yacc},{zacc})g, Gyro=({xgyro},{ygyro},{zgyro}) rad/s")

            elif msg.get_type() == 'ATTITUDE': # Attitude (Euler angles)
                roll = msg.roll
                pitch = msg.pitch
                yaw = msg.yaw
                print(f"Attitude: Roll={roll}, Pitch={pitch}, Yaw={yaw}")

            # ... other message types as needed (e.g., VFR_HUD, SYS_STATUS)

    except Exception as e:
        print(f"Error receiving or processing message: {e}")
        break  # Exit the loop on error

    time.sleep(0.01)  # Small delay (adjust as needed)

# Close the connection when done (important!)
master.close()
print("Connection closed.")

ROS2 Python Launch File

from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node

def generate_launch_description():
    connection_string_arg = DeclareLaunchArgument(
        'connection_string',
        default_value='udpin:0.0.0.0:14550',  # Default connection string
        description='Connection string for MAVLink device'
    )

    zenoh_router_node = Node(
        package='zenoh',
        executable='zenoh-router',
        name='zenoh_router',
        output='screen'
    )

    pymavlink_node = Node(
        package='your_pymavlink_package',  # Replace with your package name
        executable='pymavlink_script.py',  # Replace with your script name
        name='pymavlink_node',
        output='screen',
        parameters=[{
            'connection_string': LaunchConfiguration('connection_string')
        }]
    )

    talker_node = Node(
        package='your_ros2_package',  # Replace with your package name
        executable='talker.py',  # Replace with your script name
        name='talker_node',
        output='screen'
    )

    listener_node = Node(
        package='your_ros2_package',  # Replace with your package name
        executable='listener.py',  # Replace with your script name
        name='listener_node',
        output='screen'
    )

    return LaunchDescription([
        connection_string_arg,
        zenoh_router_node,
        pymavlink_node,
        talker_node,
        listener_node,
    ])

Home Assistance

Awaken your home

Open source home automation that puts local control and privacy first. Powered by a worldwide community of tinkerers and DIY enthusiasts. Perfect to run on a Raspberry Pi or a local server.

Website: https://www.home-assistant.io/

Github: https://github.com/home-assistant/

Server Side Logic

IOT Platforms Vendors

AWS vs Azure vs Google Cloud for IOT Services

AWS IoT in action

AWS IoT in action

Let’s Make One! But Simpler !!!

Event-Driven Architecture

Workflow using AWS

Drone -> Server

import json
import time
import paho.mqtt.client as mqtt
from pymavlink import mavutil

# MQTT settings
BROKER_ADDRESS = "your.mqtt.broker.address"
BROKER_PORT = 1883
USERNAME = "your_username"
PASSWORD = "your_password"
DRONE_ID = "drone001"  # Unique drone identifier
TOPIC = f"drone/{DRONE_ID}/updates"

# Initialize MQTT client with authentication
client = mqtt.Client("mavlink_publisher")
client.username_pw_set(USERNAME, PASSWORD)
client.connect(BROKER_ADDRESS, BROKER_PORT)

# Connect to MAVLink via Serial (adjust the serial port and baud rate as needed)
SERIAL_PORT = "/dev/ttyUSB0"   # e.g., "COM3" for Windows, "/dev/ttyUSB0" for Linux
BAUD_RATE = 57600

mavlink_connection = mavutil.mavlink_connection(SERIAL_PORT, baud=BAUD_RATE)
mavlink_connection.wait_heartbeat()
print("Heartbeat received from the drone via serial connection")

# Initialize drone data including the drone id
drone_data = {
    "drone_id": DRONE_ID,
    "battery": None,
    "gps": {"lat": None, "lon": None}
}

while True:
    msg = mavlink_connection.recv_match(blocking=True, timeout=1)
    if not msg:
        continue

    msg_type = msg.get_type()

    if msg_type == "BATTERY_STATUS":
        drone_data["battery"] = msg.battery_remaining
        print("Updated battery:", drone_data["battery"])

    elif msg_type == "GLOBAL_POSITION_INT":
        drone_data["gps"]["lat"] = msg.lat / 1e7
        drone_data["gps"]["lon"] = msg.lon / 1e7
        print("Updated GPS:", drone_data["gps"])

        # Publish the updated data as a JSON payload
        payload = json.dumps(drone_data)
        client.publish(TOPIC, payload)
        print("Published to topic", TOPIC, ":", payload)

    time.sleep(0.1)

Bridge – Forward MQTT Messages to Redis

import redis
import paho.mqtt.client as mqtt

# MQTT settings
BROKER_ADDRESS = "your.mqtt.broker.address"
BROKER_PORT = 1883
MQTT_TOPIC = "drone/+/updates"  # Wildcard to subscribe to all drone topics
USERNAME = "your_username"
PASSWORD = "your_password"

REDIS_HOST = "localhost"
REDIS_CHANNEL = "_updates"

# Connect to Redis
r = redis.Redis(host=REDIS_HOST, port=6379, db=0)

def on_connect(client, userdata, flags, rc):
    print("Connected to MQTT Broker")
    client.subscribe(MQTT_TOPIC)

def on_message(client, userdata, msg):
    payload = msg.payload.decode()
    print("Received MQTT message on topic", msg.topic, ":", payload)
    # Publish the message to Redis
    drone_str = msg.topic.split("/")[1]
    r.publish(f"{drone_str}{REDIS_CHANNEL}", payload)

mqtt_client = mqtt.Client("mqtt_redis_bridge")
mqtt_client.on_connect = on_connect
mqtt_client.on_message = on_message

mqtt_client.username_pw_set(USERNAME, PASSWORD)
mqtt_client.connect(BROKER_ADDRESS, BROKER_PORT)
mqtt_client.loop_forever()

FastAPI – WebSocket Server Subscribing to Redis

import json
import asyncio
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import redis.asyncio as aioredis

app = FastAPI()

REDIS_HOST = "localhost"
REDIS_PORT = 6379

# Create an async Redis connection
redis_client = aioredis.Redis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True)

@app.get("/channels")
async def get_channels():
    """
    GET endpoint to fetch all active drone channels.
    """
    channels = await redis_client.execute_command("PUBSUB", "CHANNELS", "drone*_updates")
    return {"available_channels": channels}

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        # Wait for client to send the desired channel name.
        selected_channel = await websocket.receive_text()
        print("Client selected channel:", selected_channel)

        # Create a Redis pubsub subscription to the selected channel.
        pubsub = redis_client.pubsub()
        await pubsub.subscribe(selected_channel)

        try:
            while True:
                # Poll for new messages with a short timeout.
                message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=1.0)
                if message:
                    data = message["data"]
                    await websocket.send_text(data)
                await asyncio.sleep(0.01)
        except Exception as e:
            print("Error in Redis listener:", e)
        finally:
            await pubsub.unsubscribe(selected_channel)
            await pubsub.close()
    except WebSocketDisconnect:
        print("WebSocket disconnected")

Distributed Databases

Use a combination of:

Microservices & Kubernetes

Monolithic backends fail when robots scale.

Solution:

Summary

Final Proposed architecture

Thanks for reading meme