Build a Social Media Data Warehouse: Complete Architecture Guide
You scrape data from Instagram, TikTok, Twitter. But where does it go?
Right now it probably goes into one messy database. Posts mixed with comments mixed with profiles mixed with analytics. Queries get slower every day. Finding anything takes forever.
I built my first data warehouse wrong. It worked for 10,000 records. At 100,000 records queries took 30 seconds. At 1 million records everything crashed.
I rebuilt it properly. Now I store 50 million records. Queries return in under 1 second. Historical analysis goes back 2 years. Everything scales smoothly.
Let me show you how to build a social media data warehouse that actually works at scale.
Why You Need a Data Warehouse
A regular database and a data warehouse are different tools for different jobs.
Regular database: Fast reads and writes for current data. Your app uses this.
Data warehouse: Historical data, complex queries, analytics. Your reports use this.
When to Build a Warehouse
You need a data warehouse when:
- You want historical analysis (trends over months/years)
- Your analytics queries slow down your app
- You need to join data from multiple platforms
- You want to run complex aggregations
- Your database is over 100 MB
I started building my warehouse at 50,000 records. Best decision I made.
Architecture Overview
A good data warehouse has these components:
- Source databases: Your app databases (PostgreSQL, MongoDB, etc.)
- ETL pipeline: Extract, Transform, Load data into the warehouse
- Data warehouse: Optimized for analytics (PostgreSQL, ClickHouse, BigQuery)
- Analytics layer: Query interface and dashboards
Let me show you how to build each piece.
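Before diving into each component, here is a rough sketch of how the pieces fit together. The class and method names match the code in the sections below; runWarehousePipeline itself is just an illustrative wrapper, not something the later components define.
// High-level flow: extract/transform/load incrementally, then refresh aggregates.
// Each class is built out in Components 2-4 below.
async function runWarehousePipeline() {
  const incrementalETL = new IncrementalETL();           // Component 3
  await incrementalETL.runIncrementalETL('profile_sync', (since) =>
    new ETLPipeline().runProfileETL(since)               // Component 2
  );
  await new AggregationBuilder().refreshAllAggregates(); // Component 4
  // Component 5: WarehouseQuery and dashboards read from the warehouse
}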
Component 1: Schema Design
Design your warehouse schema for analytics, not transactions.
-- Profiles table (slowly changing dimension)
CREATE TABLE profiles (
profile_id BIGSERIAL PRIMARY KEY,
platform VARCHAR(20) NOT NULL,
username VARCHAR(255) NOT NULL,
full_name VARCHAR(255),
follower_count INTEGER,
following_count INTEGER,
post_count INTEGER,
engagement_rate DECIMAL(5,2),
category VARCHAR(100),
first_seen_at TIMESTAMP NOT NULL,
last_updated_at TIMESTAMP NOT NULL,
is_current BOOLEAN DEFAULT true,
UNIQUE(platform, username)
);
CREATE INDEX idx_profiles_platform ON profiles(platform);
CREATE INDEX idx_profiles_updated ON profiles(last_updated_at);
CREATE INDEX idx_profiles_engagement ON profiles(engagement_rate);
-- Posts table (fact table)
CREATE TABLE posts (
post_id BIGSERIAL PRIMARY KEY,
profile_id BIGINT REFERENCES profiles(profile_id),
platform VARCHAR(20) NOT NULL,
platform_post_id VARCHAR(255) NOT NULL,
post_type VARCHAR(50),
caption TEXT,
media_type VARCHAR(50),
posted_at TIMESTAMP NOT NULL,
scraped_at TIMESTAMP NOT NULL,
likes_count INTEGER DEFAULT 0,
comments_count INTEGER DEFAULT 0,
shares_count INTEGER DEFAULT 0,
views_count BIGINT DEFAULT 0,
engagement_rate DECIMAL(5,2),
UNIQUE(platform, platform_post_id)
);
CREATE INDEX idx_posts_profile ON posts(profile_id);
CREATE INDEX idx_posts_posted_at ON posts(posted_at);
CREATE INDEX idx_posts_platform ON posts(platform);
CREATE INDEX idx_posts_engagement ON posts(engagement_rate);
-- Post metrics (time series data)
CREATE TABLE post_metrics (
metric_id BIGSERIAL PRIMARY KEY,
post_id BIGINT REFERENCES posts(post_id),
recorded_at TIMESTAMP NOT NULL,
likes_count INTEGER,
comments_count INTEGER,
shares_count INTEGER,
views_count BIGINT,
engagement_rate DECIMAL(5,2)
);
CREATE INDEX idx_metrics_post ON post_metrics(post_id);
CREATE INDEX idx_metrics_recorded ON post_metrics(recorded_at);
-- Comments table (fact table)
CREATE TABLE comments (
comment_id BIGSERIAL PRIMARY KEY,
post_id BIGINT REFERENCES posts(post_id),
platform VARCHAR(20) NOT NULL,
platform_comment_id VARCHAR(255) NOT NULL,
author_username VARCHAR(255),
text TEXT,
posted_at TIMESTAMP NOT NULL,
scraped_at TIMESTAMP NOT NULL,
likes_count INTEGER DEFAULT 0,
sentiment_score DECIMAL(3,2),
UNIQUE(platform, platform_comment_id)
);
CREATE INDEX idx_comments_post ON comments(post_id);
CREATE INDEX idx_comments_posted_at ON comments(posted_at);
CREATE INDEX idx_comments_sentiment ON comments(sentiment_score);
-- Hashtags table (dimension)
CREATE TABLE hashtags (
hashtag_id SERIAL PRIMARY KEY,
tag VARCHAR(255) NOT NULL UNIQUE,
first_seen_at TIMESTAMP NOT NULL
);
-- Post hashtags junction
CREATE TABLE post_hashtags (
post_id BIGINT REFERENCES posts(post_id),
hashtag_id INTEGER REFERENCES hashtags(hashtag_id),
PRIMARY KEY(post_id, hashtag_id)
);
CREATE INDEX idx_post_hashtags_tag ON post_hashtags(hashtag_id);
-- Daily aggregates (for faster reporting)
CREATE TABLE daily_profile_stats (
stat_id BIGSERIAL PRIMARY KEY,
profile_id BIGINT REFERENCES profiles(profile_id),
stat_date DATE NOT NULL,
follower_count INTEGER,
following_count INTEGER,
post_count INTEGER,
total_likes INTEGER,
total_comments INTEGER,
avg_engagement_rate DECIMAL(5,2),
UNIQUE(profile_id, stat_date)
);
CREATE INDEX idx_daily_stats_profile ON daily_profile_stats(profile_id);
CREATE INDEX idx_daily_stats_date ON daily_profile_stats(stat_date);
Why this schema works:
- Profiles are dimensions (attributes that change slowly)
- Posts and comments are facts (events that happened)
- Time series metrics track changes over time
- Daily aggregates speed up common queries
- Indexes on key columns for fast lookups (see the example query below)
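As a quick sanity check on this layout, here is the kind of analytical query the schema is built for: the posts fact table joined to the profiles dimension, filtered on the indexed posted_at column. The 'instagram' value and 30-day window are just examples, and the query runs through the warehouseDB pool set up in Component 2 (or directly in psql).
const topInstagramCreators = `
  SELECT p.username,
         COUNT(*)                AS posts_last_30_days,
         AVG(po.engagement_rate) AS avg_engagement
  FROM posts po
  JOIN profiles p ON p.profile_id = po.profile_id
  WHERE po.platform = 'instagram'
    AND po.posted_at >= NOW() - INTERVAL '30 days'
  GROUP BY p.username
  ORDER BY avg_engagement DESC
  LIMIT 20
`;
// const { rows } = await warehouseDB.query(topInstagramCreators);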
Component 2: ETL Pipeline
Extract data from source, transform it, load into warehouse.
const { Pool } = require('pg');
const sourceDB = new Pool({
host: 'source-db.example.com',
database: 'production',
user: 'readonly',
password: process.env.SOURCE_DB_PASSWORD
});
const warehouseDB = new Pool({
host: 'warehouse-db.example.com',
database: 'warehouse',
user: 'etl',
password: process.env.WAREHOUSE_DB_PASSWORD
});
class ETLPipeline {
constructor(batchSize = 1000) {
this.batchSize = batchSize;
}
async extractProfiles(lastExtractTime) {
// Extract profiles updated since last run
const query = `
SELECT
id, platform, username, full_name,
follower_count, following_count, post_count,
engagement_rate, category, created_at, updated_at
FROM profiles
WHERE updated_at > $1
ORDER BY updated_at
`;
const result = await sourceDB.query(query, [lastExtractTime]);
console.log(`Extracted ${result.rows.length} profiles`);
return result.rows;
}
transformProfile(sourceProfile) {
// Transform source format to warehouse format
return {
platform: sourceProfile.platform,
username: sourceProfile.username,
full_name: sourceProfile.full_name,
follower_count: sourceProfile.follower_count,
following_count: sourceProfile.following_count,
post_count: sourceProfile.post_count,
engagement_rate: sourceProfile.engagement_rate,
category: sourceProfile.category,
first_seen_at: sourceProfile.created_at,
last_updated_at: sourceProfile.updated_at
};
}
async loadProfiles(profiles) {
// Batch insert profiles into warehouse
for (let i = 0; i < profiles.length; i += this.batchSize) {
const batch = profiles.slice(i, i + this.batchSize);
const values = [];
const placeholders = [];
batch.forEach((profile, index) => {
const offset = index * 10;
placeholders.push(`($${offset + 1}, $${offset + 2}, $${offset + 3}, $${offset + 4}, $${offset + 5}, $${offset + 6}, $${offset + 7}, $${offset + 8}, $${offset + 9}, $${offset + 10})`);
values.push(
profile.platform,
profile.username,
profile.full_name,
profile.follower_count,
profile.following_count,
profile.post_count,
profile.engagement_rate,
profile.category,
profile.first_seen_at,
profile.last_updated_at
);
});
const query = `
INSERT INTO profiles (
platform, username, full_name, follower_count,
following_count, post_count, engagement_rate,
category, first_seen_at, last_updated_at
)
VALUES ${placeholders.join(', ')}
ON CONFLICT (platform, username) DO UPDATE SET
full_name = EXCLUDED.full_name,
follower_count = EXCLUDED.follower_count,
following_count = EXCLUDED.following_count,
post_count = EXCLUDED.post_count,
engagement_rate = EXCLUDED.engagement_rate,
category = EXCLUDED.category,
last_updated_at = EXCLUDED.last_updated_at
`;
await warehouseDB.query(query, values);
console.log(`Loaded batch ${i / this.batchSize + 1} (${batch.length} profiles)`);
}
}
async runProfileETL(lastExtractTime) {
console.log('Starting profile ETL...');
const startTime = Date.now();
// Extract
const sourceProfiles = await this.extractProfiles(lastExtractTime);
// Transform
const transformedProfiles = sourceProfiles.map(p => this.transformProfile(p));
// Load
await this.loadProfiles(transformedProfiles);
const duration = (Date.now() - startTime) / 1000;
console.log(`ETL completed in ${duration}s`);
return {
recordsProcessed: sourceProfiles.length,
duration
};
}
}
// Usage
const etl = new ETLPipeline(1000);
// Run ETL for data since yesterday
const yesterday = new Date();
yesterday.setDate(yesterday.getDate() - 1);
const result = await etl.runProfileETL(yesterday);
console.log('ETL result:', result);
ETL best practices:
- Run incrementally (only new/changed data)
- Process in batches (1000 records at a time)
- Use upserts to handle duplicates
- Log everything for debugging
- Handle failures gracefully (see the retry sketch below)
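On that last point, here is one way to handle failures gracefully: a small retry wrapper with exponential backoff around a single warehouse write. It is a sketch, not part of the pipeline above; withRetry, the attempt count, and the delays are illustrative choices.
async function withRetry(fn, attempts = 3, baseDelayMs = 1000) {
  for (let attempt = 1; attempt <= attempts; attempt++) {
    try {
      return await fn();
    } catch (error) {
      if (attempt === attempts) throw error; // out of retries, surface the error
      const delay = baseDelayMs * 2 ** (attempt - 1); // 1s, 2s, 4s, ...
      console.warn(`Attempt ${attempt} failed, retrying in ${delay}ms: ${error.message}`);
      await new Promise(resolve => setTimeout(resolve, delay));
    }
  }
}

// Example: retry one flaky batch insert
// await withRetry(() => warehouseDB.query(query, values));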
Component 3: Incremental Updates
Don't reload all data every time. Just load what changed.
class IncrementalETL {
constructor() {
this.checkpointTable = 'etl_checkpoints';
}
async getLastCheckpoint(jobName) {
const query = `
SELECT last_extract_time
FROM ${this.checkpointTable}
WHERE job_name = $1
`;
const result = await warehouseDB.query(query, [jobName]);
if (result.rows.length > 0) {
return result.rows[0].last_extract_time;
}
// First run, return date far in past
return new Date('2020-01-01');
}
async updateCheckpoint(jobName, extractTime) {
const query = `
INSERT INTO ${this.checkpointTable} (job_name, last_extract_time, updated_at)
VALUES ($1, $2, NOW())
ON CONFLICT (job_name) DO UPDATE SET
last_extract_time = EXCLUDED.last_extract_time,
updated_at = NOW()
`;
await warehouseDB.query(query, [jobName, extractTime]);
}
async runIncrementalETL(jobName, etlFunction) {
console.log(`Running incremental ETL: ${jobName}`);
const lastCheckpoint = await this.getLastCheckpoint(jobName);
console.log(`Last checkpoint: ${lastCheckpoint}`);
const currentTime = new Date();
try {
const result = await etlFunction(lastCheckpoint);
await this.updateCheckpoint(jobName, currentTime);
console.log(`ETL ${jobName} completed successfully`);
return result;
} catch (error) {
console.error(`ETL ${jobName} failed:`, error);
throw error;
}
}
}
// Usage
const incrementalETL = new IncrementalETL();
// Run profile ETL incrementally
await incrementalETL.runIncrementalETL(
'profile_sync',
async (lastCheckpoint) => {
const etl = new ETLPipeline();
return await etl.runProfileETL(lastCheckpoint);
}
);
Why incremental works:
- Only processes new data (much faster)
- Checkpoints track progress
- Can resume after failures
- Scales to billions of records
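One thing the class above assumes is that an etl_checkpoints table already exists in the warehouse. Its exact shape is up to you; one version that matches the queries above (job_name needs to be unique for ON CONFLICT (job_name) to work) could look like this:
await warehouseDB.query(`
  CREATE TABLE IF NOT EXISTS etl_checkpoints (
    job_name VARCHAR(100) PRIMARY KEY,
    last_extract_time TIMESTAMP NOT NULL,
    updated_at TIMESTAMP NOT NULL DEFAULT NOW()
  )
`);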
Component 4: Aggregation Tables
Pre-compute common queries for instant results.
class AggregationBuilder {
async buildDailyProfileStats(date) {
console.log(`Building daily profile stats for ${date}`);
const query = `
INSERT INTO daily_profile_stats (
profile_id, stat_date, follower_count, following_count,
post_count, total_likes, total_comments, avg_engagement_rate
)
SELECT
p.profile_id,
$1::date as stat_date,
p.follower_count,
p.following_count,
COUNT(DISTINCT po.post_id) as post_count,
SUM(po.likes_count) as total_likes,
SUM(po.comments_count) as total_comments,
AVG(po.engagement_rate) as avg_engagement_rate
FROM profiles p
LEFT JOIN posts po ON p.profile_id = po.profile_id
AND po.posted_at::date = $1::date
GROUP BY p.profile_id, p.follower_count, p.following_count
ON CONFLICT (profile_id, stat_date) DO UPDATE SET
follower_count = EXCLUDED.follower_count,
following_count = EXCLUDED.following_count,
post_count = EXCLUDED.post_count,
total_likes = EXCLUDED.total_likes,
total_comments = EXCLUDED.total_comments,
avg_engagement_rate = EXCLUDED.avg_engagement_rate
`;
const result = await warehouseDB.query(query, [date]);
console.log(`Built stats for ${result.rowCount} profiles`);
return result.rowCount;
}
async buildWeeklyTrends() {
console.log('Building weekly trend aggregates...');
// Note: a TEMP table only lives for the current session, and re-creating it
// on the same pooled connection would fail, so drop any previous copy first
const query = `
DROP TABLE IF EXISTS weekly_trends;
CREATE TEMP TABLE weekly_trends AS
SELECT
profile_id,
DATE_TRUNC('week', stat_date) as week_start,
AVG(follower_count) as avg_followers,
SUM(post_count) as total_posts,
AVG(avg_engagement_rate) as avg_engagement,
(MAX(follower_count) - MIN(follower_count)) as follower_growth
FROM daily_profile_stats
WHERE stat_date >= NOW() - INTERVAL '8 weeks'
GROUP BY profile_id, DATE_TRUNC('week', stat_date)
`;
await warehouseDB.query(query);
console.log('Weekly trends computed');
}
async refreshAllAggregates() {
const today = new Date();
today.setHours(0, 0, 0, 0);
// Build yesterday's stats (today's not complete yet)
const yesterday = new Date(today);
yesterday.setDate(yesterday.getDate() - 1);
await this.buildDailyProfileStats(yesterday.toISOString().split('T')[0]);
await this.buildWeeklyTrends();
console.log('All aggregates refreshed');
}
}
// Usage
const aggregator = new AggregationBuilder();
await aggregator.refreshAllAggregates();
Aggregation benefits:
- Queries return instantly (pre-computed)
- Database load reduced dramatically
- Can power real-time dashboards
- Easier to understand than complex queries
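If you add daily_profile_stats to a warehouse that already holds history, you will want to backfill it for past dates. A minimal sketch using the AggregationBuilder above; backfillDailyStats is an illustrative name and the 30-day window is arbitrary.
async function backfillDailyStats(days = 30) {
  const aggregator = new AggregationBuilder();
  for (let i = days; i >= 1; i--) {
    const date = new Date();
    date.setDate(date.getDate() - i);
    // buildDailyProfileStats expects a YYYY-MM-DD string
    await aggregator.buildDailyProfileStats(date.toISOString().split('T')[0]);
  }
}

await backfillDailyStats(30);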
Component 5: Query Interface
Make your warehouse easy to query.
class WarehouseQuery {
async getProfileGrowth(username, platform, days = 30) {
// Qualify columns that exist in both tables (follower_count, post_count)
// so PostgreSQL doesn't reject them as ambiguous references
const query = `
SELECT
dps.stat_date,
dps.follower_count,
dps.post_count,
dps.avg_engagement_rate
FROM daily_profile_stats dps
JOIN profiles p ON dps.profile_id = p.profile_id
WHERE p.username = $1
AND p.platform = $2
AND dps.stat_date >= NOW() - INTERVAL '${days} days'
ORDER BY dps.stat_date
`;
const result = await warehouseDB.query(query, [username, platform]);
return result.rows;
}
async getTopPosts(platform, days = 7, limit = 10) {
const query = `
SELECT
p.username,
po.caption,
po.posted_at,
po.likes_count,
po.comments_count,
po.engagement_rate
FROM posts po
JOIN profiles p ON po.profile_id = p.profile_id
WHERE po.platform = $1
AND po.posted_at >= NOW() - INTERVAL '${days} days'
ORDER BY po.engagement_rate DESC
LIMIT $2
`;
const result = await warehouseDB.query(query, [platform, limit]);
return result.rows;
}
async getHashtagTrends(days = 7) {
const query = `
SELECT
h.tag,
COUNT(DISTINCT ph.post_id) as post_count,
AVG(po.engagement_rate) as avg_engagement,
SUM(po.likes_count) as total_likes
FROM hashtags h
JOIN post_hashtags ph ON h.hashtag_id = ph.hashtag_id
JOIN posts po ON ph.post_id = po.post_id
WHERE po.posted_at >= NOW() - INTERVAL '${days} days'
GROUP BY h.tag
HAVING COUNT(DISTINCT ph.post_id) >= 10
ORDER BY post_count DESC
LIMIT 50
`;
const result = await warehouseDB.query(query);
return result.rows;
}
async getProfileComparison(usernames, platform) {
const placeholders = usernames.map((_, i) => `$${i + 2}`).join(', ');
const query = `
SELECT
p.username,
p.follower_count,
p.engagement_rate,
COUNT(DISTINCT po.post_id) as total_posts,
AVG(po.likes_count) as avg_likes,
AVG(po.comments_count) as avg_comments
FROM profiles p
LEFT JOIN posts po ON p.profile_id = po.profile_id
WHERE p.platform = $1
AND p.username IN (${placeholders})
GROUP BY p.profile_id, p.username, p.follower_count, p.engagement_rate
`;
const result = await warehouseDB.query(query, [platform, ...usernames]);
return result.rows;
}
}
// Usage
const query = new WarehouseQuery();
// Get 30-day growth for a profile
const growth = await query.getProfileGrowth('example', 'instagram', 30);
console.log('Growth data:', growth);
// Get top posts from last week
const topPosts = await query.getTopPosts('tiktok', 7, 10);
console.log('Top posts:', topPosts);
// Get trending hashtags
const trends = await query.getHashtagTrends(7);
console.log('Trending hashtags:', trends);
// Compare multiple profiles
const comparison = await query.getProfileComparison(
['user1', 'user2', 'user3'],
'instagram'
);
console.log('Profile comparison:', comparison);
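One note on the queries above: the day window is interpolated straight into the SQL string as INTERVAL '${days} days'. That is fine while days is always a trusted number with a default, but if the value ever comes from user input, bind it as a parameter instead. Here is a hedged variant of getTopPosts using PostgreSQL's make_interval() (available since 9.4); getTopPostsParameterized is just an illustrative name.
async function getTopPostsParameterized(platform, days = 7, limit = 10) {
  const query = `
    SELECT p.username, po.caption, po.posted_at,
           po.likes_count, po.comments_count, po.engagement_rate
    FROM posts po
    JOIN profiles p ON po.profile_id = p.profile_id
    WHERE po.platform = $1
      AND po.posted_at >= NOW() - make_interval(days => $2)
    ORDER BY po.engagement_rate DESC
    LIMIT $3
  `;
  const result = await warehouseDB.query(query, [platform, days, limit]);
  return result.rows;
}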
Python Analytics Layer
Use Python for advanced analytics on your warehouse.
import pandas as pd
from sqlalchemy import create_engine
import matplotlib.pyplot as plt
class WarehouseAnalytics:
def __init__(self, connection_string):
self.engine = create_engine(connection_string)
def get_growth_trends(self, username, platform, days=90):
query = f"""
SELECT
stat_date,
follower_count,
avg_engagement_rate,
post_count
FROM daily_profile_stats dps
JOIN profiles p ON dps.profile_id = p.profile_id
WHERE p.username = '{username}'
AND p.platform = '{platform}'
AND stat_date >= NOW() - INTERVAL '{days} days'
ORDER BY stat_date
"""
df = pd.read_sql(query, self.engine)
df['stat_date'] = pd.to_datetime(df['stat_date'])
return df
def analyze_posting_patterns(self, profile_id, days=90):
query = f"""
SELECT
EXTRACT(DOW FROM posted_at) as day_of_week,
EXTRACT(HOUR FROM posted_at) as hour_of_day,
AVG(engagement_rate) as avg_engagement,
COUNT(*) as post_count
FROM posts
WHERE profile_id = {profile_id}
AND posted_at >= NOW() - INTERVAL '{days} days'
GROUP BY day_of_week, hour_of_day
ORDER BY day_of_week, hour_of_day
"""
df = pd.read_sql(query, self.engine)
# Find best times
best_times = df.nlargest(5, 'avg_engagement')
return {
'data': df,
'best_times': best_times.to_dict('records')
}
def visualize_growth(self, username, platform, days=90):
df = self.get_growth_trends(username, platform, days)
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8))
# Follower growth
ax1.plot(df['stat_date'], df['follower_count'], marker='o')
ax1.set_title(f'{username} Follower Growth')
ax1.set_xlabel('Date')
ax1.set_ylabel('Followers')
ax1.grid(True)
# Engagement rate
ax2.plot(df['stat_date'], df['avg_engagement_rate'], marker='o', color='green')
ax2.set_title(f'{username} Engagement Rate')
ax2.set_xlabel('Date')
ax2.set_ylabel('Engagement Rate (%)')
ax2.grid(True)
plt.tight_layout()
plt.savefig(f'{username}_growth_analysis.png')
print(f"Saved visualization to {username}_growth_analysis.png")
# Usage
analytics = WarehouseAnalytics('postgresql://user:pass@warehouse-db:5432/warehouse')
# Get growth trends
trends = analytics.get_growth_trends('example', 'instagram', 90)
print(f"Retrieved {len(trends)} days of data")
# Analyze posting patterns
patterns = analytics.analyze_posting_patterns(123, 90)
print("Best posting times:", patterns['best_times'])
# Create visualization
analytics.visualize_growth('example', 'instagram', 90)
Scheduled ETL Jobs
Run your ETL on a schedule.
const cron = require('node-cron');
class ETLScheduler {
constructor() {
this.jobs = [];
}
scheduleDaily(jobName, hour, minute, jobFunction) {
const cronTime = `${minute} ${hour} * * *`;
const job = cron.schedule(cronTime, async () => {
console.log(`\n=== Running scheduled job: ${jobName} ===`);
const startTime = Date.now();
try {
await jobFunction();
const duration = (Date.now() - startTime) / 1000;
console.log(`Job ${jobName} completed in ${duration}s`);
} catch (error) {
console.error(`Job ${jobName} failed:`, error);
}
}, { scheduled: false }); // create the task stopped so scheduler.start() controls when it runs
this.jobs.push({ name: jobName, job, schedule: cronTime });
console.log(`Scheduled ${jobName} to run at ${hour}:${minute} daily`);
}
start() {
console.log(`\nStarting ${this.jobs.length} scheduled jobs...`);
this.jobs.forEach(j => j.job.start());
}
stop() {
this.jobs.forEach(j => j.job.stop());
console.log('All jobs stopped');
}
}
// Usage
const scheduler = new ETLScheduler();
// Run profile sync every day at 2 AM
scheduler.scheduleDaily('Profile Sync', 2, 0, async () => {
const incrementalETL = new IncrementalETL();
await incrementalETL.runIncrementalETL('profile_sync', async (lastCheckpoint) => {
const etl = new ETLPipeline();
return await etl.runProfileETL(lastCheckpoint);
});
});
// Build aggregates every day at 3 AM
scheduler.scheduleDaily('Build Aggregates', 3, 0, async () => {
const aggregator = new AggregationBuilder();
await aggregator.refreshAllAggregates();
});
scheduler.start();
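Optionally, shut things down cleanly when the process is asked to stop, so a half-finished job is not orphaned and the connection pools from Component 2 get closed. This is a sketch; adjust it to however you run the process.
process.on('SIGTERM', async () => {
  console.log('Shutting down scheduler...');
  scheduler.stop();
  await sourceDB.end();
  await warehouseDB.end();
  process.exit(0);
});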
Real-World Results
Before data warehouse:
- Queries took 30-60 seconds
- Could not do historical analysis
- Database kept crashing
- Reports were always outdated
After data warehouse:
- Queries return in under 1 second
- 2 years of historical data available
- Never crashes even with 50M records
- Real-time dashboards always current
My warehouse now handles:
- 50 million posts
- 500 million comments
- 2 years of historical data
- 1000+ queries per day
- Sub-second response times
Your Data Warehouse Action Plan
- Design schema - Separate dimensions and facts
- Build ETL pipeline - Incremental updates only
- Create aggregation tables - Pre-compute common queries
- Schedule jobs - Run ETL daily automatically
- Build query interface - Make data accessible
Get your SociaVault API key and start building your social media data warehouse. We provide the data collection infrastructure so you can focus on analytics.
Stop losing historical data. Start building insights that scale.