snapshot-aggregation

Daily compression with merge logic and storage estimation for time-series data.

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.


Install skill "snapshot-aggregation" with this command: npx skills add dadbodgeoff/drift/dadbodgeoff-drift-snapshot-aggregation

Snapshot Aggregation


When to Use This Skill

  • Raw event data grows too fast for direct queries

  • Need daily snapshots for historical dashboards

  • Pipeline runs multiple times per day and needs merge logic

  • Want storage estimation for capacity planning

Core Concepts

Raw event data grows fast. Daily snapshots provide:

  • Historical dashboards without querying millions of rows

  • 90-day retention with minimal storage (~2-3KB/day)

  • Multiple pipeline runs per day that merge correctly

  • Storage estimation for capacity planning

Key insight: Multiple runs per day must merge correctly, not overwrite.
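
The difference matters because a naive upsert discards everything the earlier run found. A minimal sketch of the idea, assuming a simplified `RunCounts` shape and `mergeCounts` helper invented for illustration (not part of the skill's API):

```typescript
// Two pipeline runs on the same day produce partial counts.
interface RunCounts {
  totalArticles: number;   // additive: each run fetches new articles
  totalEvents: number;     // later runs re-count, so take the max
  pipelineRuns: number;
}

// Overwriting keeps only the last run; merging combines both.
function mergeCounts(existing: RunCounts, incoming: RunCounts): RunCounts {
  return {
    totalArticles: existing.totalArticles + incoming.totalArticles,
    totalEvents: Math.max(existing.totalEvents, incoming.totalEvents),
    pipelineRuns: existing.pipelineRuns + incoming.pipelineRuns,
  };
}

const morning: RunCounts = { totalArticles: 120, totalEvents: 40, pipelineRuns: 1 };
const evening: RunCounts = { totalArticles: 95, totalEvents: 55, pipelineRuns: 1 };

const merged = mergeCounts(morning, evening);
// merged: { totalArticles: 215, totalEvents: 55, pipelineRuns: 2 }
```

An overwrite here would have reported 95 articles and 1 run, silently losing the morning's work.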

Implementation

TypeScript

```typescript
interface DailySnapshot {
  snapshotDate: string;                        // YYYY-MM-DD
  totalArticles: number;
  totalEvents: number;
  totalClusters: number;
  avgRiskScore: number;
  maxRiskScore: number;
  categoryTotals: Record<string, number>;
  topHotspots: Hotspot[];                      // Top 10 by risk
  keyEvents: KeyEvent[];                       // Top 20 by risk
  countryStats: Record<string, CountryStats>;
  riskTrend: number;                           // vs previous day
  pipelineRuns: number;                        // How many runs merged
  createdAt: string;
  updatedAt: string;
}

interface Hotspot {
  country: string;
  countryCode: string;
  lat: number;
  lon: number;
  riskScore: number;
  eventCount: number;
  summary: string;
}

interface KeyEvent {
  id: string;
  title: string;
  country: string;
  riskScore: number;
  timestamp: string;
}

interface CountryStats {
  events: number;
  avgRisk: number;
  maxRisk: number;
  topCategory: string;
}
```

```typescript
/**
 * Aggregate pipeline results into a daily snapshot
 */
function aggregateToSnapshot(
  result: PipelineResult,
  rawEvents: RawEvent[] = [],
  date: string = new Date().toISOString().split('T')[0]
): DailySnapshot {
  const { predictions, stats } = result;

  // Calculate category totals
  const categoryTotals: Record<string, number> = {};
  for (const pred of predictions) {
    if (pred.categoryCounts) {
      for (const [cat, count] of Object.entries(pred.categoryCounts)) {
        categoryTotals[cat] = (categoryTotals[cat] || 0) + count;
      }
    }
  }

  // Extract top 10 hotspots (predictions assumed pre-sorted by risk)
  const topHotspots: Hotspot[] = predictions
    .slice(0, 10)
    .map(p => ({
      country: getCountryName(p.countryCode),
      countryCode: p.countryCode,
      lat: p.lat,
      lon: p.lon,
      riskScore: p.riskScore,
      eventCount: p.eventCount,
      summary: p.summary,
    }));

  // Extract key events (top 20 by risk); copy before sorting so the
  // caller's array is not mutated
  const keyEvents: KeyEvent[] = [...rawEvents]
    .sort((a, b) => b.riskScore - a.riskScore)
    .slice(0, 20)
    .map(e => ({
      id: e.id,
      title: truncate(e.title, 120),
      country: e.country,
      riskScore: e.riskScore,
      timestamp: new Date().toISOString(),
    }));

  // Build country stats, tracking per-country sums so avgRisk is a real average
  const countryStats: Record<string, CountryStats> = {};
  const riskSums: Record<string, { sum: number; count: number }> = {};
  for (const pred of predictions) {
    const country = pred.countryCode;
    if (!countryStats[country]) {
      countryStats[country] = {
        events: 0,
        avgRisk: 0,
        maxRisk: 0,
        topCategory: pred.category,
      };
      riskSums[country] = { sum: 0, count: 0 };
    }
    countryStats[country].events += pred.eventCount;
    countryStats[country].maxRisk = Math.max(
      countryStats[country].maxRisk,
      pred.riskScore
    );
    riskSums[country].sum += pred.riskScore;
    riskSums[country].count += 1;
  }
  for (const [country, { sum, count }] of Object.entries(riskSums)) {
    countryStats[country].avgRisk = Math.round(sum / count);
  }

  // Aggregate stats
  const totalEvents = predictions.reduce((sum, p) => sum + p.eventCount, 0);
  const avgRiskScore = predictions.length > 0
    ? Math.round(
        predictions.reduce((sum, p) => sum + p.riskScore, 0) / predictions.length
      )
    : 0;
  const maxRiskScore = predictions.length > 0
    ? Math.max(...predictions.map(p => p.riskScore))
    : 0;

  return {
    snapshotDate: date,
    totalArticles: stats.totalFetched,
    totalEvents,
    totalClusters: stats.clustersFormed,
    avgRiskScore,
    maxRiskScore,
    categoryTotals,
    topHotspots,
    keyEvents,
    countryStats,
    riskTrend: 0,
    pipelineRuns: 1,
    createdAt: new Date().toISOString(),
    updatedAt: new Date().toISOString(),
  };
}
```

```typescript
/**
 * Merge multiple snapshots from the same day
 */
function mergeSnapshots(
  existing: DailySnapshot,
  incoming: DailySnapshot
): DailySnapshot {
  // Merge category totals (take max - later run has more complete data)
  const categoryTotals = { ...existing.categoryTotals };
  for (const [cat, count] of Object.entries(incoming.categoryTotals)) {
    categoryTotals[cat] = Math.max(categoryTotals[cat] || 0, count);
  }

  // Merge hotspots (keep top 10 by risk, dedupe by location)
  const allHotspots = [...existing.topHotspots, ...incoming.topHotspots];
  const uniqueHotspots = dedupeByKey(
    allHotspots,
    h => `${h.lat.toFixed(1)},${h.lon.toFixed(1)}`
  );
  const topHotspots = uniqueHotspots
    .sort((a, b) => b.riskScore - a.riskScore)
    .slice(0, 10);

  // Merge key events (keep top 20 by risk, dedupe by ID)
  const allEvents = [...existing.keyEvents, ...incoming.keyEvents];
  const uniqueEvents = dedupeByKey(allEvents, e => e.id);
  const keyEvents = uniqueEvents
    .sort((a, b) => b.riskScore - a.riskScore)
    .slice(0, 20);

  // Merge country stats (take max values)
  const countryStats = { ...existing.countryStats };
  for (const [country, stats] of Object.entries(incoming.countryStats)) {
    if (!countryStats[country]) {
      countryStats[country] = stats;
    } else {
      countryStats[country] = {
        events: Math.max(countryStats[country].events, stats.events),
        avgRisk: Math.round(
          (countryStats[country].avgRisk + stats.avgRisk) / 2
        ),
        maxRisk: Math.max(countryStats[country].maxRisk, stats.maxRisk),
        topCategory:
          stats.maxRisk > countryStats[country].maxRisk
            ? stats.topCategory
            : countryStats[country].topCategory,
      };
    }
  }

  return {
    ...existing,
    totalArticles: existing.totalArticles + incoming.totalArticles,
    totalEvents: Math.max(existing.totalEvents, incoming.totalEvents),
    totalClusters: Math.max(existing.totalClusters, incoming.totalClusters),
    avgRiskScore: Math.round(
      (existing.avgRiskScore + incoming.avgRiskScore) / 2
    ),
    maxRiskScore: Math.max(existing.maxRiskScore, incoming.maxRiskScore),
    categoryTotals,
    topHotspots,
    keyEvents,
    countryStats,
    pipelineRuns: existing.pipelineRuns + 1,
    updatedAt: new Date().toISOString(),
  };
}
```

```typescript
/**
 * Estimate storage size of a snapshot
 * (UTF-16 code units of the serialized JSON; close to bytes for ASCII-heavy data)
 */
function estimateSnapshotSize(snapshot: DailySnapshot): number {
  return JSON.stringify(snapshot).length;
}

// Helpers
function truncate(str: string, maxLen: number): string {
  return str.length > maxLen ? str.slice(0, maxLen - 3) + '...' : str;
}

function dedupeByKey<T>(arr: T[], keyFn: (item: T) => string): T[] {
  const seen = new Set<string>();
  return arr.filter(item => {
    const key = keyFn(item);
    if (seen.has(key)) return false;
    seen.add(key);
    return true;
  });
}
```
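
The dedupe-then-rank step used for hotspots and key events can be exercised in isolation. This sketch inlines a Set-based filter equivalent to dedupeByKey; the `Spot` shape and sample coordinates are invented for illustration:

```typescript
interface Spot { key: string; riskScore: number; }

// Hotspots from two runs; location '10.5,7.2' appears in both.
const runA: Spot[] = [
  { key: '10.5,7.2', riskScore: 80 },
  { key: '33.1,44.4', riskScore: 65 },
];
const runB: Spot[] = [
  { key: '10.5,7.2', riskScore: 72 },  // duplicate location, lower score
  { key: '48.9,2.4', riskScore: 90 },
];

// Dedupe by key (first occurrence wins), then rank by risk and cap the list.
const seen = new Set<string>();
const top = [...runA, ...runB]
  .filter(s => !seen.has(s.key) && !!seen.add(s.key))
  .sort((a, b) => b.riskScore - a.riskScore)
  .slice(0, 10);

// top keys in order: '48.9,2.4', '10.5,7.2', '33.1,44.4'
```

Deduplicating before sorting is what prevents the same location from occupying two top-10 slots after a second run.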

Usage Examples

Save Daily Snapshot

```typescript
async function saveDailySnapshot(result: PipelineResult) {
  const today = new Date().toISOString().split('T')[0];

  const newSnapshot = aggregateToSnapshot(result, result.rawEvents, today);

  // Check for existing snapshot today
  const { data: existing } = await supabase
    .from('daily_snapshots')
    .select('*')
    .eq('snapshot_date', today)
    .single();

  if (existing) {
    // Merge with existing
    const merged = mergeSnapshots(existing, newSnapshot);
    await supabase
      .from('daily_snapshots')
      .update(merged)
      .eq('snapshot_date', today);
  } else {
    // Insert new
    await supabase
      .from('daily_snapshots')
      .insert(newSnapshot);
  }

  console.log(`Snapshot size: ${estimateSnapshotSize(newSnapshot)} bytes`);
}
```
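
For capacity planning, the size estimate can be projected over the retention window. A back-of-the-envelope sketch; the 2.5 KB/day figure is the midpoint of the document's ~2-3 KB/day estimate, not a measurement, and `estimateSize` is a standalone copy of the estimator:

```typescript
// Serialized size in UTF-16 code units; close to bytes for ASCII-heavy JSON.
function estimateSize(snapshot: object): number {
  return JSON.stringify(snapshot).length;
}

const bytesPerDay = 2.5 * 1024;   // midpoint of the ~2-3 KB/day estimate
const retentionDays = 90;
const projectedBytes = bytesPerDay * retentionDays;
// 2.5 KB/day * 90 days = 230400 bytes (~225 KB)

const sample = { snapshotDate: '2024-01-01', totalEvents: 42 };
console.log(estimateSize(sample), projectedBytes);
```

Logging the estimate on each run, as saveDailySnapshot does, lets you confirm real snapshots stay near this projection.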

Database Schema

```sql
CREATE TABLE daily_snapshots (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  snapshot_date DATE UNIQUE NOT NULL,
  total_articles INTEGER DEFAULT 0,
  total_events INTEGER DEFAULT 0,
  total_clusters INTEGER DEFAULT 0,
  avg_risk_score INTEGER DEFAULT 0,
  max_risk_score INTEGER DEFAULT 0,
  category_totals JSONB DEFAULT '{}',
  top_hotspots JSONB DEFAULT '[]',
  key_events JSONB DEFAULT '[]',
  country_stats JSONB DEFAULT '{}',
  risk_trend INTEGER DEFAULT 0,
  pipeline_runs INTEGER DEFAULT 1,
  created_at TIMESTAMPTZ DEFAULT now(),
  updated_at TIMESTAMPTZ DEFAULT now()
);

CREATE INDEX idx_snapshots_date ON daily_snapshots(snapshot_date DESC);
```
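
A retention policy keeps this table from growing without bound. A sketch of a 90-day cleanup, assuming PostgreSQL; running it on a schedule (e.g. via pg_cron or an application job) is left to the deployment and is not part of the skill:

```sql
-- Drop snapshots older than the 90-day retention window
DELETE FROM daily_snapshots
WHERE snapshot_date < current_date - 90;
```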

Best Practices

  • Structured aggregation - Not just counts, but top-N lists and breakdowns

  • Merge logic - Multiple runs per day combine correctly

  • Deduplication - Hotspots/events dedupe by key before merging

  • Storage estimation - Track size for capacity planning

  • Retention policy - Auto-cleanup old snapshots (90 days)

Common Mistakes

  • Simple daily counts (lose detail for dashboards)

  • Overwriting on multiple runs (lose data)

  • No deduplication in merge (duplicate hotspots)

  • Unbounded arrays (memory issues)

  • No retention policy (storage grows forever)

Related Patterns

  • batch-processing - Process events before aggregation

  • geographic-clustering - Cluster events for hotspots

  • checkpoint-resume - Track which data has been aggregated

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

  • oauth-social-login

  • sse-streaming

  • multi-tenancy

  • deduplication