# clickhouse-io

Installs: 1.2K · Rank: #1132

**Install:**

```bash
npx skills add https://github.com/affaan-m/everything-claude-code --skill clickhouse-io
```
# ClickHouse Analytics Patterns

ClickHouse-specific patterns for high-performance analytics and data engineering.
## When to Activate

- Designing ClickHouse table schemas (MergeTree engine selection)
- Writing analytical queries (aggregations, window functions, joins)
- Optimizing query performance (partition pruning, projections, materialized views)
- Ingesting large volumes of data (batch inserts, Kafka integration)
- Migrating from PostgreSQL/MySQL to ClickHouse for analytics
- Implementing real-time dashboards or time-series analytics
## Overview

ClickHouse is a column-oriented database management system (DBMS) for online analytical processing (OLAP). It is optimized for fast analytical queries over large datasets.

Key features:

- Column-oriented storage
- Data compression
- Parallel query execution
- Distributed queries
- Real-time analytics
## Table Design Patterns

### MergeTree Engine (Most Common)

```sql
CREATE TABLE markets_analytics
(
    date Date,
    market_id String,
    market_name String,
    volume UInt64,
    trades UInt32,
    unique_traders UInt32,
    avg_trade_size Float64,
    created_at DateTime
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (date, market_id)
SETTINGS index_granularity = 8192;
```
### ReplacingMergeTree (Deduplication)

```sql
-- For data that may have duplicates (e.g., from multiple sources)
CREATE TABLE user_events
(
    event_id String,
    user_id String,
    event_type String,
    timestamp DateTime,
    properties String
)
ENGINE = ReplacingMergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (user_id, event_id, timestamp)
PRIMARY KEY (user_id, event_id);
```
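ReplacingMergeTree only collapses duplicates when parts merge in the background, so reads can still see duplicate rows. A minimal sketch of deduplicating at read time instead, against the table above (the `user_id` literal is just an example):

```sql
-- Force deduplication at read time (simple, but slower on large scans)
SELECT *
FROM user_events FINAL
WHERE user_id = 'user-123';

-- Or keep the latest row per key without FINAL
SELECT
    user_id,
    event_id,
    argMax(event_type, timestamp) AS event_type,
    argMax(properties, timestamp) AS properties
FROM user_events
GROUP BY user_id, event_id;
```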
### AggregatingMergeTree (Pre-aggregation)

```sql
-- For maintaining aggregated metrics
CREATE TABLE market_stats_hourly
(
    hour DateTime,
    market_id String,
    total_volume AggregateFunction(sum, UInt64),
    total_trades AggregateFunction(count),
    unique_users AggregateFunction(uniq, String)
)
ENGINE = AggregatingMergeTree()
PARTITION BY toYYYYMM(hour)
ORDER BY (hour, market_id);
```
```sql
-- Query aggregated data
SELECT
    hour,
    market_id,
    sumMerge(total_volume) AS volume,
    countMerge(total_trades) AS trades,
    uniqMerge(unique_users) AS users
FROM market_stats_hourly
WHERE hour >= toStartOfHour(now() - INTERVAL 24 HOUR)
GROUP BY hour, market_id
ORDER BY hour DESC;
```
## Query Optimization Patterns

### Efficient Filtering

```sql
-- ✅ GOOD: Use indexed columns first
SELECT *
FROM markets_analytics
WHERE date >= '2025-01-01'
  AND market_id = 'market-123'
  AND volume > 1000
ORDER BY date DESC
LIMIT 100;

-- ❌ BAD: Filter on non-indexed columns first
SELECT *
FROM markets_analytics
WHERE volume > 1000
  AND market_name LIKE '%election%'
  AND date >= '2025-01-01';
```
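To confirm a filter actually prunes partitions and granules rather than scanning everything, ClickHouse can annotate the plan with index usage. A quick sanity check against the table above (output shape varies by version):

```sql
EXPLAIN indexes = 1
SELECT *
FROM markets_analytics
WHERE date >= '2025-01-01'
  AND market_id = 'market-123';
```

The partition and primary-key sections of the output show how many parts and granules survive each filter.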
### Aggregations

```sql
-- ✅ GOOD: Use ClickHouse-specific aggregation functions
SELECT
    toStartOfDay(created_at) AS day,
    market_id,
    sum(volume) AS total_volume,
    count() AS total_trades,
    uniq(trader_id) AS unique_traders,
    avg(trade_size) AS avg_size
FROM trades
WHERE created_at >= today() - INTERVAL 7 DAY
GROUP BY day, market_id
ORDER BY day DESC, total_volume DESC;

-- ✅ Use quantile() for fast approximate percentiles
--    (switch to quantileExact() when exact values matter)
SELECT
    quantile(0.50)(trade_size) AS median,
    quantile(0.95)(trade_size) AS p95,
    quantile(0.99)(trade_size) AS p99
FROM trades
WHERE created_at >= now() - INTERVAL 1 HOUR;
```
### Window Functions

```sql
-- Calculate running totals
SELECT
    date,
    market_id,
    volume,
    sum(volume) OVER (
        PARTITION BY market_id
        ORDER BY date
        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) AS cumulative_volume
FROM markets_analytics
WHERE date >= today() - INTERVAL 30 DAY
ORDER BY market_id, date;
```
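Running totals are one case; period-over-period deltas are another common one. A sketch on the same table using lagInFrame, ClickHouse's frame-bound variant of lag (on the first row of each partition it returns 0, so the first delta equals the volume itself):

```sql
-- Day-over-day volume change per market
SELECT
    date,
    market_id,
    volume,
    volume - lagInFrame(volume) OVER (
        PARTITION BY market_id
        ORDER BY date
        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) AS volume_change
FROM markets_analytics
ORDER BY market_id, date;
```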
## Data Insertion Patterns

### Bulk Insert (Recommended)

```typescript
import { ClickHouse } from 'clickhouse'

interface Trade {
  id: string
  market_id: string
  user_id: string
  amount: number
  timestamp: Date
}

const clickhouse = new ClickHouse({
  url: process.env.CLICKHOUSE_URL,
  port: 8123,
  basicAuth: {
    username: process.env.CLICKHOUSE_USER,
    password: process.env.CLICKHOUSE_PASSWORD,
  },
})

// ✅ Batch insert (efficient)
async function bulkInsertTrades(trades: Trade[]) {
  // Values are interpolated for brevity; escape inputs or use
  // parameterized inserts for untrusted data.
  const values = trades
    .map(
      trade =>
        `('${trade.id}', '${trade.market_id}', '${trade.user_id}', ` +
        `${trade.amount}, '${trade.timestamp.toISOString()}')`
    )
    .join(',')

  await clickhouse
    .query(
      `INSERT INTO trades (id, market_id, user_id, amount, timestamp)
       VALUES ${values}`
    )
    .toPromise()
}

// ❌ Individual inserts (slow)
async function insertTrade(trade: Trade) {
  // Don't do this in a loop!
  await clickhouse
    .query(`INSERT INTO trades VALUES ('${trade.id}', ...)`)
    .toPromise()
}
```
### Streaming Insert

```typescript
// For continuous data ingestion
// (dataSource stands in for any async iterable of row batches)
async function streamInserts() {
  const stream = clickhouse.insert('trades').stream()

  for await (const batch of dataSource) {
    stream.write(batch)
  }

  await stream.end()
}
```
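For the Kafka ingestion mentioned under activation, the native pattern keeps client code out of the loop entirely: a Kafka-engine source table drained by a materialized view. A sketch, with broker, topic, and consumer-group names as placeholders:

```sql
CREATE TABLE trades_queue
(
    id String,
    market_id String,
    user_id String,
    amount Float64,
    timestamp DateTime
)
ENGINE = Kafka('kafka:9092', 'trades', 'clickhouse-consumer', 'JSONEachRow');

-- Drain the queue into the real table as messages arrive
CREATE MATERIALIZED VIEW trades_queue_mv TO trades AS
SELECT * FROM trades_queue;
```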
## Materialized Views

### Real-time Aggregations

```sql
-- Create materialized view for hourly stats
CREATE MATERIALIZED VIEW market_stats_hourly_mv
TO market_stats_hourly
AS SELECT
    toStartOfHour(timestamp) AS hour,
    market_id,
    sumState(amount) AS total_volume,
    countState() AS total_trades,
    uniqState(user_id) AS unique_users
FROM trades
GROUP BY hour, market_id;

-- Query the materialized view's target table
SELECT
    hour,
    market_id,
    sumMerge(total_volume) AS volume,
    countMerge(total_trades) AS trades,
    uniqMerge(unique_users) AS users
FROM market_stats_hourly
WHERE hour >= now() - INTERVAL 24 HOUR
GROUP BY hour, market_id;
```
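One caveat worth coding around: a materialized view only processes rows inserted after it exists. Historical rows in trades need a one-time manual backfill into the target table, sketched here with the same -State functions (the cutoff timestamp is a placeholder for the view's creation time):

```sql
INSERT INTO market_stats_hourly
SELECT
    toStartOfHour(timestamp) AS hour,
    market_id,
    sumState(amount) AS total_volume,
    countState() AS total_trades,
    uniqState(user_id) AS unique_users
FROM trades
WHERE timestamp < '2025-01-01 00:00:00'  -- only rows the view never saw
GROUP BY hour, market_id;
```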
## Performance Monitoring

### Query Performance

```sql
-- Check slow queries
SELECT
    query_id,
    user,
    query,
    query_duration_ms,
    read_rows,
    read_bytes,
    memory_usage
FROM system.query_log
WHERE type = 'QueryFinish'
  AND query_duration_ms > 1000
  AND event_time >= now() - INTERVAL 1 HOUR
ORDER BY query_duration_ms DESC
LIMIT 10;
```
### Table Statistics

```sql
-- Check table sizes
SELECT
    database,
    table,
    formatReadableSize(sum(bytes)) AS size,
    sum(rows) AS rows,
    max(modification_time) AS latest_modification
FROM system.parts
WHERE active
GROUP BY database, table
ORDER BY sum(bytes) DESC;
```
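Compression is usually where ordering-key and data-type choices show up first. A per-column view of compressed vs. uncompressed size from system.columns (the table name reuses the schema example above):

```sql
-- Compression ratio per column
SELECT
    name,
    formatReadableSize(data_compressed_bytes) AS compressed,
    formatReadableSize(data_uncompressed_bytes) AS uncompressed,
    round(data_uncompressed_bytes / data_compressed_bytes, 2) AS ratio
FROM system.columns
WHERE database = currentDatabase()
  AND table = 'markets_analytics'
ORDER BY data_compressed_bytes DESC;
```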
## Common Analytics Queries

### Time Series Analysis

```sql
-- Daily active users
SELECT
    toDate(timestamp) AS date,
    uniq(user_id) AS daily_active_users
FROM events
WHERE timestamp >= today() - INTERVAL 30 DAY
GROUP BY date
ORDER BY date;
```
```sql
-- Retention analysis
SELECT
    signup_date,
    countIf(days_since_signup = 0) AS day_0,
    countIf(days_since_signup = 1) AS day_1,
    countIf(days_since_signup = 7) AS day_7,
    countIf(days_since_signup = 30) AS day_30
FROM
(
    SELECT
        user_id,
        min(activity_date) OVER (PARTITION BY user_id) AS signup_date,
        activity_date,
        dateDiff('day', signup_date, activity_date) AS days_since_signup
    FROM
    (
        SELECT DISTINCT
            user_id,
            toDate(timestamp) AS activity_date
        FROM events
    )
)
GROUP BY signup_date
ORDER BY signup_date DESC;
```
### Funnel Analysis

```sql
-- Conversion funnel: unique sessions reaching each step today
SELECT
    uniqIf(session_id, step = 'viewed_market') AS viewed,
    uniqIf(session_id, step = 'clicked_trade') AS clicked,
    uniqIf(session_id, step = 'completed_trade') AS completed,
    round(clicked / viewed * 100, 2) AS view_to_click_rate,
    round(completed / clicked * 100, 2) AS click_to_completion_rate
FROM
(
    SELECT
        user_id,
        session_id,
        event_type AS step
    FROM events
    WHERE event_date = today()
);
```
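ClickHouse also ships a purpose-built aggregate for this: windowFunnel counts how far each user advanced through an ordered event chain within a time window (3600 seconds here), avoiding the manual countIf plumbing:

```sql
SELECT
    user_id,
    windowFunnel(3600)(
        timestamp,
        event_type = 'viewed_market',
        event_type = 'clicked_trade',
        event_type = 'completed_trade'
    ) AS furthest_step
FROM events
WHERE event_date = today()
GROUP BY user_id;
```

furthest_step ranges from 0 to 3; counting rows with `furthest_step >= n` gives the per-step totals.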
### Cohort Analysis

```sql
-- User cohorts by signup month
SELECT
    toStartOfMonth(signup_date) AS cohort,
    toStartOfMonth(activity_date) AS month,
    dateDiff('month', cohort, month) AS months_since_signup,
    count(DISTINCT user_id) AS active_users
FROM
(
    SELECT
        user_id,
        min(toDate(timestamp)) OVER (PARTITION BY user_id) AS signup_date,
        toDate(timestamp) AS activity_date
    FROM events
)
GROUP BY cohort, month, months_since_signup
ORDER BY cohort, months_since_signup;
```
## Data Pipeline Patterns

### ETL Pattern

```typescript
// Extract, Transform, Load
async function etlPipeline() {
  // 1. Extract from source
  const rawData = await extractFromPostgres()

  // 2. Transform
  const transformed = rawData.map(row => ({
    date: new Date(row.created_at).toISOString().split('T')[0],
    market_id: row.market_slug,
    volume: parseFloat(row.total_volume),
    trades: parseInt(row.trade_count, 10),
  }))

  // 3. Load to ClickHouse
  await bulkInsertToClickHouse(transformed)
}

// Run every hour
setInterval(etlPipeline, 60 * 60 * 1000)
```
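When the source is PostgreSQL, the application-side hop can sometimes be skipped: ClickHouse's postgresql() table function reads the source table directly. A sketch under assumed connection details and source column names (host, database, table, and credentials are all placeholders):

```sql
INSERT INTO markets_analytics (date, market_id, volume, trades)
SELECT
    toDate(created_at) AS date,
    market_slug AS market_id,
    toUInt64(total_volume) AS volume,
    toUInt32(trade_count) AS trades
FROM postgresql('pg-host:5432', 'mydb', 'market_stats', 'pg_user', 'pg_password');
```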
### Change Data Capture (CDC)

```typescript
// Listen to PostgreSQL changes and sync to ClickHouse
import { Client } from 'pg'

const pgClient = new Client({
  connectionString: process.env.DATABASE_URL,
})

await pgClient.connect()
await pgClient.query('LISTEN market_updates')

pgClient.on('notification', async msg => {
  const update = JSON.parse(msg.payload)

  await clickhouse.insert('market_updates', [
    {
      market_id: update.id,
      event_type: update.operation, // INSERT, UPDATE, DELETE
      timestamp: new Date(),
      data: JSON.stringify(update.new_data),
    },
  ])
})
```
## Best Practices

### 1. Partitioning Strategy

- Partition by time (usually month or day)
- Avoid too many partitions (they hurt performance)
- Use a Date-typed column for the partition key
### 2. Ordering Key

- Put the most frequently filtered columns first
- Consider cardinality (ascending order, low cardinality first, usually compresses and prunes best)
- Column order impacts compression
### 3. Data Types

- Use the smallest appropriate type (UInt32 vs UInt64)
- Use LowCardinality for repeated strings
- Use Enum for categorical data (see the sketch below)
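A sketch of what those type choices look like on a variant of the user_events schema (the table name and Enum values are illustrative):

```sql
CREATE TABLE user_events_typed
(
    event_id String,
    user_id String,
    event_type LowCardinality(String),       -- few distinct values, dictionary-encoded
    status Enum8('open' = 1, 'closed' = 2),  -- fixed category set, stored as Int8
    timestamp DateTime
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (user_id, timestamp);
```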
### 4. Avoid

- `SELECT *` (specify columns)
- `FINAL` (merge data before querying instead)
- Too many JOINs (denormalize for analytics)
- Small, frequent inserts (batch instead; see the async-insert sketch below)
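When the client genuinely cannot batch, async inserts let the server do the batching instead: rows are buffered and flushed in the background. A sketch using the standard async_insert settings (the inserted values are placeholders matching the trades example):

```sql
INSERT INTO trades
SETTINGS async_insert = 1, wait_for_async_insert = 0
VALUES ('t-1', 'market-123', 'user-1', 42, now());
```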
### 5. Monitoring

- Track query performance
- Monitor disk usage
- Check merge operations (see the sketch below)
- Review the slow query log
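For the merge-operations check, system.merges lists merges currently in flight; a long-running backlog here often explains insert or query slowdowns:

```sql
SELECT
    database,
    table,
    round(elapsed, 1) AS elapsed_s,
    round(progress * 100, 1) AS pct_done,
    formatReadableSize(total_size_bytes_compressed) AS size
FROM system.merges;
```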
## Remember

ClickHouse excels at analytical workloads. Design tables for your query patterns, batch inserts, and leverage materialized views for real-time aggregations.