data-engineering-streaming

Streaming Data Systems

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "data-engineering-streaming" with this command: npx skills add legout/data-platform-agent-skills/legout-data-platform-agent-skills-data-engineering-streaming

Streaming Data Systems

Real-time data ingestion and stream processing with Apache Kafka, MQTT, and NATS JetStream. Covers producers, consumers, and stream processing patterns for data engineering pipelines.

Quick Comparison

Feature Apache Kafka MQTT NATS JetStream

Use Case High-throughput event streaming IoT, mobile, constrained devices Cloud-native, microservices

Throughput Millions/sec Thousands/sec Hundreds of thousands/sec

Durability Disk-based log, replayable Ephemeral (configurable) Disk-based persistence

Ordering Per-partition N/A (topic-based) Per-subject

Python Client confluent-kafka paho-mqtt nats-py

Best For Event sourcing, CDC, log aggregation Sensor data, telemetry Service-to-service messaging

When to Use Which?

  • Kafka: High-volume event streams, log aggregation, CDC, data lake ingestion

  • MQTT: IoT devices, mobile push, constrained networks

  • NATS JetStream: Microservices, request-reply, cloud-native, simpler ops than Kafka

Skill Dependencies

  • @data-engineering-core

  • Process stream data with Polars/DuckDB

  • @data-engineering-orchestration

  • Orchestrate stream processing jobs

  • @data-engineering-quality

  • Validate streaming data

  • @data-engineering-storage-lakehouse

  • Persist streams to Delta/Iceberg

Detailed Guides

Apache Kafka

Install: pip install confluent-kafka

Producer

from confluent_kafka import Producer import socket import json

def delivery_report(err, msg): if err is not None: print(f"Delivery failed: {err}") else: print(f"Delivered to {msg.topic()} [{msg.partition()}]")

conf = { 'bootstrap.servers': 'localhost:9092', 'client.id': socket.gethostname(), 'acks': 'all' # Wait for all replicas }

producer = Producer(conf)

Send messages asynchronously

for i in range(100): data = {'id': i, 'event': 'user_activity', 'value': i * 10} producer.produce( topic='user_activity_events', key=str(i), value=json.dumps(data).encode('utf-8'), callback=delivery_report ) producer.poll(0) # Trigger callbacks

producer.flush() # Wait for delivery

With Schema Registry (Avro)

from confluent_kafka.serialization import StringSerializer from confluent_kafka.schema_registry import SchemaRegistryClient from confluent_kafka.schema_registry.avro import AvroSerializer

schema_registry_client = SchemaRegistryClient({'url': 'http://localhost:8081'}) avro_serializer = AvroSerializer(schema_registry_client, schema_str)

producer = SerializingProducer({ 'bootstrap.servers': 'localhost:9092', 'key.serializer': StringSerializer('utf_8'), 'value.serializer': avro_serializer, })

Consumer

from confluent_kafka import Consumer, KafkaError

conf = { 'bootstrap.servers': 'localhost:9092', 'group.id': 'my_consumer_group', 'auto.offset.reset': 'earliest', 'enable.auto.commit': False, # Manual commit 'max.poll.interval.ms': 300000 }

consumer = Consumer(conf) consumer.subscribe(['user_activity_events'])

try: while True: msg = consumer.poll(timeout=1.0)

    if msg is None:
        continue
    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            continue
        raise KafkaException(msg.error())

    # Process message
    data = json.loads(msg.value().decode('utf-8'))
    print(f"Received: {data}")

    # Manual commit after processing
    consumer.commit(asynchronous=False)

except KeyboardInterrupt: pass finally: consumer.close()

Stream Processing (ksqlDB pattern)

For complex transformations, use ksqlDB or Kafka Streams. REST API example:

import requests

ksql_query = { "ksql": """ SELECT id, COUNT(*) AS event_count, SUM(value) AS total_value FROM user_activity_events WINDOW TUMBLING (SIZE 1 MINUTE) GROUP BY id EMIT CHANGES """ }

response = requests.post("http://localhost:8088/query", json=ksql_query) for line in response.iter_lines(): print(line)

MQTT for IoT

Install: pip install paho-mqtt

Publisher

import paho.mqtt.client as mqtt import json import time

broker = "broker.emqx.io" topic = "iot/sensors/temperature"

client = mqtt.Client(client_id="publisher_1", protocol=mqtt.MQTTv5)

def on_connect(client, userdata, flags, rc): if rc == 0: print("Connected to broker") else: print(f"Connection failed: {rc}")

client.on_connect = on_connect client.connect(broker) client.loop_start()

for i in range(10): payload = { "sensor_id": "temp_sensor_1", "temperature": 20 + i * 0.5, "humidity": 45 + i, "timestamp": time.time() } client.publish( topic=topic, payload=json.dumps(payload), qos=1 # At-least-once delivery ) time.sleep(5)

client.loop_stop() client.disconnect()

Subscriber

def on_connect(client, userdata, flags, rc): client.subscribe(topic, qos=1)

def on_message(client, userdata, msg): payload = json.loads(msg.payload.decode("utf-8")) print(f"[{msg.topic}] {payload}")

client.on_connect = on_connect client.on_message = on_message client.connect(broker) client.loop_forever()

NATS JetStream

Install: pip install nats-py

Producer/Consumer

import asyncio import nats

async def main(): nc = await nats.connect("nats://localhost:4222") js = nc.jetstream()

# Create stream
await js.add_stream(
    name="events",
    subjects=["events.*"],
    storage="file",
    max_msgs=10000,
    max_age=3600
)

# Publish
await js.publish("events.page_loaded", b'{"page": "/home"}')

# Push consumer
sub = await js.subscribe("events.*", durable="worker-1")
async for msg in sub:
    print(f"Received: {msg.data.decode()}")
    await msg.ack()

await nc.close()

asyncio.run(main())

Work Queue Pattern

async def worker(name: str): nc = await nats.connect("nats://localhost:4222") js = nc.jetstream() sub = await js.subscribe("jobs.*", durable="workers", queue_group="processing")

async for msg in sub:
    job_data = msg.data.decode()
    print(f"Worker {name} processing: {job_data}")
    await msg.ack()
await nc.close()

Production Patterns

Idempotent Processing

Stream processors may receive duplicates. Design idempotent consumers:

processed_ids = load_checkpoint() # From DB/Redis if msg.id in processed_ids: ack() # Skip duplicate else: process(msg) save_checkpoint(msg.id) ack()

Batch Processing

Accumulate messages before writing to reduce DB load

batch = [] while True: msg = consumer.poll(timeout=0.1) if msg: batch.append(msg.value()) if len(batch) >= BATCH_SIZE or timeout_reached: write_to_db(batch) consumer.commit() # Commit after batch write batch.clear()

Error Handling (Dead Letter Queue)

try: process(msg) except Exception as e: if is_retryable(e): nack(requeue=True) # Retry else: produce_to_dlq(msg, str(e)) # Send to dead letter queue ack()

Schema Evolution

  • Use Avro/Protobuf with Schema Registry for compatibility

  • Evolve schemas additively (new fields optional, old fields preserved)

  • Register schemas per topic

References

  • Apache Kafka Documentation

  • confluent-kafka Python Client

  • MQTT 5.0 Specification

  • NATS JetStream Documentation

  • @data-engineering-orchestration

  • Orchestrate consumers/producers as flows

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

Automation

data-science-eda

No summary provided by upstream source.

Repository SourceNeeds Review
Automation

data-engineering-core

No summary provided by upstream source.

Repository SourceNeeds Review
Automation

data-science-feature-engineering

No summary provided by upstream source.

Repository SourceNeeds Review