clickhouse-io

ClickHouse Analytics Patterns

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "clickhouse-io" with this command: npx skills add oldwinter/skills/oldwinter-skills-clickhouse-io

ClickHouse Analytics Patterns

ClickHouse-specific patterns for high-performance analytics and data engineering.

Overview

ClickHouse is a column-oriented database management system (DBMS) for online analytical processing (OLAP). It's optimized for fast analytical queries on large datasets.

Key Features:

  • Column-oriented storage

  • Data compression

  • Parallel query execution

  • Distributed queries

  • Real-time analytics

Table Design Patterns

MergeTree Engine (Most Common)

CREATE TABLE markets_analytics ( date Date, market_id String, market_name String, volume UInt64, trades UInt32, unique_traders UInt32, avg_trade_size Float64, created_at DateTime ) ENGINE = MergeTree() PARTITION BY toYYYYMM(date) ORDER BY (date, market_id) SETTINGS index_granularity = 8192;

ReplacingMergeTree (Deduplication)

-- For data that may have duplicates (e.g., from multiple sources) CREATE TABLE user_events ( event_id String, user_id String, event_type String, timestamp DateTime, properties String ) ENGINE = ReplacingMergeTree() PARTITION BY toYYYYMM(timestamp) ORDER BY (user_id, event_id, timestamp) PRIMARY KEY (user_id, event_id);

AggregatingMergeTree (Pre-aggregation)

-- For maintaining aggregated metrics CREATE TABLE market_stats_hourly ( hour DateTime, market_id String, total_volume AggregateFunction(sum, UInt64), total_trades AggregateFunction(count, UInt32), unique_users AggregateFunction(uniq, String) ) ENGINE = AggregatingMergeTree() PARTITION BY toYYYYMM(hour) ORDER BY (hour, market_id);

-- Query aggregated data SELECT hour, market_id, sumMerge(total_volume) AS volume, countMerge(total_trades) AS trades, uniqMerge(unique_users) AS users FROM market_stats_hourly WHERE hour >= toStartOfHour(now() - INTERVAL 24 HOUR) GROUP BY hour, market_id ORDER BY hour DESC;

Query Optimization Patterns

Efficient Filtering

-- ✅ GOOD: Use indexed columns first SELECT * FROM markets_analytics WHERE date >= '2025-01-01' AND market_id = 'market-123' AND volume > 1000 ORDER BY date DESC LIMIT 100;

-- ❌ BAD: Filter on non-indexed columns first SELECT * FROM markets_analytics WHERE volume > 1000 AND market_name LIKE '%election%' AND date >= '2025-01-01';

Aggregations

-- ✅ GOOD: Use ClickHouse-specific aggregation functions SELECT toStartOfDay(created_at) AS day, market_id, sum(volume) AS total_volume, count() AS total_trades, uniq(trader_id) AS unique_traders, avg(trade_size) AS avg_size FROM trades WHERE created_at >= today() - INTERVAL 7 DAY GROUP BY day, market_id ORDER BY day DESC, total_volume DESC;

-- ✅ Use quantile for percentiles (more efficient than percentile) SELECT quantile(0.50)(trade_size) AS median, quantile(0.95)(trade_size) AS p95, quantile(0.99)(trade_size) AS p99 FROM trades WHERE created_at >= now() - INTERVAL 1 HOUR;

Window Functions

-- Calculate running totals SELECT date, market_id, volume, sum(volume) OVER ( PARTITION BY market_id ORDER BY date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) AS cumulative_volume FROM markets_analytics WHERE date >= today() - INTERVAL 30 DAY ORDER BY market_id, date;

Data Insertion Patterns

Bulk Insert (Recommended)

import { ClickHouse } from 'clickhouse'

const clickhouse = new ClickHouse({ url: process.env.CLICKHOUSE_URL, port: 8123, basicAuth: { username: process.env.CLICKHOUSE_USER, password: process.env.CLICKHOUSE_PASSWORD } })

// ✅ Batch insert (efficient) async function bulkInsertTrades(trades: Trade[]) { const values = trades.map(trade => ( '${trade.id}', '${trade.market_id}', '${trade.user_id}', ${trade.amount}, '${trade.timestamp.toISOString()}' )).join(',')

await clickhouse.query( INSERT INTO trades (id, market_id, user_id, amount, timestamp) VALUES ${values} ).toPromise() }

// ❌ Individual inserts (slow) async function insertTrade(trade: Trade) { // Don't do this in a loop! await clickhouse.query( INSERT INTO trades VALUES ('${trade.id}', ...) ).toPromise() }

Streaming Insert

// For continuous data ingestion import { createWriteStream } from 'fs' import { pipeline } from 'stream/promises'

async function streamInserts() { const stream = clickhouse.insert('trades').stream()

for await (const batch of dataSource) { stream.write(batch) }

await stream.end() }

Materialized Views

Real-time Aggregations

-- Create materialized view for hourly stats CREATE MATERIALIZED VIEW market_stats_hourly_mv TO market_stats_hourly AS SELECT toStartOfHour(timestamp) AS hour, market_id, sumState(amount) AS total_volume, countState() AS total_trades, uniqState(user_id) AS unique_users FROM trades GROUP BY hour, market_id;

-- Query the materialized view SELECT hour, market_id, sumMerge(total_volume) AS volume, countMerge(total_trades) AS trades, uniqMerge(unique_users) AS users FROM market_stats_hourly WHERE hour >= now() - INTERVAL 24 HOUR GROUP BY hour, market_id;

Performance Monitoring

Query Performance

-- Check slow queries SELECT query_id, user, query, query_duration_ms, read_rows, read_bytes, memory_usage FROM system.query_log WHERE type = 'QueryFinish' AND query_duration_ms > 1000 AND event_time >= now() - INTERVAL 1 HOUR ORDER BY query_duration_ms DESC LIMIT 10;

Table Statistics

-- Check table sizes SELECT database, table, formatReadableSize(sum(bytes)) AS size, sum(rows) AS rows, max(modification_time) AS latest_modification FROM system.parts WHERE active GROUP BY database, table ORDER BY sum(bytes) DESC;

Common Analytics Queries

Time Series Analysis

-- Daily active users SELECT toDate(timestamp) AS date, uniq(user_id) AS daily_active_users FROM events WHERE timestamp >= today() - INTERVAL 30 DAY GROUP BY date ORDER BY date;

-- Retention analysis SELECT signup_date, countIf(days_since_signup = 0) AS day_0, countIf(days_since_signup = 1) AS day_1, countIf(days_since_signup = 7) AS day_7, countIf(days_since_signup = 30) AS day_30 FROM ( SELECT user_id, min(toDate(timestamp)) AS signup_date, toDate(timestamp) AS activity_date, dateDiff('day', signup_date, activity_date) AS days_since_signup FROM events GROUP BY user_id, activity_date ) GROUP BY signup_date ORDER BY signup_date DESC;

Funnel Analysis

-- Conversion funnel SELECT countIf(step = 'viewed_market') AS viewed, countIf(step = 'clicked_trade') AS clicked, countIf(step = 'completed_trade') AS completed, round(clicked / viewed * 100, 2) AS view_to_click_rate, round(completed / clicked * 100, 2) AS click_to_completion_rate FROM ( SELECT user_id, session_id, event_type AS step FROM events WHERE event_date = today() ) GROUP BY session_id;

Cohort Analysis

-- User cohorts by signup month SELECT toStartOfMonth(signup_date) AS cohort, toStartOfMonth(activity_date) AS month, dateDiff('month', cohort, month) AS months_since_signup, count(DISTINCT user_id) AS active_users FROM ( SELECT user_id, min(toDate(timestamp)) OVER (PARTITION BY user_id) AS signup_date, toDate(timestamp) AS activity_date FROM events ) GROUP BY cohort, month, months_since_signup ORDER BY cohort, months_since_signup;

Data Pipeline Patterns

ETL Pattern

// Extract, Transform, Load async function etlPipeline() { // 1. Extract from source const rawData = await extractFromPostgres()

// 2. Transform const transformed = rawData.map(row => ({ date: new Date(row.created_at).toISOString().split('T')[0], market_id: row.market_slug, volume: parseFloat(row.total_volume), trades: parseInt(row.trade_count) }))

// 3. Load to ClickHouse await bulkInsertToClickHouse(transformed) }

// Run periodically setInterval(etlPipeline, 60 * 60 * 1000) // Every hour

Change Data Capture (CDC)

// Listen to PostgreSQL changes and sync to ClickHouse import { Client } from 'pg'

const pgClient = new Client({ connectionString: process.env.DATABASE_URL })

pgClient.query('LISTEN market_updates')

pgClient.on('notification', async (msg) => { const update = JSON.parse(msg.payload)

await clickhouse.insert('market_updates', [ { market_id: update.id, event_type: update.operation, // INSERT, UPDATE, DELETE timestamp: new Date(), data: JSON.stringify(update.new_data) } ]) })

Best Practices

  1. Partitioning Strategy
  • Partition by time (usually month or day)

  • Avoid too many partitions (performance impact)

  • Use DATE type for partition key

  1. Ordering Key
  • Put most frequently filtered columns first

  • Consider cardinality (high cardinality first)

  • Order impacts compression

  1. Data Types
  • Use smallest appropriate type (UInt32 vs UInt64)

  • Use LowCardinality for repeated strings

  • Use Enum for categorical data

  1. Avoid
  • SELECT * (specify columns)

  • FINAL (merge data before query instead)

  • Too many JOINs (denormalize for analytics)

  • Small frequent inserts (batch instead)

  1. Monitoring
  • Track query performance

  • Monitor disk usage

  • Check merge operations

  • Review slow query log

Remember: ClickHouse excels at analytical workloads. Design tables for your query patterns, batch inserts, and leverage materialized views for real-time aggregations.

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

Coding

github-cli

No summary provided by upstream source.

Repository SourceNeeds Review
Coding

aws-cli

No summary provided by upstream source.

Repository SourceNeeds Review
Coding

argocd-cli

No summary provided by upstream source.

Repository SourceNeeds Review