nosql-databases

MongoDB, Redis, Cassandra, DynamoDB, and distributed database patterns for scalable applications

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "nosql-databases" with this command: npx skills add pluginagentmarketplace/custom-plugin-data-engineer/pluginagentmarketplace-custom-plugin-data-engineer-nosql-databases

NoSQL Databases

Production-grade NoSQL database patterns with MongoDB, Redis, Cassandra, and DynamoDB.

Quick Start

# MongoDB with PyMongo
from pymongo import MongoClient
from pymongo.errors import DuplicateKeyError
from datetime import datetime

# Connect to the local MongoDB server and bind the collection used below.
client = MongoClient("mongodb://localhost:27017/")
db = client["analytics"]
events = db["events"]

# Indexes backing the queries in this snippet:
# a compound index (user_id ascending, timestamp descending) ...
events.create_index([("user_id", 1), ("timestamp", -1)])
# ... and a single-field ascending index on event_type.
events.create_index([("event_type", 1)])

# Insert with retry pattern
def insert_event(event: dict, retries: int = 3):
    """Insert ``event`` into the ``events`` collection, retrying on errors.

    The document ``_id`` is derived from ``user_id`` and ``timestamp``, so
    inserting the same event twice is reported as a duplicate instead of
    creating a second document.

    Note: ``event`` is mutated in place (``_id`` and ``created_at`` are set).

    Args:
        event: Document to store. Must contain ``user_id`` and a
            ``timestamp`` value that provides ``isoformat()``.
        retries: Maximum number of insert attempts; must be at least 1.

    Returns:
        True if the document was inserted, False if a document with the same
        ``_id`` already exists (this includes the case where an earlier
        attempt of this call was stored but then reported an error).

    Raises:
        ValueError: If ``retries`` is less than 1.
        Exception: Whatever the final attempt raised, when every attempt
            failed with a non-duplicate error.
    """
    if retries < 1:
        # Without this guard the loop below never runs and the function
        # would report "already exists" without attempting the insert.
        raise ValueError("retries must be at least 1")

    event["_id"] = f"{event['user_id']}_{event['timestamp'].isoformat()}"
    event["created_at"] = datetime.utcnow()

    for attempt in range(1, retries + 1):
        try:
            events.insert_one(event)
        except DuplicateKeyError:
            return False  # Already exists
        except Exception:
            # Retry on any other failure; surface the error once the
            # attempt budget is exhausted.
            if attempt == retries:
                raise
            continue
        return True

# Aggregation pipeline
# Top 100 users by total purchase amount for purchases on/after 2024-01-01.
pipeline = [
    # Keep only purchase events in the time range.
    {
        "$match": {
            "event_type": "purchase",
            "timestamp": {"$gte": datetime(2024, 1, 1)},
        }
    },
    # One output document per user with summed amount and event count.
    {
        "$group": {
            "_id": "$user_id",
            "total_purchases": {"$sum": "$amount"},
            "count": {"$sum": 1},
        }
    },
    # Largest spenders first, capped at 100 rows.
    {"$sort": {"total_purchases": -1}},
    {"$limit": 100},
]
top_customers = list(events.aggregate(pipeline))

Core Concepts

1. Redis for Caching & Real-time

import json
import time
from datetime import timedelta

import redis

# Module-level Redis client shared by the helper functions below.
r = redis.Redis(
    host="localhost",
    port=6379,
    decode_responses=True,
)

# Cache pattern with TTL
def get_user_profile(user_id: str) -> dict:
    """Return the profile for ``user_id``, reading through the Redis cache.

    A cached value stored under ``user:<user_id>:profile`` is decoded from
    JSON and returned directly. On a miss the profile is loaded with
    ``fetch_from_database``, written to the cache as JSON with a one-hour
    expiry, and returned.
    """
    cache_key = f"user:{user_id}:profile"

    hit = r.get(cache_key)
    if not hit:
        # Cache miss: load from the database, then populate the cache.
        profile = fetch_from_database(user_id)
        r.setex(cache_key, timedelta(hours=1), json.dumps(profile))
        return profile

    return json.loads(hit)

# Rate limiting
def check_rate_limit(user_id: str, limit: int = 100, window: int = 60) -> bool:
    """Count one request for ``user_id`` and report whether it is allowed.

    Requests are counted per fixed window: the counter key embeds the
    current Unix time divided by ``window``, so a new key is used each
    window.

    Args:
        user_id: Identifier the limit applies to.
        limit: Maximum number of requests allowed per window.
        window: Window length in seconds; must be positive.

    Returns:
        True if the request count for the current window, including this
        request, is at most ``limit``; False otherwise.

    Raises:
        ValueError: If ``window`` is not positive.
    """
    if window <= 0:
        raise ValueError("window must be a positive number of seconds")

    bucket = int(time.time()) // window
    key = f"rate:{user_id}:{bucket}"

    # Send INCR and EXPIRE together so a counter can never be left behind
    # without a TTL (which could happen if the process died between two
    # separate calls). Re-applying the TTL on every hit is harmless because
    # the key is only written during its own window.
    pipe = r.pipeline()
    pipe.incr(key)
    pipe.expire(key, window)
    current, _ = pipe.execute()

    return current <= limit

# Real-time leaderboard with sorted sets
def update_leaderboard(user_id: str, score: float):
    """Set ``user_id``'s score in the ``leaderboard:daily`` sorted set."""
    entry = {user_id: score}
    r.zadd("leaderboard:daily", entry)

def get_top_users(n: int = 10) -> list:
    """Return the ``n`` highest-scoring members of ``leaderboard:daily``.

    Args:
        n: Number of entries to return. Values of zero or less yield an
            empty list.

    Returns:
        The result of ``ZREVRANGE ... WITHSCORES`` for ranks ``0`` to
        ``n - 1``, or ``[]`` when ``n <= 0``.
    """
    if n <= 0:
        # A negative end index counts from the end of the sorted set, so
        # n == 0 (end index -1) would return every member instead of none.
        return []
    return r.zrevrange("leaderboard:daily", 0, n - 1, withscores=True)

# Pub/Sub for event streaming
def publish_event(channel: str, event: dict):
    """Serialize ``event`` to JSON and publish it on ``channel``."""
    payload = json.dumps(event)
    r.publish(channel, payload)

def subscribe_events(channel: str):
    """Yield JSON-decoded events published on ``channel``.

    This is a generator that blocks while waiting for messages. The
    underlying PubSub object is closed when the generator finishes, is
    closed by the caller, or is garbage-collected, so an abandoned consumer
    does not keep its subscription open.

    Args:
        channel: Name of the channel to subscribe to.

    Yields:
        The decoded JSON payload of each published message.
    """
    pubsub = r.pubsub()
    try:
        pubsub.subscribe(channel)
        for message in pubsub.listen():
            # Only entries of type 'message' carry a published payload;
            # every other entry type is skipped.
            if message['type'] == 'message':
                yield json.loads(message['data'])
    finally:
        pubsub.close()

2. DynamoDB Patterns

import boto3
from boto3.dynamodb.conditions import Key, Attr
from decimal import Decimal

# DynamoDB resource handle and the table every helper below operates on.
dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table("Events")

# Single table design pattern
def put_event(event: dict):
    """Write ``event`` to the table using the single-table key layout.

    Key layout:
        PK      USER#<user_id>
        SK      EVENT#<timestamp>#<event_id>
        GSI1PK  TYPE#<event_type>
        GSI1SK  DATE#<first 10 characters of timestamp>

    The full ``event`` dict is stored under the ``data`` attribute.
    NOTE(review): the ``[:10]`` slice presumably expects ``timestamp`` to be
    an ISO-8601 string (``YYYY-MM-DD...``) -- confirm against callers.
    """
    user_id = event['user_id']
    timestamp = event['timestamp']
    event_id = event['event_id']
    event_type = event['event_type']
    day = timestamp[:10]

    table.put_item(
        Item={
            'PK': f"USER#{user_id}",
            'SK': f"EVENT#{timestamp}#{event_id}",
            'GSI1PK': f"TYPE#{event_type}",
            'GSI1SK': f"DATE#{day}",
            'data': event,
        }
    )

# Query by user
def get_user_events(user_id: str, limit: int = 100):
    """Return event items stored under ``USER#<user_id>``.

    Queries the partition for sort keys starting with ``EVENT#``, reading in
    reverse sort-key order (``ScanIndexForward=False``) with ``Limit=limit``,
    and returns the ``Items`` of that single query response.
    """
    key_condition = (
        Key('PK').eq(f"USER#{user_id}") & Key('SK').begins_with("EVENT#")
    )
    result = table.query(
        KeyConditionExpression=key_condition,
        ScanIndexForward=False,
        Limit=limit,
    )
    return result['Items']

# Query by event type (using GSI)
def get_events_by_type(event_type: str, date: str):
    """Return every item of ``event_type`` recorded on ``date`` via ``GSI1``.

    A single Query call returns at most one page of results, so this follows
    ``LastEvaluatedKey`` until the index has been read completely instead of
    silently returning only the first page.

    Args:
        event_type: Event type; matched as ``TYPE#<event_type>`` on ``GSI1PK``.
        date: Date string; matched as ``DATE#<date>`` on ``GSI1SK``.

    Returns:
        A list containing the items from all result pages.
    """
    query_kwargs = {
        'IndexName': 'GSI1',
        'KeyConditionExpression': (
            Key('GSI1PK').eq(f"TYPE#{event_type}")
            & Key('GSI1SK').eq(f"DATE#{date}")
        ),
    }

    items = []
    while True:
        response = table.query(**query_kwargs)
        items.extend(response['Items'])

        # A missing LastEvaluatedKey means this was the final page.
        last_key = response.get('LastEvaluatedKey')
        if not last_key:
            return items
        query_kwargs['ExclusiveStartKey'] = last_key

# Batch write (batch_writer buffers the puts and automatically resends unprocessed items)
def batch_write_events(events: list):
    """Put every item in ``events`` into the table through one batch writer.

    Each element is passed unchanged as ``Item`` to the writer's
    ``put_item``.
    """
    with table.batch_writer() as writer:
        for item in events:
            writer.put_item(Item=item)

3. Cassandra for Time Series

from cassandra.cluster import Cluster
from cassandra.query import BatchStatement, SimpleStatement
from datetime import datetime

# Connect using three contact points and open a session bound to the
# `analytics` keyspace.
cluster = Cluster(['node1', 'node2', 'node3'])
session = cluster.connect('analytics')

# Create table with time-based partitioning
# Partition key is `date`; within a partition rows are clustered by
# `event_time` (descending) and then `user_id`.
session.execute("""
    CREATE TABLE IF NOT EXISTS events_by_day (
        date date,
        user_id uuid,
        event_time timestamp,
        event_type text,
        data text,
        PRIMARY KEY ((date), event_time, user_id)
    ) WITH CLUSTERING ORDER BY (event_time DESC)
""")

# Insert with prepared statement
# Prepared once at module level; each execution binds five positional values
# in the column order listed in the INSERT.
insert_stmt = session.prepare("""
    INSERT INTO events_by_day (date, user_id, event_time, event_type, data)
    VALUES (?, ?, ?, ?, ?)
""")

def insert_event(event: dict):
    """Store ``event`` in ``events_by_day`` using the prepared insert.

    Binds, in order: the date part of ``event['timestamp']``, ``user_id``,
    the full timestamp, ``event_type``, and ``event['data']`` serialized as
    JSON text.

    NOTE(review): this shares its name with the MongoDB ``insert_event``
    helper defined earlier; if both snippets live in one module, this
    definition replaces the earlier one.
    """
    ts = event['timestamp']
    params = [
        ts.date(),
        event['user_id'],
        ts,
        event['event_type'],
        json.dumps(event['data']),
    ]
    session.execute(insert_stmt, params)

# Query all events for a single date (one partition)
def get_events_for_date(date: datetime.date):
    """Return all rows of ``events_by_day`` whose ``date`` column equals ``date``.

    The result set is fully materialized into a list before returning.
    """
    # NOTE(review): with `from datetime import datetime` in scope, the
    # annotation `datetime.date` resolves to the datetime.date() method, not
    # the `date` class -- a date value is presumably what is expected here.
    query = "SELECT * FROM events_by_day WHERE date = %s"
    result_set = session.execute(query, [date])
    return list(result_set)

Tools & Technologies

| Tool | Purpose | Version (2025) |
|------|---------|----------------|
| MongoDB | Document store | 7.0+ |
| Redis | Cache, pub/sub | 7.2+ |
| Cassandra | Time series, wide column | 5.0+ |
| DynamoDB | Managed key-value | Latest |
| Elasticsearch | Search, analytics | 8.12+ |
| ScyllaDB | High-perf Cassandra | 5.4+ |

Troubleshooting Guide

| Issue | Symptoms | Root Cause | Fix |
|-------|----------|------------|-----|
| Hot Partition | High latency on some keys | Uneven partition key | Redesign partition key |
| Memory Pressure | Redis evictions, slow queries | Data > memory | Eviction policy, clustering |
| Query Timeout | Slow reads in Cassandra | Missing index, large partition | Add index, limit partition size |
| Consistency Issues | Stale reads | Eventual consistency | Use appropriate consistency level |

Best Practices

# ✅ DO: Design for access patterns (NoSQL)
# Primary key = partition key + sort key

# ✅ DO: Use connection pooling
# One pool capped at 20 connections; the client below draws its connections
# from it.
pool = redis.ConnectionPool(max_connections=20)
r = redis.Redis(connection_pool=pool)

# ✅ DO: Set TTLs on cache data
# `key`, `ttl_seconds`, and `value` are placeholders; this snippet does not
# define them.
r.setex(key, ttl_seconds, value)

# ✅ DO: Handle eventual consistency
# Read-your-writes with consistent reads where needed

# ❌ DON'T: Use NoSQL for complex joins
# ❌ DON'T: Store unbounded data in single document
# ❌ DON'T: Ignore partition sizing

Resources


Skill Certification Checklist:

  • Can design document schemas for MongoDB
  • Can implement caching patterns with Redis
  • Can model time series data in Cassandra
  • Can use DynamoDB single-table design
  • Can choose appropriate consistency levels

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

Automation

statistics-math

No summary provided by upstream source.

Repository Source · Needs Review
Automation

deep-learning

No summary provided by upstream source.

Repository Source · Needs Review
Automation

data-engineering

No summary provided by upstream source.

Repository Source · Needs Review