iot-expert

Expert guidance for IoT systems, embedded devices, edge computing, sensor networks, and IoT protocols.

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant so it can learn this skill

Install skill "iot-expert" with this command: npx skills add personamanagmentlayer/pcl/personamanagmentlayer-pcl-iot-expert

IoT Expert

Expert guidance for IoT systems, embedded devices, edge computing, sensor networks, and IoT protocols.

Core Concepts

IoT Architecture

  • Device layer (sensors, actuators)

  • Edge computing layer

  • Network layer (connectivity)

  • Cloud/platform layer

  • Application layer

  • Security across all layers

IoT Protocols

  • MQTT (Message Queuing Telemetry Transport)

  • CoAP (Constrained Application Protocol)

  • HTTP/REST for IoT

  • WebSocket for real-time

  • LoRaWAN for long-range

  • Zigbee, Z-Wave for home automation

Embedded Systems

  • Microcontroller programming

  • Real-time operating systems (RTOS)

  • Power management

  • Firmware updates (OTA)

  • Hardware interfaces (I2C, SPI, UART)

  • Memory constraints

MQTT Implementation

import json
from datetime import datetime, timezone
from typing import Callable, Dict, Optional

import paho.mqtt.client as mqtt

class MQTTClient:
    """Small wrapper around a paho-mqtt client with JSON payloads.

    Outgoing payloads are serialized with ``json.dumps``; incoming payloads
    are decoded and handed to the callback registered for the topic.
    """

    def __init__(self, broker: str, port: int = 1883, client_id: str = "iot_device"):
        """Create the underlying client and wire up its callbacks.

        Args:
            broker: Host name or address of the MQTT broker.
            port: Broker port.
            client_id: Client identifier presented to the broker.
        """
        self.broker = broker
        self.port = port
        self.client = self._make_client(client_id)
        # Topic (or topic filter) -> callback receiving the decoded payload.
        self.subscriptions: Dict[str, Callable] = {}
        # QoS requested per topic, so re-subscribing after a reconnect keeps it.
        self._subscription_qos: Dict[str, int] = {}

        self.client.on_connect = self._on_connect
        self.client.on_message = self._on_message
        self.client.on_disconnect = self._on_disconnect

    @staticmethod
    def _make_client(client_id: str):
        """Build a paho client that works with both the 1.x and 2.x APIs."""
        api_versions = getattr(mqtt, "CallbackAPIVersion", None)
        if api_versions is None:
            # paho-mqtt 1.x: no callback API selector exists.
            return mqtt.Client(client_id=client_id)
        # paho-mqtt 2.x: the first positional argument is the callback API
        # version; VERSION1 keeps the callback signatures used in this class.
        return mqtt.Client(api_versions.VERSION1, client_id=client_id)

    def _find_callback(self, topic: str) -> Optional[Callable]:
        """Return the callback for *topic*, honouring ``+``/``#`` filters."""
        exact = self.subscriptions.get(topic)
        if exact is not None:
            return exact
        # Snapshot the items: this runs on the network thread while
        # subscribe() may be adding entries from another thread.
        for pattern, callback in tuple(self.subscriptions.items()):
            if "+" not in pattern and "#" not in pattern:
                continue
            if mqtt.topic_matches_sub(pattern, topic):
                return callback
        return None

    def _on_connect(self, client, userdata, flags, rc):
        """Report the connection result and restore subscriptions on success."""
        if rc == 0:
            print(f"Connected to MQTT broker at {self.broker}:{self.port}")
            # Resubscribe to topics on reconnect, with the QoS originally
            # requested (0 for topics added to `subscriptions` directly).
            for topic in tuple(self.subscriptions):
                self.client.subscribe(topic, qos=self._subscription_qos.get(topic, 0))
        else:
            print(f"Connection failed with code {rc}")

    def _on_message(self, client, userdata, msg):
        """Decode an incoming message and pass it to the matching callback.

        The callback receives the parsed JSON value when the payload is valid
        JSON, the decoded text when it is not, and the raw bytes when the
        payload cannot be decoded as text at all.
        """
        callback = self._find_callback(msg.topic)
        if callback is None:
            return

        try:
            text = msg.payload.decode()
        except UnicodeDecodeError:
            callback(msg.payload)
            return

        # Only the parse is guarded: an exception raised by the callback
        # itself must not be mistaken for a malformed payload.
        try:
            data = json.loads(text)
        except json.JSONDecodeError:
            data = text
        callback(data)

    def _on_disconnect(self, client, userdata, rc):
        """Log disconnects that were not requested via ``disconnect()``."""
        if rc != 0:
            # NOTE(review): no reconnect is triggered here; this presumably
            # relies on the loop thread started in connect() - confirm.
            print("Unexpected disconnect. Reconnecting...")

    def connect(self, username: Optional[str] = None, password: Optional[str] = None):
        """Connect to the broker and start the background network loop.

        Args:
            username: Optional user name; credentials are set whenever it is
                non-empty, even if no password is supplied.
            password: Optional password sent together with ``username``.
        """
        if username:
            self.client.username_pw_set(username, password)

        self.client.connect(self.broker, self.port, 60)
        self.client.loop_start()

    def publish(self, topic: str, payload: Dict, qos: int = 1, retain: bool = False):
        """Publish message to MQTT topic.

        Returns:
            True when the client accepted the message for delivery.
        """
        message = json.dumps(payload)
        result = self.client.publish(topic, message, qos=qos, retain=retain)
        return result.rc == mqtt.MQTT_ERR_SUCCESS

    def subscribe(self, topic: str, callback: Callable, qos: int = 1):
        """Subscribe to MQTT topic with callback."""
        self.subscriptions[topic] = callback
        self._subscription_qos[topic] = qos
        self.client.subscribe(topic, qos=qos)

    def disconnect(self):
        """Stop the background loop and disconnect from the broker."""
        self.client.loop_stop()
        self.client.disconnect()

IoT Device Example

class TemperatureSensor:
    """Simulated temperature sensor that publishes readings over MQTT."""

    def __init__(self, device_id: str, mqtt_client: "MQTTClient"):
        """Bind the sensor to a client and derive its publish topic.

        Args:
            device_id: Identifier included in every payload and in the topic.
            mqtt_client: Client whose ``publish(topic, payload)`` is used.
        """
        self.device_id = device_id
        self.mqtt = mqtt_client
        self.topic = f"sensors/temperature/{device_id}"

    def read_temperature(self) -> float:
        """Return a simulated reading between 20.0 and 30.0, rounded to 2 places."""
        # In real device, read from actual sensor
        import random
        return round(random.uniform(20.0, 30.0), 2)

    def publish_reading(self) -> Dict:
        """Take one reading, publish it on the sensor topic and return the payload."""
        payload = {
            "device_id": self.device_id,
            "temperature": self.read_temperature(),
            "unit": "celsius",
            # Timezone-aware UTC so the ISO string carries an explicit offset
            # (datetime.utcnow() is naive and deprecated since Python 3.12).
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }

        self.mqtt.publish(self.topic, payload)
        return payload

Embedded C for Microcontroller

// Arduino/ESP32 Example #include <WiFi.h> #include <PubSubClient.h> #include "DHT.h"

#define DHTPIN 4 #define DHTTYPE DHT22

const char* ssid = "YourWiFiSSID"; const char* password = "YourPassword"; const char* mqtt_server = "broker.example.com";

WiFiClient espClient; PubSubClient client(espClient); DHT dht(DHTPIN, DHTTYPE);

// Join the configured WiFi network and print the assigned IP address.
// Blocks until the connection is established.
void setup_wifi() {
  delay(10);
  Serial.println("Connecting to WiFi...");

  WiFi.begin(ssid, password);

  // Poll the link state twice a second, printing a progress dot per poll.
  for (;;) {
    if (WiFi.status() == WL_CONNECTED) {
      break;
    }
    delay(500);
    Serial.print(".");
  }

  Serial.println("WiFi connected");
  Serial.println("IP address: ");
  Serial.println(WiFi.localIP());
}

// MQTT message handler: prints the topic followed by the payload bytes.
//   topic   - topic the message arrived on
//   payload - message bytes
//   length  - number of bytes in payload
void callback(char* topic, byte* payload, unsigned int length) {
  Serial.print("Message arrived [");
  Serial.print(topic);
  Serial.print("] ");

  // Print exactly `length` bytes; the index is unsigned to match `length`.
  for (unsigned int i = 0; i < length; i++) {
    Serial.print((char)payload[i]);
  }
  Serial.println();
}

// Block until the MQTT client is connected, retrying every 5 seconds.
// On success the control topic is (re)subscribed.
void reconnect() {
  while (!client.connected()) {
    Serial.print("Attempting MQTT connection...");

    const bool connected = client.connect("ESP32Client");
    if (!connected) {
      Serial.print("failed, rc=");
      Serial.print(client.state());
      Serial.println(" try again in 5 seconds");
      delay(5000);
      continue;
    }

    Serial.println("connected");
    client.subscribe("device/control");
  }
}

// One-time initialisation: serial console, WiFi, MQTT endpoint and
// message handler, then the DHT sensor.
void setup() {
  Serial.begin(115200);
  setup_wifi();

  client.setServer(mqtt_server, 1883);
  client.setCallback(callback);

  dht.begin();
}

void loop() { if (!client.connected()) { reconnect(); } client.loop();

// Read sensor every 10 seconds
static unsigned long lastRead = 0;
if (millis() - lastRead > 10000) {
    float humidity = dht.readHumidity();
    float temperature = dht.readTemperature();

    if (!isnan(humidity) &#x26;&#x26; !isnan(temperature)) {
        char msg[100];
        snprintf(msg, sizeof(msg),
                "{\"temperature\":%.2f,\"humidity\":%.2f}",
                temperature, humidity);

        client.publish("sensors/data", msg);
        Serial.println(msg);
    }

    lastRead = millis();
}

}

Edge Computing

import asyncio from typing import Dict, List import numpy as np

class EdgeProcessor:
    """Process data at edge before sending to cloud.

    Readings are buffered until ``buffer_size`` is reached and then reduced
    to a statistical summary of their ``"temperature"`` values.
    """

    def __init__(self, buffer_size: int = 100):
        """Create an empty buffer that is summarized every ``buffer_size`` readings."""
        self.buffer: List[Dict] = []
        self.buffer_size = buffer_size

    def add_reading(self, reading: Dict) -> Optional[Dict]:
        """Add sensor reading to buffer.

        Returns:
            The summary from ``process_buffer`` when this reading filled the
            buffer, otherwise ``None``.
        """
        self.buffer.append(reading)

        if len(self.buffer) >= self.buffer_size:
            # Hand the summary back to the caller instead of discarding it.
            return self.process_buffer()
        return None

    def process_buffer(self) -> Dict:
        """Process buffered data at edge and empty the buffer.

        Readings that are not dicts or lack a ``"temperature"`` key are
        skipped.

        Returns:
            A dict with ``count``, ``mean``, ``std``, ``min``, ``max`` (plain
            Python numbers) and ``anomalies`` (indices into the accepted
            readings), or ``{}`` when there is nothing to summarize.
        """
        # Take ownership of the batch first so a bad batch is dropped rather
        # than re-raising on every subsequent add_reading() call.
        readings, self.buffer = self.buffer, []

        # Extract temperature values
        temperatures = [
            r["temperature"]
            for r in readings
            if isinstance(r, dict) and "temperature" in r
        ]
        if not temperatures:
            return {}

        # Compute statistics at edge
        values = np.asarray(temperatures, dtype=float)
        return {
            "count": len(temperatures),
            "mean": float(values.mean()),
            "std": float(values.std()),
            "min": float(values.min()),
            "max": float(values.max()),
            "anomalies": self.detect_anomalies(temperatures),
        }

    def detect_anomalies(self, values: List[float], threshold: float = 2.5) -> List[int]:
        """Detect anomalies using simple threshold.

        Args:
            values: Samples to inspect.
            threshold: Number of standard deviations from the mean beyond
                which a sample is flagged.

        Returns:
            Indices of the flagged samples, in ascending order.
        """
        if len(values) == 0:
            return []

        samples = np.asarray(values, dtype=float)
        deviation = np.abs(samples - samples.mean())
        return np.flatnonzero(deviation > threshold * samples.std()).tolist()

class IoTPipeline:
    """Complete IoT data pipeline.

    Connects registered sensors, an MQTT client and an ``EdgeProcessor``:
    sensors publish readings, and readings arriving on their topics are
    buffered for edge processing.
    """

    def __init__(self, mqtt_client: "MQTTClient"):
        """Create a pipeline that subscribes and publishes through ``mqtt_client``."""
        self.mqtt = mqtt_client
        self.edge_processor = EdgeProcessor()
        self.devices: Dict[str, "TemperatureSensor"] = {}

    def register_device(self, device: "TemperatureSensor"):
        """Register IoT device and subscribe to the topic it publishes on."""
        self.devices[device.device_id] = device

        # Subscribe to device topic; use the sensor's own topic attribute so
        # the two can never drift apart.
        self.mqtt.subscribe(device.topic, self.handle_device_data)

    def handle_device_data(self, data: Dict):
        """Handle incoming device data.

        Payloads that are not JSON objects are ignored: the MQTT wrapper
        passes raw text for non-JSON messages, which the edge processor
        cannot index by key.
        """
        if not isinstance(data, dict):
            print(f"Ignoring payload that is not a JSON object: {data!r}")
            return

        # Process at edge
        self.edge_processor.add_reading(data)

    async def collect_data_loop(self, interval: int = 5):
        """Continuous data collection from devices.

        Runs until cancelled, publishing one reading per registered device
        every ``interval`` seconds.
        """
        while True:
            for device in self.devices.values():
                reading = device.publish_reading()
                print(f"Device {device.device_id}: {reading['temperature']}°C")

            await asyncio.sleep(interval)

Device Management

from datetime import datetime from enum import Enum

class DeviceStatus(Enum):
    """Lifecycle states a managed device can be in."""

    ONLINE = "online"
    OFFLINE = "offline"
    MAINTENANCE = "maintenance"
    ERROR = "error"

class IoTDevice:
    """State record for one managed device."""

    def __init__(self, device_id: str, device_type: str):
        """Create a device that starts offline, unseen, on firmware 1.0.0."""
        self.device_id = device_id
        self.device_type = device_type
        self.status = DeviceStatus.OFFLINE
        self.last_seen = None
        self.firmware_version = "1.0.0"
        self.metadata = {}

    def update_status(self, status: "DeviceStatus"):
        """Set the device status and stamp ``last_seen`` with the current time."""
        self.status = status
        # Naive UTC, the same value datetime.utcnow() produced, without the
        # call that is deprecated since Python 3.12.
        self.last_seen = datetime.now(timezone.utc).replace(tzinfo=None)

    def needs_firmware_update(self, latest_version: str) -> bool:
        """Return True when ``latest_version`` is newer than the installed one.

        Versions are compared numerically per dot-separated component, so
        "1.10.0" is newer than "1.9.0" (a plain string comparison gets this
        wrong).
        """
        installed = self._version_key(self.firmware_version)
        return installed < self._version_key(latest_version)

    @staticmethod
    def _version_key(version: str) -> tuple:
        """Turn a dotted version string into a tuple of ints for comparison.

        A leading "v" is ignored, only the leading digits of each component
        count (so suffixes such as "-beta" are not ordered), and trailing
        zero components are dropped so "1.0" equals "1.0.0".
        """
        numbers = []
        for piece in version.strip().lstrip("vV").split("."):
            digits = ""
            for ch in piece:
                if ch not in "0123456789":
                    break
                digits += ch
            numbers.append(int(digits) if digits else 0)

        while numbers and numbers[-1] == 0:
            numbers.pop()
        return tuple(numbers)

class DeviceManager:
    """In-memory registry of devices keyed by ``device_id``."""

    def __init__(self):
        """Start with an empty registry."""
        self.devices: Dict[str, "IoTDevice"] = {}

    def register_device(self, device: "IoTDevice"):
        """Register new device, replacing any device with the same id."""
        self.devices[device.device_id] = device

    def update_device_heartbeat(self, device_id: str):
        """Update device last seen timestamp; unknown ids are ignored."""
        device = self.devices.get(device_id)
        if device is not None:
            device.update_status(DeviceStatus.ONLINE)

    def get_offline_devices(self, timeout_seconds: int = 300) -> List["IoTDevice"]:
        """Get devices that haven't reported recently.

        Devices that have never reported (``last_seen`` is ``None``) are not
        included.

        Args:
            timeout_seconds: Silence longer than this marks a device offline.
        """
        # Naive UTC to match the naive timestamps stored in last_seen.
        now = datetime.now(timezone.utc).replace(tzinfo=None)

        return [
            device
            for device in self.devices.values()
            if device.last_seen is not None
            and (now - device.last_seen).total_seconds() > timeout_seconds
        ]

    def schedule_firmware_update(self, device_id: str, new_version: str) -> Optional[Dict]:
        """Schedule OTA firmware update.

        Returns:
            The update command payload for a registered device, or ``None``
            when ``device_id`` is unknown. The payload is only built here;
            sending it is left to the caller.
        """
        if device_id not in self.devices:
            return None

        return {
            "command": "firmware_update",
            "version": new_version,
            "url": f"https://updates.example.com/{new_version}.bin",
        }

Best Practices

Device Design

  • Implement power management for battery devices

  • Use deep sleep modes when idle

  • Handle network disconnections gracefully

  • Implement watchdog timers

  • Design for remote diagnostics

  • Plan for firmware updates (OTA)

Security

  • Use TLS/SSL for MQTT connections

  • Implement device authentication

  • Encrypt sensitive data

  • Secure firmware updates

  • Regular security patches

  • Network segmentation

Data Management

  • Process data at edge when possible

  • Implement data buffering for offline scenarios

  • Use efficient data formats (e.g., Protocol Buffers)

  • Compress data before transmission

  • Handle time synchronization

  • Implement data retention policies

Anti-Patterns

  • ❌ No power management strategy

  • ❌ Unencrypted communications

  • ❌ No error handling for network failures

  • ❌ Sending all raw data to cloud

  • ❌ No device authentication

  • ❌ Hard-coded credentials in firmware

  • ❌ No OTA update mechanism

Resources

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

Coding

python-expert

No summary provided by upstream source.

Repository Source · Needs Review
Coding

devops-expert

No summary provided by upstream source.

Repository Source · Needs Review
Coding

code-review-expert

No summary provided by upstream source.

Repository Source · Needs Review
Coding

typescript-expert

No summary provided by upstream source.

Repository Source · Needs Review