checkpoint-resume

Checkpoint & Resume Processing

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "checkpoint-resume" with this command: npx skills add dadbodgeoff/drift/dadbodgeoff-drift-checkpoint-resume

Checkpoint & Resume Processing

Exactly-once processing semantics with distributed coordination for file-based data pipelines.

When to Use This Skill

  • Processing large file batches across multiple workers

  • Need to resume after worker crashes

  • Preventing duplicate processing of the same file

  • Tracking processing progress and statistics

Core Concepts

The pattern provides:

  • Atomic file claiming (only one worker processes each file)

  • Status tracking (pending, processing, completed, failed)

  • Automatic retry with configurable limits

  • In-memory fallback for development/testing

┌──────────┐        ┌─────────────────┐        ┌──────────┐
│ Worker 1 │───────▶│  Checkpoint DB  │◀───────│ Worker 2 │
└──────────┘        └─────────────────┘        └──────────┘
      │                      │                       │
      ▼                      ▼                       ▼
 claim_file()          atomic claims           claim_file()
 process()             status tracking         process()
 complete()            retry logic             complete()

Implementation

Database Schema (PostgreSQL/Supabase)

-- Checkpoint table: one row per file, keyed by file_url so claims and status
-- tracking are idempotent per file.
-- status lifecycle: 'pending' -> 'processing' -> 'completed' | 'failed'.
CREATE TABLE file_checkpoints ( file_url TEXT PRIMARY KEY, file_type TEXT NOT NULL, file_timestamp TIMESTAMPTZ NOT NULL, status TEXT NOT NULL DEFAULT 'pending', records_total INTEGER DEFAULT 0, records_filtered INTEGER DEFAULT 0, records_persisted INTEGER DEFAULT 0, processing_time_ms INTEGER DEFAULT 0, error_message TEXT, retry_count INTEGER DEFAULT 0, processed_by TEXT, started_at TIMESTAMPTZ, completed_at TIMESTAMPTZ, created_at TIMESTAMPTZ DEFAULT NOW() );

-- Workers look files up by status (e.g. failed files eligible for retry).
CREATE INDEX idx_checkpoints_status ON file_checkpoints(status);

-- Returns TRUE when the calling worker now owns the file: either a fresh
-- claim (the INSERT below succeeded) or a retry of a 'failed' file that has
-- been retried fewer than 3 times. Atomicity comes from the single-statement
-- INSERT ... ON CONFLICT and the conditional UPDATE.
-- NOTE(review): rows left in 'processing' by a crashed worker are never
-- reclaimed by this function -- presumably a separate sweeper resets stale
-- claims; confirm before relying on crash recovery.
-- Atomic claim function CREATE OR REPLACE FUNCTION claim_file( p_file_url TEXT, p_file_type TEXT, p_file_timestamp TIMESTAMPTZ, p_worker_id TEXT ) RETURNS BOOLEAN AS $$ DECLARE v_claimed BOOLEAN := FALSE; BEGIN -- Try to insert new record INSERT INTO file_checkpoints (file_url, file_type, file_timestamp, status, processed_by, started_at) VALUES (p_file_url, p_file_type, p_file_timestamp, 'processing', p_worker_id, NOW()) ON CONFLICT (file_url) DO NOTHING;

-- FOUND is TRUE only when the INSERT actually inserted a row (no conflict),
-- i.e. this worker made the first-ever claim on the file.
IF FOUND THEN v_claimed := TRUE; ELSE -- Check if we can retry a failed file UPDATE file_checkpoints SET status = 'processing', processed_by = p_worker_id, started_at = NOW(), retry_count = retry_count + 1 WHERE file_url = p_file_url AND status = 'failed' AND retry_count < 3;

-- FOUND now reflects whether the retry UPDATE matched a row.
v_claimed := FOUND;

END IF;

RETURN v_claimed; END; $$ LANGUAGE plpgsql;

TypeScript

interface FileCheckpoint { fileUrl: string; fileType: string; fileTimestamp: Date; status: 'pending' | 'processing' | 'completed' | 'failed'; recordsTotal: number; recordsFiltered: number; recordsPersisted: number; processingTimeMs: number; errorMessage?: string; retryCount: number; processedBy?: string; }

interface ProcessingStats { totalRows: number; filteredRows: number; persistedRows: number; durationMs: number; }

class CheckpointManager { private workerId: string; private inMemory = new Map<string, FileCheckpoint>(); private useInMemory = false;

constructor( private getClient: () => DatabaseClient | null, workerId?: string ) { this.workerId = workerId || worker_${Date.now()}_${Math.random().toString(36).slice(2, 8)}; }

async claimFile( fileUrl: string, fileType: string, fileTimestamp: Date ): Promise<boolean> { const client = this.getClient();

if (client) {
  try {
    const result = await client.rpc('claim_file', {
      p_file_url: fileUrl,
      p_file_type: fileType,
      p_file_timestamp: fileTimestamp.toISOString(),
      p_worker_id: this.workerId,
    });
    
    if (!result.error) return result.data === true;
  } catch (e) {
    console.warn('Database unavailable, using in-memory');
  }
}

// Fallback to in-memory (single-worker mode)
this.useInMemory = true;

if (this.inMemory.has(fileUrl)) {
  const existing = this.inMemory.get(fileUrl)!;
  if (existing.status !== 'failed' || existing.retryCount >= 3) {
    return false;
  }
}

this.inMemory.set(fileUrl, {
  fileUrl,
  fileType,
  fileTimestamp,
  status: 'processing',
  recordsTotal: 0,
  recordsFiltered: 0,
  recordsPersisted: 0,
  processingTimeMs: 0,
  retryCount: 0,
  processedBy: this.workerId,
});

return true;

}

async completeFile(fileUrl: string, stats: ProcessingStats): Promise<void> { const client = this.getClient();

if (client &#x26;&#x26; !this.useInMemory) {
  await client.rpc('complete_file', {
    p_file_url: fileUrl,
    p_records_total: stats.totalRows,
    p_records_filtered: stats.filteredRows,
    p_records_persisted: stats.persistedRows,
    p_processing_time_ms: stats.durationMs,
  });
  return;
}

const checkpoint = this.inMemory.get(fileUrl);
if (checkpoint) {
  checkpoint.status = 'completed';
  checkpoint.recordsTotal = stats.totalRows;
  checkpoint.recordsFiltered = stats.filteredRows;
  checkpoint.recordsPersisted = stats.persistedRows;
  checkpoint.processingTimeMs = stats.durationMs;
}

}

async failFile(fileUrl: string, errorMessage: string): Promise<void> { const client = this.getClient();

if (client &#x26;&#x26; !this.useInMemory) {
  await client
    .from('file_checkpoints')
    .update({ status: 'failed', error_message: errorMessage })
    .eq('file_url', fileUrl);
  return;
}

const checkpoint = this.inMemory.get(fileUrl);
if (checkpoint) {
  checkpoint.status = 'failed';
  checkpoint.errorMessage = errorMessage;
  checkpoint.retryCount++;
}

}

async isProcessed(fileUrl: string): Promise<boolean> { const client = this.getClient();

if (client &#x26;&#x26; !this.useInMemory) {
  const { data } = await client
    .from('file_checkpoints')
    .select('status')
    .eq('file_url', fileUrl)
    .single();
  
  return data?.status === 'completed';
}

return this.inMemory.get(fileUrl)?.status === 'completed';

}

getWorkerId(): string { return this.workerId; } }

Usage Examples

Processing Files

// Shared manager for this process; getDbClient is supplied by the application.
const checkpoint = new CheckpointManager(getDbClient);

/**
 * Processes a batch of files, skipping any that another worker has already
 * claimed, and records success/failure against the checkpoint store.
 */
async function processFiles(fileUrls: string[]) {
  for (const url of fileUrls) {
    // Try to claim; false means another worker owns it, it is done, or retries
    // are exhausted.
    const claimed = await checkpoint.claimFile(url, 'events', new Date());
    if (!claimed) {
      // BUG FIX: the template literal was missing its backticks (syntax error).
      console.log(`Skipping ${url} - already claimed`);
      continue;
    }

    const startTime = Date.now();

    try {
      const result = await processFile(url);

      await checkpoint.completeFile(url, {
        totalRows: result.total,
        filteredRows: result.filtered,
        persistedRows: result.persisted,
        durationMs: Date.now() - startTime,
      });
    } catch (error) {
      // BUG FIX: under strict TS the catch variable is `unknown`; narrow
      // before reading `.message`.
      await checkpoint.failFile(
        url,
        error instanceof Error ? error.message : String(error)
      );
    }
  }
}

Best Practices

  • Use database functions for atomic claims - prevents race conditions

  • Always have in-memory fallback for dev/testing

  • Track retry count to prevent infinite loops (max 3 retries)

  • Include processing stats for observability

  • Generate unique worker IDs to track which worker processed what

Common Mistakes

  • Not using atomic operations for claiming (race conditions)

  • No retry limit (infinite retry loops)

  • Forgetting in-memory fallback (breaks local development)

  • Not tracking processing statistics (can't debug issues)

  • Using file path instead of URL as key (path changes between environments)

Related Patterns

  • batch-processing - Batch database operations

  • dead-letter-queue - Handle permanently failed files

  • distributed-lock - Coordinate between workers

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

General

oauth-social-login

No summary provided by upstream source.

Repository Source — Needs Review
General

sse-streaming

No summary provided by upstream source.

Repository Source — Needs Review
General

multi-tenancy

No summary provided by upstream source.

Repository Source — Needs Review
General

deduplication

No summary provided by upstream source.

Repository Source — Needs Review