ETL Incremental Patterns
Patterns for incremental data loading and backfill operations.
Backfill Strategy
from collections.abc import Callable
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import date, timedelta
def backfill_date_range(
    start: date,
    end: date,
    process_fn: Callable[[date], None],
    parallel: int = 4,
) -> None:
    """Backfill data for an inclusive date range.

    Args:
        start: First date to process (inclusive).
        end: Last date to process (inclusive). If ``end < start`` the range
            is empty and nothing runs.
        process_fn: Called once per date. Exceptions from individual dates
            are caught and reported so one failed day does not abort the
            rest of the backfill.
        parallel: Maximum number of dates processed concurrently.
    """
    # Materialize every date in [start, end]; (end - start).days is exact
    # for date arithmetic, so +1 makes the range inclusive.
    dates = [start + timedelta(days=offset)
             for offset in range((end - start).days + 1)]

    # Process in parallel with controlled concurrency.
    with ThreadPoolExecutor(max_workers=parallel) as executor:
        futures = {executor.submit(process_fn, d): d for d in dates}
        for future in as_completed(futures):
            d = futures[future]
            try:
                future.result()
                print(f"Completed: {d}")
            except Exception as e:
                # Best-effort: report the failed date and keep draining.
                print(f"Failed: {d} - {e}")
Usage
# Example: backfill Q1 2024 (Jan 1 – Mar 31) with up to 4 days in flight.
# NOTE(review): process_daily_data is defined elsewhere — confirm it takes a single date.
backfill_date_range( start=date(2024, 1, 1), end=date(2024, 3, 31), process_fn=process_daily_data, parallel=4 )
Incremental Load Patterns
Timestamp-Based Incremental
def incremental_by_timestamp(table: str, timestamp_col: str) -> pd.DataFrame:
    """Pull only rows newer than the stored watermark for *table*.

    Selects rows where *timestamp_col* is strictly greater than the
    last-run timestamp, then advances the watermark to the newest
    timestamp seen. Returns the (possibly empty) DataFrame of new rows.
    """
    last_run = get_last_run_timestamp(table)
    # NOTE(review): identifiers cannot be bound as SQL parameters, so the
    # table/column names are interpolated directly — callers must source
    # them from trusted configuration, never from user input.
    query = f"""
        SELECT *
        FROM {table}
        WHERE {timestamp_col} > :last_run
        ORDER BY {timestamp_col}
    """
    df = pd.read_sql(query, engine, params={'last_run': last_run})
    if not df.empty:
        # Only move the watermark forward when rows actually arrived, so a
        # run that finds nothing keeps its previous position.
        set_last_run_timestamp(table, df[timestamp_col].max())
    return df
Change Data Capture (CDC)
def process_cdc_events(events: list[dict]) -> None:
    """Apply a batch of CDC events.

    DELETE events become soft deletes; every other operation
    (INSERT, UPDATE) is handled as an upsert.
    """
    for event in events:
        operation = event['operation']  # INSERT, UPDATE, DELETE
        payload = event['data']
        if operation == 'DELETE':
            soft_delete(payload['id'])
        else:
            upsert(payload)
Full Refresh with Swap
def full_refresh_with_swap(df: pd.DataFrame, table: str) -> None:
    """Atomically replace *table* with the contents of *df*.

    Loads into a temp table first, then swaps names inside a single
    transaction so readers never see a partially loaded table.
    """
    temp_table = f"{table}_temp"
    df.to_sql(temp_table, engine, if_exists='replace', index=False)

    # Rename dance: retire the current table to *_old, promote the temp
    # table, then drop the retired copy — all in one transaction.
    swap_statements = (
        f"DROP TABLE IF EXISTS {table}_old",
        f"ALTER TABLE {table} RENAME TO {table}_old",
        f"ALTER TABLE {temp_table} RENAME TO {table}",
        f"DROP TABLE {table}_old",
    )
    with engine.begin() as conn:
        for statement in swap_statements:
            conn.execute(text(statement))
Pipeline Orchestration
from enum import Enum from dataclasses import dataclass, field
class StepStatus(Enum):
    """Lifecycle states of a pipeline step."""

    PENDING = "pending"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    SKIPPED = "skipped"
@dataclass
class PipelineStep:
    """One named unit of work plus its scheduling state."""

    name: str
    func: callable
    # Names of steps that must succeed before this one runs.
    dependencies: list[str] = field(default_factory=list)
    status: StepStatus = StepStatus.PENDING
    # Populated with str(exception) when the step fails.
    error: str | None = None
class Pipeline:
    """A named DAG of steps executed in dependency order."""

    def __init__(self, name: str):
        self.name = name
        self.steps: dict[str, PipelineStep] = {}

    def add_step(self, name: str, func: callable,
                 depends_on: list[str] | None = None) -> None:
        """Register *func* as step *name*, optionally after other steps.

        Args:
            name: Unique step identifier.
            func: Zero-argument callable executed when the step runs.
            depends_on: Names of steps that must succeed first.
        """
        self.steps[name] = PipelineStep(name, func, depends_on or [])

    def run(self) -> bool:
        """Execute every step in dependency order.

        A step is skipped when any dependency did not finish with SUCCESS
        (i.e. it failed OR was itself skipped), so failures cascade instead
        of running steps on missing inputs.

        Returns:
            True iff every step finished with SUCCESS.
        """
        for step in self._topological_sort():
            # Skip unless all dependencies succeeded; this also covers
            # dependencies that were SKIPPED further up the chain.
            if any(self.steps[d].status != StepStatus.SUCCESS
                   for d in step.dependencies):
                step.status = StepStatus.SKIPPED
                continue
            step.status = StepStatus.RUNNING
            try:
                step.func()
            except Exception as e:
                step.status = StepStatus.FAILED
                step.error = str(e)
            else:
                step.status = StepStatus.SUCCESS
        return all(s.status == StepStatus.SUCCESS for s in self.steps.values())

    def _topological_sort(self) -> list[PipelineStep]:
        """Order steps so each follows its dependencies (Kahn's algorithm).

        Raises:
            ValueError: if the declared dependencies contain a cycle.
            KeyError: if a step depends on a name that was never added.
        """
        indegree = {name: len(step.dependencies)
                    for name, step in self.steps.items()}
        dependents: dict[str, list[str]] = {name: [] for name in self.steps}
        for name, step in self.steps.items():
            for dep in step.dependencies:
                dependents[dep].append(name)

        ready = [name for name, degree in indegree.items() if degree == 0]
        ordered: list[PipelineStep] = []
        while ready:
            current = ready.pop()
            ordered.append(self.steps[current])
            for follower in dependents[current]:
                indegree[follower] -= 1
                if indegree[follower] == 0:
                    ready.append(follower)

        # Any leftover step still has unmet dependencies => cycle.
        if len(ordered) != len(self.steps):
            raise ValueError(
                f"Dependency cycle detected in pipeline '{self.name}'"
            )
        return ordered
Load Strategy Decision Matrix
| Scenario | Pattern | When to Use |
| --- | --- | --- |
| Small tables (<100K rows) | Full refresh | Daily/hourly loads |
| Large tables with timestamps | Timestamp incremental | Continuous sync |
| Source supports CDC | CDC events | Real-time updates |
| One-time historical load | Parallel backfill | Initial migration |
| Critical tables | Swap pattern | Zero-downtime refresh |