Database Sharding Overview
Implement horizontal data partitioning across multiple database servers. Covers sharding strategies, consistent hashing, shard key selection, and cross-shard querying patterns.
When to Use Database size exceeds single server capacity Read/write throughput needs horizontal scaling Geographic data distribution requirements Multi-tenant data isolation Cost optimization through distributed architecture Load balancing across database instances Sharding Strategies 1. Range-Based Sharding
PostgreSQL - Range Sharding Implementation:
-- Define shard ranges -- Shard 0: user_id 0-999999 -- Shard 1: user_id 1000000-1999999 -- Shard 2: user_id 2000000-2999999
-- Range shard 0: owns user_id values 0 through 999999,
-- enforced by a named CHECK constraint.
CREATE TABLE users_shard_0 (
    id         UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id    BIGINT NOT NULL,
    email      VARCHAR(255) NOT NULL,
    created_at TIMESTAMP DEFAULT NOW(),
    CONSTRAINT shard_0_range CHECK (user_id BETWEEN 0 AND 999999)
);
-- Range shard 1: owns user_id values 1000000 through 1999999.
CREATE TABLE users_shard_1 (
    id         UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id    BIGINT NOT NULL,
    email      VARCHAR(255) NOT NULL,
    created_at TIMESTAMP DEFAULT NOW(),
    CONSTRAINT shard_1_range CHECK (user_id BETWEEN 1000000 AND 1999999)
);
-- Map a user_id to its range shard.
-- Generalized: the per-shard range size is now a parameter with the
-- original hard-coded value (1000000) as its default, so existing
-- callers are unaffected while other range layouts become possible.
-- NOTE(review): assumes p_user_id >= 0 — a negative id would yield a
-- negative shard number; confirm ids are non-negative upstream.
CREATE OR REPLACE FUNCTION get_shard_id(
    p_user_id    BIGINT,
    p_range_size BIGINT DEFAULT 1000000
) RETURNS INT AS $$
BEGIN
    -- Integer division buckets consecutive id ranges onto shards.
    RETURN (p_user_id / p_range_size)::INT;
END;
$$ LANGUAGE plpgsql IMMUTABLE;
2. Hash-Based Sharding
PostgreSQL - Consistent Hash Sharding:
-- Route an arbitrary string key to one of p_shard_count shards
-- using PostgreSQL's built-in hashtext().
CREATE OR REPLACE FUNCTION get_hash_shard(
    p_key         VARCHAR,
    p_shard_count INT DEFAULT 4
) RETURNS INT AS $$
BEGIN
    -- Cast to BIGINT before abs() so the INT_MIN hash value cannot
    -- overflow, then fold into the shard range with modulo.
    RETURN (abs(hashtext(p_key)::BIGINT) % p_shard_count)::INT;
END;
$$ LANGUAGE plpgsql IMMUTABLE;
-- Base table for hash-sharded users; shards 1-3 clone this layout.
CREATE TABLE users_shard_0 (
    id         UUID PRIMARY KEY,
    user_key   VARCHAR(255) NOT NULL,
    email      VARCHAR(255),
    created_at TIMESTAMP DEFAULT NOW()
);
-- Clone the shard-0 layout for shards 1-3.
-- FIX: CREATE TABLE ... AS TABLE copies any rows already present in
-- users_shard_0 and drops the primary key and column defaults.
-- LIKE ... INCLUDING ALL copies defaults, constraints and indexes
-- (including the PK) and never copies data.
CREATE TABLE users_shard_1 (LIKE users_shard_0 INCLUDING ALL);
CREATE TABLE users_shard_2 (LIKE users_shard_0 INCLUDING ALL);
CREATE TABLE users_shard_3 (LIKE users_shard_0 INCLUDING ALL);
-- Route existing rows from the monolithic users table into their shards.
-- FIX: explicit column lists on both sides — the original
-- INSERT ... SELECT * breaks silently if either table gains a column.
-- NOTE(review): assumes users carries the same four columns as the
-- shard tables; the original SELECT * relied on that too — confirm.
INSERT INTO users_shard_0 (id, user_key, email, created_at)
SELECT id, user_key, email, created_at
FROM users
WHERE get_hash_shard(user_key, 4) = 0;

INSERT INTO users_shard_1 (id, user_key, email, created_at)
SELECT id, user_key, email, created_at
FROM users
WHERE get_hash_shard(user_key, 4) = 1;
Consistent Hashing for Resilience:
-- Virtual-node table: several virtual nodes may point at the same
-- physical shard to smooth out load distribution.
CREATE TABLE shard_mapping (
    virtual_node_id INT PRIMARY KEY,
    actual_shard_id INT NOT NULL,
    shard_host      VARCHAR(255),
    shard_port      INT
);
-- Seed the ring: shards 1 and 2 each get one extra virtual node.
-- FIX: explicit column list — an INSERT without one breaks silently
-- if the table definition changes.
INSERT INTO shard_mapping (virtual_node_id, actual_shard_id, shard_host, shard_port)
VALUES
    (0, 0, 'shard0.example.com', 5432),
    (1, 1, 'shard1.example.com', 5432),
    (2, 2, 'shard2.example.com', 5432),
    (3, 3, 'shard3.example.com', 5432),
    (4, 1, 'shard1.example.com', 5432), -- virtual node for shard 1
    (5, 2, 'shard2.example.com', 5432); -- virtual node for shard 2
-- Resolve the physical shard that owns p_key via the virtual-node table.
-- NOTE(review): hashing modulo COUNT(*) remaps most keys whenever a
-- virtual node is added or removed — this is *not* ring-based
-- consistent hashing; confirm that is acceptable before using it for
-- live rebalancing.
CREATE OR REPLACE FUNCTION find_shard_host(p_key VARCHAR)
RETURNS TABLE (shard_id INT, host VARCHAR, port INT) AS $$
DECLARE
    v_node INT;
BEGIN
    -- Pick a virtual node deterministically from the key's hash.
    v_node := (
        abs(hashtext(p_key)::BIGINT)
        % (SELECT COUNT(*) FROM shard_mapping)
    )::INT;

    RETURN QUERY
    SELECT sm.actual_shard_id, sm.shard_host, sm.shard_port
    FROM shard_mapping sm
    WHERE sm.virtual_node_id = v_node
    LIMIT 1;
END;
$$ LANGUAGE plpgsql;
3. Directory-Based Sharding
PostgreSQL - Lookup Table Approach:
-- Directory mapping each shard key to its physical shard location.
CREATE TABLE shard_directory (
    shard_key  VARCHAR(255) PRIMARY KEY,
    shard_id   INT NOT NULL,
    shard_host VARCHAR(255) NOT NULL,
    shard_port INT DEFAULT 5432,
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW()
);

-- Supports reverse lookups: "which keys live on shard N?"
CREATE INDEX idx_shard_id ON shard_directory(shard_id);
-- Seed the directory with per-user and per-tenant placements.
INSERT INTO shard_directory (shard_key, shard_id, shard_host)
VALUES
    ('user_1',   0, 'shard0.example.com'),
    ('user_2',   1, 'shard1.example.com'),
    ('tenant_a', 2, 'shard2.example.com'),
    ('tenant_b', 3, 'shard3.example.com');
-- Look up a key's shard placement; returns zero rows for unknown keys.
CREATE OR REPLACE FUNCTION get_shard_info(p_key VARCHAR)
RETURNS TABLE (shard_id INT, host VARCHAR, port INT) AS $$
BEGIN
    RETURN QUERY
    SELECT sd.shard_id,
           sd.shard_host,
           sd.shard_port
    FROM shard_directory sd
    WHERE sd.shard_key = p_key;
END;
$$ LANGUAGE plpgsql;
Shard Key Selection
Good Shard Key Characteristics:
-- Example: user-based sharding.
-- Shard key: user_id — a good choice because it is stable and appears
-- in nearly every query.

-- Single-shard lookups: every query carries the shard key so it can
-- be routed to exactly one shard.
SELECT * FROM users WHERE user_id = 123;
SELECT * FROM orders WHERE user_id = 123 AND order_id = 456;

-- Multi-tenant systems: composite shard key (tenant_id, user_id).
SELECT * FROM users WHERE tenant_id = 'tenant_a' AND user_id = 123;

-- Index the shard key on every shard so routed lookups stay fast.
CREATE INDEX idx_users_shard_key ON users_shard_0(user_id);
CREATE INDEX idx_orders_shard_key ON orders_shard_0(user_id, order_id);
Cross-Shard Operations
PostgreSQL - Distributed Query Pattern:
-- Connect this coordinator node to each shard via the foreign-data
-- wrapper.
-- FIX: IF NOT EXISTS makes the script safe to rerun — a bare
-- CREATE EXTENSION errors out when the extension is already installed.
CREATE EXTENSION IF NOT EXISTS postgres_fdw;

CREATE SERVER shard_0
    FOREIGN DATA WRAPPER postgres_fdw
    OPTIONS (host 'shard0.example.com', dbname 'mydb');

CREATE SERVER shard_1
    FOREIGN DATA WRAPPER postgres_fdw
    OPTIONS (host 'shard1.example.com', dbname 'mydb');
-- Local proxies for the remote users table on each shard.
-- NOTE(review): postgres_fdw also requires a USER MAPPING per server
-- before these tables can actually be queried — not shown here.
CREATE FOREIGN TABLE users_remote_0 (
    id         UUID,
    user_id    BIGINT,
    email      VARCHAR,
    created_at TIMESTAMP
) SERVER shard_0 OPTIONS (table_name 'users');

CREATE FOREIGN TABLE users_remote_1 (
    id         UUID,
    user_id    BIGINT,
    email      VARCHAR,
    created_at TIMESTAMP
) SERVER shard_1 OPTIONS (table_name 'users');
-- Fan out to every shard, filter, then merge.
-- BUG FIX: in the original the WHERE clause bound only to the second
-- UNION ALL branch, so shard 0 contributed *unfiltered* rows. Filter
-- each branch (which also lets the FDW push the predicate down to the
-- remote server), then sort and limit the merged result.
SELECT id, user_id, email, created_at
FROM (
    SELECT id, user_id, email, created_at
    FROM users_remote_0
    WHERE created_at > NOW() - INTERVAL '7 days'
    UNION ALL
    SELECT id, user_id, email, created_at
    FROM users_remote_1
    WHERE created_at > NOW() - INTERVAL '7 days'
) recent_users
ORDER BY created_at DESC
LIMIT 100;
Cross-Shard Aggregation:
-- Roll up order count and total spend per user across all four shards.
CREATE OR REPLACE FUNCTION aggregate_user_orders()
RETURNS TABLE (user_id BIGINT, total_orders BIGINT, total_spent DECIMAL) AS $$
BEGIN
    RETURN QUERY
    WITH all_orders AS (
        -- UNION ALL: shards hold disjoint rows, so no dedup is needed.
        SELECT o.user_id, o.id, o.total FROM orders_shard_0 o
        UNION ALL
        SELECT o.user_id, o.id, o.total FROM orders_shard_1 o
        UNION ALL
        SELECT o.user_id, o.id, o.total FROM orders_shard_2 o
        UNION ALL
        SELECT o.user_id, o.id, o.total FROM orders_shard_3 o
    )
    SELECT ao.user_id,
           COUNT(ao.id)::BIGINT,
           SUM(ao.total)::DECIMAL
    FROM all_orders ao
    GROUP BY ao.user_id;
END;
$$ LANGUAGE plpgsql;
Shard Rebalancing
PostgreSQL - Add New Shard:
-- 1. Create the new shard with the same layout as the existing shards.
--    FIX: the original omitted the created_at DEFAULT NOW() that every
--    sibling shard table carries, so rows inserted without an explicit
--    timestamp would silently get NULL here but not elsewhere.
CREATE TABLE users_shard_4 (
    id         UUID PRIMARY KEY,
    user_id    BIGINT NOT NULL,
    email      VARCHAR(255),
    created_at TIMESTAMP DEFAULT NOW()
);
-- 2+3. Move rows that now hash to the new shard — atomically.
-- FIX: the original ran the INSERT and the four DELETEs as separate
-- statements; a failure in between would leave migrated rows
-- duplicated across two shards. A single transaction makes the move
-- all-or-nothing.
-- NOTE(review): with plain mod-N hashing, growing from 4 to 5 shards
-- also remaps many keys *between* shards 0-3, which this script does
-- not handle — only keys landing on the new shard are moved. True
-- consistent hashing would minimize that movement; confirm the
-- intended strategy before running this in production.
BEGIN;

INSERT INTO users_shard_4 (id, user_id, email, created_at)
SELECT id, user_id, email, created_at
FROM (
    SELECT * FROM users_shard_0
    UNION ALL
    SELECT * FROM users_shard_1
    UNION ALL
    SELECT * FROM users_shard_2
    UNION ALL
    SELECT * FROM users_shard_3
) all_users
WHERE get_hash_shard(user_id::VARCHAR, 5) = 4;

-- Remove the migrated rows from their old shards.
DELETE FROM users_shard_0 WHERE get_hash_shard(user_id::VARCHAR, 5) = 4;
DELETE FROM users_shard_1 WHERE get_hash_shard(user_id::VARCHAR, 5) = 4;
DELETE FROM users_shard_2 WHERE get_hash_shard(user_id::VARCHAR, 5) = 4;
DELETE FROM users_shard_3 WHERE get_hash_shard(user_id::VARCHAR, 5) = 4;

COMMIT;
-- 4. Update shard count in configuration -- Update application configuration: shard_count = 5
Shard Monitoring
PostgreSQL - Monitor Shard Balance:
-- Report per-shard row counts against the even-distribution target.
-- FIXES vs. original:
--   * COUNT() with no argument is a syntax error in PostgreSQL —
--     every occurrence must be COUNT(*).
--   * total_records counted the rows of the per-shard subquery
--     (always 4, one row per shard) instead of summing the per-shard
--     counts, so avg_records was always 4/4 = 1.
CREATE OR REPLACE FUNCTION monitor_shard_distribution()
RETURNS TABLE (shard_id INT, record_count BIGINT, avg_records BIGINT) AS $$
DECLARE
    total_records BIGINT;
BEGIN
    -- Total rows across all shards = SUM of the per-shard counts.
    SELECT SUM(cnt) INTO total_records
    FROM (
        SELECT COUNT(*) AS cnt FROM users_shard_0
        UNION ALL
        SELECT COUNT(*) FROM users_shard_1
        UNION ALL
        SELECT COUNT(*) FROM users_shard_2
        UNION ALL
        SELECT COUNT(*) FROM users_shard_3
    ) counts;

    -- One row per shard: actual count vs. the even-split target.
    RETURN QUERY
    SELECT 0::INT, COUNT(*)::BIGINT, (total_records / 4)::BIGINT FROM users_shard_0
    UNION ALL
    SELECT 1::INT, COUNT(*)::BIGINT, (total_records / 4)::BIGINT FROM users_shard_1
    UNION ALL
    SELECT 2::INT, COUNT(*)::BIGINT, (total_records / 4)::BIGINT FROM users_shard_2
    UNION ALL
    SELECT 3::INT, COUNT(*)::BIGINT, (total_records / 4)::BIGINT FROM users_shard_3;
END;
$$ LANGUAGE plpgsql;

SELECT * FROM monitor_shard_distribution();
Monitor Shard Access:
-- Per-operation access log: one row each time a shard is touched.
CREATE TABLE shard_access_log (
    shard_id     INT,
    operation    VARCHAR(10),
    record_count INT,
    duration_ms  INT,
    accessed_at  TIMESTAMP DEFAULT NOW()
);

-- IMPROVEMENT: the monitoring query filters on a recent accessed_at
-- window; index it so that range scan stays cheap as the log grows.
CREATE INDEX idx_shard_access_log_accessed_at
    ON shard_access_log(accessed_at);
-- Access counts per shard and operation over the last hour.
SELECT shard_id,
       operation,
       COUNT(*) AS access_count
FROM shard_access_log
WHERE accessed_at > NOW() - INTERVAL '1 hour'
GROUP BY shard_id, operation
ORDER BY shard_id;
Common Sharding Mistakes
❌ Don't use non-stable values as shard keys ❌ Don't forget to include shard key in all queries ❌ Don't overlook cross-shard query complexity ❌ Don't ignore uneven shard distribution ❌ Don't miss distributed transaction challenges
✅ DO validate shard key at insertion ✅ DO monitor shard balance regularly ✅ DO plan for shard rebalancing ✅ DO test cross-shard operations thoroughly ✅ DO document shard mapping clearly
Sharding Strategies Comparison

| Strategy        | Pros                | Cons                   |
|-----------------|---------------------|------------------------|
| Range-based     | Simple to implement | Hotspots in ranges     |
| Hash-based      | Even distribution   | Complex rebalancing    |
| Directory-based | Flexible, dynamic   | Extra lookup overhead  |

Resources: PostgreSQL Partitioning · Citus (PostgreSQL sharding extension) · Vitess (MySQL sharding middleware) · Apache ShardingSphere (sharding framework)