caching-strategies

Backend Caching Strategies

Install skill "caching-strategies" with this command: npx skills add yonatangross/orchestkit/yonatangross-orchestkit-caching-strategies

Optimize performance with Redis caching patterns and smart invalidation.

Pattern Selection

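| Pattern       | Write           | Read       | Consistency | Use Case         |
|---------------|-----------------|------------|-------------|------------------|
| Cache-Aside   | DB first        | Cache → DB | Eventual    | General purpose  |
| Write-Through | Cache + DB      | Cache      | Strong      | Critical data    |
| Write-Behind  | Cache, async DB | Cache      | Eventual    | High write load  |
| Read-Through  | Cache handles   | Cache → DB | Eventual    | Simplified reads |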

Cache-Aside (Lazy Loading)

import json
from typing import Awaitable, Callable, TypeVar

import redis.asyncio as redis

T = TypeVar("T")


class CacheAside:
    def __init__(self, redis_client: redis.Redis, default_ttl: int = 3600):
        self.redis = redis_client
        self.ttl = default_ttl

    async def get_or_set(
        self,
        key: str,
        fetch_fn: Callable[[], Awaitable[T]],
        ttl: int | None = None,
        serialize: Callable[[T], str] = json.dumps,
        deserialize: Callable[[str], T] = json.loads,
    ) -> T:
        """Get from cache, or fetch and cache."""
        # Try cache first (compare with None so empty payloads still count as hits)
        cached = await self.redis.get(key)
        if cached is not None:
            return deserialize(cached)

        # Cache miss - fetch from source
        value = await fetch_fn()

        # Store in cache
        await self.redis.setex(
            key,
            ttl or self.ttl,
            serialize(value),
        )
        return value


# Usage
cache = CacheAside(redis_client)


async def get_analysis(analysis_id: str) -> Analysis:
    return await cache.get_or_set(
        key=f"analysis:{analysis_id}",
        fetch_fn=lambda: repo.get_by_id(analysis_id),
        ttl=1800,  # 30 minutes
    )
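The hit/miss flow above can be tried without a Redis server. The following is a minimal sketch, not the skill's own code: `FakeRedis` and `load_analysis` are hypothetical stand-ins invented for this example, and the fake only mimics the `get` and `setex` calls used above (the TTL is accepted but never enforced).

```python
import asyncio
import json
from typing import Optional


class FakeRedis:
    """Hypothetical in-memory stand-in for the two Redis calls used above."""

    def __init__(self) -> None:
        self.store: dict = {}

    async def get(self, key: str) -> Optional[str]:
        return self.store.get(key)

    async def setex(self, key: str, ttl: int, value: str) -> None:
        # TTL is ignored in this sketch; real Redis would expire the key.
        self.store[key] = value


async def get_or_set(cache: FakeRedis, key: str, fetch_fn, ttl: int = 3600):
    """Cache-aside: return the cached value, or fetch, store, and return it."""
    cached = await cache.get(key)
    if cached is not None:
        return json.loads(cached)
    value = await fetch_fn()
    await cache.setex(key, ttl, json.dumps(value))
    return value


async def demo():
    cache = FakeRedis()
    calls = 0

    async def load_analysis() -> dict:
        # Hypothetical source of truth (stands in for a database query).
        nonlocal calls
        calls += 1
        return {"id": "123", "status": "done"}

    first = await get_or_set(cache, "analysis:123", load_analysis)   # miss
    second = await get_or_set(cache, "analysis:123", load_analysis)  # hit
    return first, second, calls
```

Running `asyncio.run(demo())` should show that both reads return the same value while the source is queried only once.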

Write-Through Cache

from typing import Awaitable


class WriteThroughCache:
    def __init__(self, redis_client: redis.Redis, ttl: int = 3600):
        self.redis = redis_client
        self.ttl = ttl

    async def write(
        self,
        key: str,
        value: T,
        db_write_fn: Callable[[T], Awaitable[T]],
    ) -> T:
        """Write to both cache and database synchronously."""
        # Write to database first (consistency)
        result = await db_write_fn(value)

        # Then update cache
        await self.redis.setex(key, self.ttl, json.dumps(result))

        return result

    async def read(self, key: str) -> T | None:
        """Read from cache only."""
        cached = await self.redis.get(key)
        return json.loads(cached) if cached is not None else None


# Usage
cache = WriteThroughCache(redis_client)


async def update_analysis(analysis_id: str, data: AnalysisUpdate) -> Analysis:
    return await cache.write(
        key=f"analysis:{analysis_id}",
        value=data,
        db_write_fn=lambda d: repo.update(analysis_id, d),
    )
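The reason for writing to the database before the cache is easiest to see when the database write fails. The sketch below assumes plain dicts as stand-ins for Redis and the database; `save` and `failing_save` are hypothetical helpers made up for the demonstration.

```python
import asyncio
import json


async def write_through(cache: dict, key: str, value: dict, db_write_fn) -> dict:
    """Write to the database first, then refresh the cache entry."""
    result = await db_write_fn(value)   # raises before the cache is touched
    cache[key] = json.dumps(result)
    return result


async def demo():
    cache: dict = {}   # hypothetical stand-in for Redis
    db: dict = {}      # hypothetical stand-in for the database

    async def save(value: dict) -> dict:
        db["analysis:123"] = value
        return value

    async def failing_save(value: dict) -> dict:
        raise RuntimeError("simulated database outage")

    await write_through(cache, "analysis:123", {"status": "done"}, save)
    try:
        await write_through(cache, "analysis:123", {"status": "lost"}, failing_save)
    except RuntimeError:
        pass  # the failed write must not leak into the cache

    return json.loads(cache["analysis:123"]), db["analysis:123"]
```

With this ordering, a failed database write leaves the cache holding the last value the database actually accepted.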

Write-Behind (Write-Back)

import asyncio
from collections import deque


class WriteBehindCache:
    def __init__(
        self,
        redis_client: redis.Redis,
        flush_interval: float = 5.0,
        batch_size: int = 100,
    ):
        self.redis = redis_client
        self.flush_interval = flush_interval
        self.batch_size = batch_size
        self._pending_writes: deque = deque()
        self._flush_task: asyncio.Task | None = None

    async def start(self):
        """Start background flush task."""
        self._flush_task = asyncio.create_task(self._flush_loop())

    async def stop(self):
        """Stop and flush remaining writes."""
        if self._flush_task:
            self._flush_task.cancel()
            try:
                await self._flush_task  # wait for the cancellation to finish
            except asyncio.CancelledError:
                pass
        # One flush handles a single batch, so loop until the queue is empty
        while self._pending_writes:
            await self._flush_pending()

    async def write(self, key: str, value: T) -> None:
        """Write to cache immediately, queue for DB."""
        await self.redis.set(key, json.dumps(value))
        self._pending_writes.append((key, value))

        if len(self._pending_writes) >= self.batch_size:
            await self._flush_pending()

    async def _flush_loop(self):
        while True:
            await asyncio.sleep(self.flush_interval)
            await self._flush_pending()

    async def _flush_pending(self):
        if not self._pending_writes:
            return

        batch = []
        while self._pending_writes and len(batch) < self.batch_size:
            batch.append(self._pending_writes.popleft())

        try:
            # Bulk write to database (`repo` is the application's repository)
            await repo.bulk_upsert([v for _, v in batch])
        except Exception:
            # Requeue the batch in its original order so the writes are not lost
            self._pending_writes.extendleft(reversed(batch))
            raise
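The batching behaviour can be illustrated with an in-memory model. This is a sketch under stated assumptions: `InMemoryWriteBehind` is a hypothetical class, the cache is a dict, and a "bulk write" is just a list appended to `db_batches`.

```python
import asyncio
from collections import deque


class InMemoryWriteBehind:
    """Hypothetical sketch: buffers writes and flushes them in batches."""

    def __init__(self, batch_size: int = 3) -> None:
        self.batch_size = batch_size
        self.cache: dict = {}          # stands in for Redis
        self.pending: deque = deque()  # writes not yet in the database
        self.db_batches: list = []     # each entry models one bulk write

    async def write(self, key: str, value) -> None:
        self.cache[key] = value              # visible to readers immediately
        self.pending.append((key, value))    # persisted later
        if len(self.pending) >= self.batch_size:
            await self.flush()

    async def flush(self) -> None:
        batch = []
        while self.pending and len(batch) < self.batch_size:
            batch.append(self.pending.popleft())
        if batch:
            self.db_batches.append(batch)    # one bulk upsert per batch


async def demo():
    wb = InMemoryWriteBehind(batch_size=3)
    for i in range(7):
        await wb.write(f"k{i}", i)
    unflushed = len(wb.pending)   # writes that a crash at this point would lose
    await wb.flush()              # what a clean shutdown has to do
    return [len(b) for b in wb.db_batches], unflushed
```

Seven writes with a batch size of three produce two full batches right away and leave one write queued, which is the window of potential data loss that write-behind trades for throughput.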

Cache Invalidation Patterns

TTL-Based (Time to Live)

# Simple TTL
await redis_client.setex("analysis:123", 3600, data)  # 1 hour

# TTL with jitter (prevent stampede)
import random

base_ttl = 3600
jitter = random.randint(-300, 300)  # ±5 minutes
await redis_client.setex("analysis:123", base_ttl + jitter, data)
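The jitter calculation can be wrapped in a small helper so every write uses it. A minimal sketch: `jittered_ttl` and its `spread` parameter are hypothetical names, not part of any Redis client.

```python
import random
from typing import Optional


def jittered_ttl(base_ttl: int, spread: int, rng: Optional[random.Random] = None) -> int:
    """Return base_ttl shifted by a random offset in [-spread, +spread]."""
    if spread < 0 or spread >= base_ttl:
        raise ValueError("spread must be non-negative and smaller than base_ttl")
    source = rng if rng is not None else random
    return base_ttl + source.randint(-spread, spread)


# Keys written at the same moment no longer all expire in the same second.
ttls = [jittered_ttl(3600, 300, random.Random(seed)) for seed in range(1000)]
```

Rejecting a spread as large as the base TTL keeps the result strictly positive, which matters because Redis rejects non-positive expiry times for `SETEX`.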

Event-Based Invalidation

class CacheInvalidator:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    async def invalidate(self, key: str) -> None:
        """Delete single key."""
        await self.redis.delete(key)

    async def invalidate_pattern(self, pattern: str) -> int:
        """Delete keys matching pattern."""
        keys = []
        async for key in self.redis.scan_iter(match=pattern):
            keys.append(key)

        if keys:
            return await self.redis.delete(*keys)
        return 0

    async def invalidate_tags(self, *tags: str) -> int:
        """Invalidate all keys with given tags."""
        count = 0
        for tag in tags:
            tag_key = f"tag:{tag}"
            members = await self.redis.smembers(tag_key)
            if members:
                count += await self.redis.delete(*members)
            await self.redis.delete(tag_key)
        return count


# Usage with tags
async def cache_with_tags(key: str, value: T, tags: list[str], ttl: int = 3600):
    # Set an expiry so tagged entries follow the "never cache without TTL" rule
    await redis_client.setex(key, ttl, json.dumps(value))
    for tag in tags:
        await redis_client.sadd(f"tag:{tag}", key)


# Invalidate by tag
await invalidator.invalidate_tags("user:123", "analyses")
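The tag bookkeeping can be modelled in memory to make the data structure explicit. In this sketch `TaggedCache` is a hypothetical class: `values` plays the role of the cached keys and `tags` plays the role of the `tag:{name}` sets.

```python
class TaggedCache:
    """Hypothetical in-memory model of tag sets pointing at cache keys."""

    def __init__(self) -> None:
        self.values: dict = {}
        self.tags: dict = {}   # tag -> set of keys carrying that tag

    def set(self, key: str, value, tags: list) -> None:
        self.values[key] = value
        for tag in tags:
            self.tags.setdefault(tag, set()).add(key)

    def invalidate_tags(self, *tags: str) -> int:
        removed = 0
        for tag in tags:
            for key in self.tags.pop(tag, set()):
                # A key may carry several tags; count it only when it is deleted.
                if key in self.values:
                    del self.values[key]
                    removed += 1
        return removed


cache = TaggedCache()
cache.set("analysis:1", {"owner": 123}, tags=["user:123", "analyses"])
cache.set("analysis:2", {"owner": 456}, tags=["user:456", "analyses"])
cache.set("profile:123", {"name": "Ada"}, tags=["user:123"])

removed = cache.invalidate_tags("user:123")
remaining = sorted(cache.values)
```

Note that the `analyses` tag still lists `analysis:1` after this call. Such stale members are harmless for correctness, but they accumulate unless the tag sets are cleaned up or given their own expiry.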

Version-Based Invalidation

class VersionedCache:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    async def get_version(self, namespace: str) -> int:
        version = await self.redis.get(f"version:{namespace}")
        # Default to 0: INCR on a missing key yields 1, so a default of 1
        # would make the first invalidation a no-op.
        return int(version) if version else 0

    async def increment_version(self, namespace: str) -> int:
        return await self.redis.incr(f"version:{namespace}")

    def make_key(self, namespace: str, key: str, version: int) -> str:
        return f"{namespace}:v{version}:{key}"

    async def get(self, namespace: str, key: str) -> T | None:
        version = await self.get_version(namespace)
        full_key = self.make_key(namespace, key, version)
        cached = await self.redis.get(full_key)
        return json.loads(cached) if cached is not None else None

    async def invalidate_namespace(self, namespace: str) -> None:
        """Increment version to invalidate all keys."""
        await self.increment_version(namespace)
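The class above only shows the read path, so the following sketch adds a matching `set` and walks through one invalidation. It is an in-memory illustration with hypothetical names, and it assumes a missing version counter is treated as 0 so that the first increment changes the key prefix.

```python
class InMemoryVersionedCache:
    """Hypothetical dict-backed model of version-prefixed cache keys."""

    def __init__(self) -> None:
        self.store: dict = {}
        self.versions: dict = {}   # namespace -> counter; absent means 0

    def _full_key(self, namespace: str, key: str) -> str:
        version = self.versions.get(namespace, 0)
        return f"{namespace}:v{version}:{key}"

    def set(self, namespace: str, key: str, value) -> None:
        self.store[self._full_key(namespace, key)] = value

    def get(self, namespace: str, key: str):
        return self.store.get(self._full_key(namespace, key))

    def invalidate_namespace(self, namespace: str) -> None:
        # Bumping the counter makes every existing key unreachable at once.
        self.versions[namespace] = self.versions.get(namespace, 0) + 1


cache = InMemoryVersionedCache()
cache.set("analyses", "123", {"status": "done"})
before = cache.get("analyses", "123")
cache.invalidate_namespace("analyses")
after = cache.get("analyses", "123")
orphaned = list(cache.store)
```

The old entry is never deleted, only orphaned, so in Redis this approach relies on a TTL to reclaim the memory.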

Cache Stampede Prevention

import asyncio
from contextlib import asynccontextmanager


class StampedeProtection:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self._local_locks: dict[str, asyncio.Lock] = {}

    @asynccontextmanager
    async def lock(self, key: str, timeout: int = 10):
        """Distributed lock to prevent stampede."""
        lock_key = f"lock:{key}"

        # Try to acquire distributed lock
        acquired = await self.redis.set(
            lock_key, "1", nx=True, ex=timeout
        )

        if not acquired:
            # Wait for existing computation
            for _ in range(timeout * 10):
                if await self.redis.exists(key):
                    break  # Data available
                await asyncio.sleep(0.1)
            else:
                raise TimeoutError(f"Lock timeout for {key}")

        try:
            # A context manager must always yield. Waiters arrive here without
            # the lock, and the caller's double-check returns the cached value.
            yield
        finally:
            if acquired:
                await self.redis.delete(lock_key)


# Usage
stampede = StampedeProtection(redis_client)


async def get_expensive_data(key: str) -> Data:
    cached = await redis_client.get(key)
    if cached:
        return json.loads(cached)

    async with stampede.lock(key):
        # Double-check after acquiring lock
        cached = await redis_client.get(key)
        if cached:
            return json.loads(cached)

        # Compute expensive data
        data = await compute_expensive_data()
        await redis_client.setex(key, 3600, json.dumps(data))
        return data
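Within a single process, concurrent requests for the same key can also be collapsed without touching Redis, an approach often called single-flight. The sketch below is an assumption-laden illustration, not a replacement for the distributed lock: `SingleFlight` is a hypothetical class and it coordinates only coroutines running on the same event loop.

```python
import asyncio


class SingleFlight:
    """Collapse concurrent calls for the same key into one computation."""

    def __init__(self) -> None:
        self._inflight: dict = {}   # key -> task for the running computation

    async def do(self, key: str, compute):
        existing = self._inflight.get(key)
        if existing is not None:
            return await existing          # piggyback on the running call
        task = asyncio.ensure_future(compute())
        self._inflight[key] = task
        try:
            return await task
        finally:
            self._inflight.pop(key, None)  # later calls start a fresh computation


async def demo():
    flights = SingleFlight()
    calls = 0

    async def compute_expensive_data() -> dict:
        nonlocal calls
        calls += 1
        await asyncio.sleep(0.01)   # simulate slow work
        return {"rows": 42}

    results = await asyncio.gather(
        *(flights.do("report", compute_expensive_data) for _ in range(20))
    )
    return results, calls
```

Twenty concurrent callers share one computation here. Combining this with the Redis lock keeps both per-process and cross-process duplication low.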

Anti-Patterns (FORBIDDEN)

# NEVER cache without TTL (memory leak)
await redis_client.set("key", value)  # No expiration!

# NEVER cache sensitive data without encryption
await redis_client.set("user:123:password", password)

# NEVER use cache as primary storage
await redis_client.set("order:123", order_data)
# ... database write fails, data lost!

# NEVER ignore cache failures
try:
    await redis_client.get(key)
except:
    pass  # Silent failure = stale data
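One way to avoid the silent failure above is to log the cache error and fall back to the source of truth. This is a minimal sketch: `get_with_fallback`, `cache_get`, and `fetch_fn` are hypothetical names, and the broad `except Exception` should be narrowed to the cache client's own error types in real code.

```python
import asyncio
import logging

logger = logging.getLogger("cache")


async def get_with_fallback(key: str, cache_get, fetch_fn):
    """Try the cache; on a cache error, log it and read from the source."""
    try:
        cached = await cache_get(key)
    except Exception:
        # Visible failure: the error is recorded instead of swallowed.
        logger.warning("cache read failed for %r; using source", key, exc_info=True)
        cached = None
    if cached is not None:
        return cached
    return await fetch_fn()


async def demo():
    async def broken_get(key: str):
        raise ConnectionError("simulated cache outage")

    async def working_get(key: str):
        return "from-cache"

    async def fetch():
        return "from-source"

    degraded = await get_with_fallback("analysis:123", broken_get, fetch)
    healthy = await get_with_fallback("analysis:123", working_get, fetch)
    return degraded, healthy
```

The request still succeeds during a cache outage, and the warning gives monitoring something to alert on.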

Key Decisions

| Decision      | Recommendation                           |
|---------------|------------------------------------------|
| Default TTL   | 1 hour for most data, 5 min for volatile |
| Serialization | orjson for performance                   |
| Key naming    | {entity}:{id} or {entity}:{id}:{field}   |
| Stampede      | Use locks for expensive computations     |
| Invalidation  | Event-based for writes, TTL for reads    |
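The key-naming recommendation can be enforced with a small builder so that keys are never assembled by hand. A sketch only: `make_key` is a hypothetical helper, and rejecting `:` inside a segment is an assumption made here to keep keys unambiguous.

```python
def make_key(entity: str, entity_id, field=None) -> str:
    """Build a key shaped like {entity}:{id} or {entity}:{id}:{field}."""
    parts = [entity, str(entity_id)]
    if field is not None:
        parts.append(field)
    for part in parts:
        # Assumption: segments must be non-empty and must not contain the separator.
        if not part or ":" in part:
            raise ValueError(f"invalid key segment: {part!r}")
    return ":".join(parts)


analysis_key = make_key("analysis", 123)
profile_key = make_key("user", 123, "profile")
```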

Related Skills

  • redis-patterns: Advanced Redis usage

  • resilience-patterns: Fallback strategies

  • observability-monitoring: Cache hit metrics

Capability Details

cache-aside

Keywords: cache aside, lazy loading, cache miss, get or set

Solves:

  • How to implement lazy loading cache?

  • Cache on read pattern

write-through

Keywords: write through, cache consistency, synchronous cache

Solves:

  • How to keep cache consistent with database?

  • Strong consistency caching

write-behind

Keywords: write behind, write back, async cache, batch writes

Solves:

  • High write throughput caching

  • Async database writes

cache-invalidation

Keywords: invalidation, cache bust, TTL, cache tags

Solves:

  • How to invalidate cache?

  • When to expire cached data

stampede-prevention

Keywords: stampede, thundering herd, cache lock, singleflight

Solves:

  • Prevent cache stampede

  • Multiple requests hitting DB
