Technical

Build a Social Media Data Warehouse: Complete Architecture Guide

November 5, 2025
13 min read
By SociaVault Team
Data Warehouse, Database, Architecture, Analytics, Scalability

You scrape data from Instagram, TikTok, Twitter. But where does it go?

Right now it probably goes into one messy database. Posts mixed with comments mixed with profiles mixed with analytics. Queries get slower every day. Finding anything takes forever.

I built my first data warehouse wrong. It worked for 10,000 records. At 100,000 records queries took 30 seconds. At 1 million records everything crashed.

I rebuilt it properly. Now I store 50 million records. Queries return in under 1 second. Historical analysis goes back 2 years. Everything scales smoothly.

Let me show you how to build a social media data warehouse that actually works at scale.

Why You Need a Data Warehouse

A regular database and a data warehouse are different tools for different jobs.

Regular database: Fast reads and writes for current data. Your app uses this.

Data warehouse: Historical data, complex queries, analytics. Your reports use this.
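
To make the difference concrete, here are two illustrative queries. The column names match the warehouse schema later in this post; treat them as a sketch, not working code against your actual app database.

// The app database answers point lookups about current state
const appQuery = `
  SELECT follower_count
  FROM profiles
  WHERE platform = 'instagram' AND username = 'example'
`;

// The warehouse answers scans and aggregates over months of history
const warehouseQuery = `
  SELECT DATE_TRUNC('week', posted_at) AS week,
         AVG(engagement_rate) AS avg_engagement
  FROM posts
  WHERE platform = 'instagram'
    AND posted_at >= NOW() - INTERVAL '12 months'
  GROUP BY week
  ORDER BY week
`;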

When to Build a Warehouse

You need a data warehouse when:

  • You want historical analysis (trends over months/years)
  • Your analytics queries slow down your app
  • You need to join data from multiple platforms
  • You want to run complex aggregations
  • Your database is over 100 MB

I started building my warehouse at 50,000 records. Best decision I made.

Architecture Overview

A good data warehouse has these components:

Source databases: Your app databases (PostgreSQL, MongoDB, etc.)

ETL pipeline: Extract, Transform, Load data into the warehouse

Data warehouse: Optimized for analytics (PostgreSQL, ClickHouse, BigQuery)

Analytics layer: Query interface and dashboards

Let me show you how to build each piece.

Component 1: Schema Design

Design your warehouse schema for analytics, not transactions.

-- Profiles table (slowly changing dimension)
CREATE TABLE profiles (
  profile_id BIGSERIAL PRIMARY KEY,
  platform VARCHAR(20) NOT NULL,
  username VARCHAR(255) NOT NULL,
  full_name VARCHAR(255),
  follower_count INTEGER,
  following_count INTEGER,
  post_count INTEGER,
  engagement_rate DECIMAL(5,2),
  category VARCHAR(100),
  first_seen_at TIMESTAMP NOT NULL,
  last_updated_at TIMESTAMP NOT NULL,
  is_current BOOLEAN DEFAULT true,
  UNIQUE(platform, username)
);

CREATE INDEX idx_profiles_platform ON profiles(platform);
CREATE INDEX idx_profiles_updated ON profiles(last_updated_at);
CREATE INDEX idx_profiles_engagement ON profiles(engagement_rate);

-- Posts table (fact table)
CREATE TABLE posts (
  post_id BIGSERIAL PRIMARY KEY,
  profile_id BIGINT REFERENCES profiles(profile_id),
  platform VARCHAR(20) NOT NULL,
  platform_post_id VARCHAR(255) NOT NULL,
  post_type VARCHAR(50),
  caption TEXT,
  media_type VARCHAR(50),
  posted_at TIMESTAMP NOT NULL,
  scraped_at TIMESTAMP NOT NULL,
  likes_count INTEGER DEFAULT 0,
  comments_count INTEGER DEFAULT 0,
  shares_count INTEGER DEFAULT 0,
  views_count BIGINT DEFAULT 0,
  engagement_rate DECIMAL(5,2),
  UNIQUE(platform, platform_post_id)
);

CREATE INDEX idx_posts_profile ON posts(profile_id);
CREATE INDEX idx_posts_posted_at ON posts(posted_at);
CREATE INDEX idx_posts_platform ON posts(platform);
CREATE INDEX idx_posts_engagement ON posts(engagement_rate);

-- Post metrics (time series data)
CREATE TABLE post_metrics (
  metric_id BIGSERIAL PRIMARY KEY,
  post_id BIGINT REFERENCES posts(post_id),
  recorded_at TIMESTAMP NOT NULL,
  likes_count INTEGER,
  comments_count INTEGER,
  shares_count INTEGER,
  views_count BIGINT,
  engagement_rate DECIMAL(5,2)
);

CREATE INDEX idx_metrics_post ON post_metrics(post_id);
CREATE INDEX idx_metrics_recorded ON post_metrics(recorded_at);

-- Comments table (fact table)
CREATE TABLE comments (
  comment_id BIGSERIAL PRIMARY KEY,
  post_id BIGINT REFERENCES posts(post_id),
  platform VARCHAR(20) NOT NULL,
  platform_comment_id VARCHAR(255) NOT NULL,
  author_username VARCHAR(255),
  text TEXT,
  posted_at TIMESTAMP NOT NULL,
  scraped_at TIMESTAMP NOT NULL,
  likes_count INTEGER DEFAULT 0,
  sentiment_score DECIMAL(3,2),
  UNIQUE(platform, platform_comment_id)
);

CREATE INDEX idx_comments_post ON comments(post_id);
CREATE INDEX idx_comments_posted_at ON comments(posted_at);
CREATE INDEX idx_comments_sentiment ON comments(sentiment_score);

-- Hashtags table (dimension)
CREATE TABLE hashtags (
  hashtag_id SERIAL PRIMARY KEY,
  tag VARCHAR(255) NOT NULL UNIQUE,
  first_seen_at TIMESTAMP NOT NULL
);

-- Post hashtags junction
CREATE TABLE post_hashtags (
  post_id BIGINT REFERENCES posts(post_id),
  hashtag_id INTEGER REFERENCES hashtags(hashtag_id),
  PRIMARY KEY(post_id, hashtag_id)
);

CREATE INDEX idx_post_hashtags_tag ON post_hashtags(hashtag_id);

-- Daily aggregates (for faster reporting)
CREATE TABLE daily_profile_stats (
  stat_id BIGSERIAL PRIMARY KEY,
  profile_id BIGINT REFERENCES profiles(profile_id),
  stat_date DATE NOT NULL,
  follower_count INTEGER,
  following_count INTEGER,
  post_count INTEGER,
  total_likes INTEGER,
  total_comments INTEGER,
  avg_engagement_rate DECIMAL(5,2),
  UNIQUE(profile_id, stat_date)
);

CREATE INDEX idx_daily_stats_profile ON daily_profile_stats(profile_id);
CREATE INDEX idx_daily_stats_date ON daily_profile_stats(stat_date);

Why this schema works:

  • Profiles are dimensions (attributes that change slowly)
  • Posts and comments are facts (events that happened)
  • Time series metrics track changes over time (see the example after this list)
  • Daily aggregates speed up common queries
  • Indexes on key columns for fast lookups
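
The post_metrics table is the piece that makes historical analysis possible: each scrape appends a new row instead of overwriting the counts on posts. As a quick illustration, here is a small sketch that pulls the engagement history for a single post. The connection-string environment variable is an assumption; the query only touches tables defined above.

const { Pool } = require('pg');

// Assumes WAREHOUSE_DB_URL points at the warehouse database
const warehouse = new Pool({ connectionString: process.env.WAREHOUSE_DB_URL });

// Returns one row per scrape, oldest first
async function getPostHistory(postId) {
  const { rows } = await warehouse.query(
    `SELECT recorded_at, likes_count, comments_count, views_count, engagement_rate
     FROM post_metrics
     WHERE post_id = $1
     ORDER BY recorded_at`,
    [postId]
  );
  return rows;
}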

Component 2: ETL Pipeline

Extract data from source, transform it, load into warehouse.

const { Pool } = require('pg');

const sourceDB = new Pool({
  host: 'source-db.example.com',
  database: 'production',
  user: 'readonly',
  password: process.env.SOURCE_DB_PASSWORD
});

const warehouseDB = new Pool({
  host: 'warehouse-db.example.com',
  database: 'warehouse',
  user: 'etl',
  password: process.env.WAREHOUSE_DB_PASSWORD
});

class ETLPipeline {
  constructor(batchSize = 1000) {
    this.batchSize = batchSize;
  }
  
  async extractProfiles(lastExtractTime) {
    // Extract profiles updated since last run
    const query = `
      SELECT 
        id, platform, username, full_name,
        follower_count, following_count, post_count,
        engagement_rate, category, created_at, updated_at
      FROM profiles
      WHERE updated_at > $1
      ORDER BY updated_at
    `;
    
    const result = await sourceDB.query(query, [lastExtractTime]);
    console.log(`Extracted ${result.rows.length} profiles`);
    return result.rows;
  }
  
  transformProfile(sourceProfile) {
    // Transform source format to warehouse format
    return {
      platform: sourceProfile.platform,
      username: sourceProfile.username,
      full_name: sourceProfile.full_name,
      follower_count: sourceProfile.follower_count,
      following_count: sourceProfile.following_count,
      post_count: sourceProfile.post_count,
      engagement_rate: sourceProfile.engagement_rate,
      category: sourceProfile.category,
      first_seen_at: sourceProfile.created_at,
      last_updated_at: sourceProfile.updated_at
    };
  }
  
  async loadProfiles(profiles) {
    // Batch insert profiles into warehouse
    for (let i = 0; i < profiles.length; i += this.batchSize) {
      const batch = profiles.slice(i, i + this.batchSize);
      
      const values = [];
      const placeholders = [];
      
      batch.forEach((profile, index) => {
        const offset = index * 10;
        placeholders.push(`($${offset + 1}, $${offset + 2}, $${offset + 3}, $${offset + 4}, $${offset + 5}, $${offset + 6}, $${offset + 7}, $${offset + 8}, $${offset + 9}, $${offset + 10})`);
        
        values.push(
          profile.platform,
          profile.username,
          profile.full_name,
          profile.follower_count,
          profile.following_count,
          profile.post_count,
          profile.engagement_rate,
          profile.category,
          profile.first_seen_at,
          profile.last_updated_at
        );
      });
      
      const query = `
        INSERT INTO profiles (
          platform, username, full_name, follower_count,
          following_count, post_count, engagement_rate,
          category, first_seen_at, last_updated_at
        )
        VALUES ${placeholders.join(', ')}
        ON CONFLICT (platform, username) DO UPDATE SET
          full_name = EXCLUDED.full_name,
          follower_count = EXCLUDED.follower_count,
          following_count = EXCLUDED.following_count,
          post_count = EXCLUDED.post_count,
          engagement_rate = EXCLUDED.engagement_rate,
          category = EXCLUDED.category,
          last_updated_at = EXCLUDED.last_updated_at
      `;
      
      await warehouseDB.query(query, values);
      console.log(`Loaded batch ${i / this.batchSize + 1} (${batch.length} profiles)`);
    }
  }
  
  async runProfileETL(lastExtractTime) {
    console.log('Starting profile ETL...');
    const startTime = Date.now();
    
    // Extract
    const sourceProfiles = await this.extractProfiles(lastExtractTime);
    
    // Transform
    const transformedProfiles = sourceProfiles.map(p => this.transformProfile(p));
    
    // Load
    await this.loadProfiles(transformedProfiles);
    
    const duration = (Date.now() - startTime) / 1000;
    console.log(`ETL completed in ${duration}s`);
    
    return {
      recordsProcessed: sourceProfiles.length,
      duration
    };
  }
}

// Usage
const etl = new ETLPipeline(1000);

// Run ETL for data since yesterday
const yesterday = new Date();
yesterday.setDate(yesterday.getDate() - 1);

const result = await etl.runProfileETL(yesterday);
console.log('ETL result:', result);

ETL best practices:

  • Run incrementally (only new/changed data)
  • Process in batches (1000 records at a time)
  • Use upserts to handle duplicates
  • Log everything for debugging
  • Handle failures gracefully (see the sketch after this list)
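
Here is one way the last point can look in practice: a small retry wrapper that runs each batch inside a transaction, so a failed batch rolls back cleanly and is retried a few times before the run aborts. This is a sketch, not part of the pipeline above; loadBatch stands in for the INSERT logic from loadProfiles, and warehouseDB is the pool defined earlier.

async function loadBatchWithRetry(loadBatch, batch, maxRetries = 3) {
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    const client = await warehouseDB.connect();

    try {
      await client.query('BEGIN');
      await loadBatch(client, batch); // the INSERT ... ON CONFLICT from loadProfiles
      await client.query('COMMIT');
      return;
    } catch (error) {
      await client.query('ROLLBACK');
      console.error(`Batch failed (attempt ${attempt}/${maxRetries}):`, error.message);
      if (attempt === maxRetries) throw error;
    } finally {
      client.release();
    }
  }
}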

Component 3: Incremental Updates

Don't reload all data every time. Just load what changed.

class IncrementalETL {
  constructor() {
    this.checkpointTable = 'etl_checkpoints';
  }
  
  async getLastCheckpoint(jobName) {
    const query = `
      SELECT last_extract_time
      FROM ${this.checkpointTable}
      WHERE job_name = $1
    `;
    
    const result = await warehouseDB.query(query, [jobName]);
    
    if (result.rows.length > 0) {
      return result.rows[0].last_extract_time;
    }
    
    // First run, return date far in past
    return new Date('2020-01-01');
  }
  
  async updateCheckpoint(jobName, extractTime) {
    const query = `
      INSERT INTO ${this.checkpointTable} (job_name, last_extract_time, updated_at)
      VALUES ($1, $2, NOW())
      ON CONFLICT (job_name) DO UPDATE SET
        last_extract_time = EXCLUDED.last_extract_time,
        updated_at = NOW()
    `;
    
    await warehouseDB.query(query, [jobName, extractTime]);
  }
  
  async runIncrementalETL(jobName, etlFunction) {
    console.log(`Running incremental ETL: ${jobName}`);
    
    const lastCheckpoint = await this.getLastCheckpoint(jobName);
    console.log(`Last checkpoint: ${lastCheckpoint}`);
    
    const currentTime = new Date();
    
    try {
      const result = await etlFunction(lastCheckpoint);
      
      await this.updateCheckpoint(jobName, currentTime);
      
      console.log(`ETL ${jobName} completed successfully`);
      return result;
      
    } catch (error) {
      console.error(`ETL ${jobName} failed:`, error);
      throw error;
    }
  }
}

// Usage
const incrementalETL = new IncrementalETL();

// Run profile ETL incrementally
await incrementalETL.runIncrementalETL(
  'profile_sync',
  async (lastCheckpoint) => {
    const etl = new ETLPipeline();
    return await etl.runProfileETL(lastCheckpoint);
  }
);
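
One thing the class above assumes but never creates is the checkpoint table itself. A minimal definition, matching what getLastCheckpoint and updateCheckpoint read and write (the exact column types are an assumption), could be created once like this:

await warehouseDB.query(`
  CREATE TABLE IF NOT EXISTS etl_checkpoints (
    job_name VARCHAR(100) PRIMARY KEY,
    last_extract_time TIMESTAMP NOT NULL,
    updated_at TIMESTAMP NOT NULL DEFAULT NOW()
  )
`);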

Why incremental works:

  • Only processes new data (much faster)
  • Checkpoints track progress
  • Can resume after failures
  • Scales to billions of records

Component 4: Aggregation Tables

Pre-compute common queries for instant results.

class AggregationBuilder {
  async buildDailyProfileStats(date) {
    console.log(`Building daily profile stats for ${date}`);
    
    const query = `
      INSERT INTO daily_profile_stats (
        profile_id, stat_date, follower_count, following_count,
        post_count, total_likes, total_comments, avg_engagement_rate
      )
      SELECT 
        p.profile_id,
        $1::date as stat_date,
        p.follower_count,
        p.following_count,
        COUNT(DISTINCT po.post_id) as post_count,
        SUM(po.likes_count) as total_likes,
        SUM(po.comments_count) as total_comments,
        AVG(po.engagement_rate) as avg_engagement_rate
      FROM profiles p
      LEFT JOIN posts po ON p.profile_id = po.profile_id
        AND po.posted_at::date = $1::date
      GROUP BY p.profile_id, p.follower_count, p.following_count
      ON CONFLICT (profile_id, stat_date) DO UPDATE SET
        follower_count = EXCLUDED.follower_count,
        following_count = EXCLUDED.following_count,
        post_count = EXCLUDED.post_count,
        total_likes = EXCLUDED.total_likes,
        total_comments = EXCLUDED.total_comments,
        avg_engagement_rate = EXCLUDED.avg_engagement_rate
    `;
    
    const result = await warehouseDB.query(query, [date]);
    console.log(`Built stats for ${result.rowCount} profiles`);
    
    return result.rowCount;
  }
  
  async buildWeeklyTrends() {
    console.log('Building weekly trend aggregates...');
    
    const query = `
      CREATE TEMP TABLE weekly_trends AS
      SELECT 
        profile_id,
        DATE_TRUNC('week', stat_date) as week_start,
        AVG(follower_count) as avg_followers,
        SUM(post_count) as total_posts,
        AVG(avg_engagement_rate) as avg_engagement,
        (MAX(follower_count) - MIN(follower_count)) as follower_growth
      FROM daily_profile_stats
      WHERE stat_date >= NOW() - INTERVAL '8 weeks'
      GROUP BY profile_id, DATE_TRUNC('week', stat_date)
    `;
    
    await warehouseDB.query(query);
    console.log('Weekly trends computed');
  }
  
  async refreshAllAggregates() {
    const today = new Date();
    today.setHours(0, 0, 0, 0);
    
    // Build yesterday's stats (today's not complete yet)
    const yesterday = new Date(today);
    yesterday.setDate(yesterday.getDate() - 1);
    
    await this.buildDailyProfileStats(yesterday.toISOString().split('T')[0]);
    await this.buildWeeklyTrends();
    
    console.log('All aggregates refreshed');
  }
}

// Usage
const aggregator = new AggregationBuilder();
await aggregator.refreshAllAggregates();

Aggregation benefits:

  • Queries return instantly (pre-computed)
  • Database load reduced dramatically
  • Can power real-time dashboards
  • Easier to understand than complex queries
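
If the warehouse already holds months of posts when you set this up, backfill the daily table once instead of waiting for the nightly job to catch up. A small loop over past dates does it; this is a sketch reusing the aggregator from above, and the 90-day window is arbitrary.

// One-time backfill: build daily stats for the last 90 days, excluding today
const today = new Date();
today.setHours(0, 0, 0, 0);

const start = new Date(today);
start.setDate(start.getDate() - 90);

for (let day = new Date(start); day < today; day.setDate(day.getDate() + 1)) {
  await aggregator.buildDailyProfileStats(day.toISOString().split('T')[0]);
}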

Component 5: Query Interface

Make your warehouse easy to query.

class WarehouseQuery {
  async getProfileGrowth(username, platform, days = 30) {
    const query = `
      SELECT 
        stat_date,
        follower_count,
        post_count,
        avg_engagement_rate
      FROM daily_profile_stats dps
      JOIN profiles p ON dps.profile_id = p.profile_id
      WHERE p.username = $1 
        AND p.platform = $2
        AND stat_date >= NOW() - INTERVAL '${days} days'
      ORDER BY stat_date
    `;
    
    const result = await warehouseDB.query(query, [username, platform]);
    return result.rows;
  }
  
  async getTopPosts(platform, days = 7, limit = 10) {
    const query = `
      SELECT 
        p.username,
        po.caption,
        po.posted_at,
        po.likes_count,
        po.comments_count,
        po.engagement_rate
      FROM posts po
      JOIN profiles p ON po.profile_id = p.profile_id
      WHERE po.platform = $1
        AND po.posted_at >= NOW() - INTERVAL '${days} days'
      ORDER BY po.engagement_rate DESC
      LIMIT $2
    `;
    
    const result = await warehouseDB.query(query, [platform, limit]);
    return result.rows;
  }
  
  async getHashtagTrends(days = 7) {
    const query = `
      SELECT 
        h.tag,
        COUNT(DISTINCT ph.post_id) as post_count,
        AVG(po.engagement_rate) as avg_engagement,
        SUM(po.likes_count) as total_likes
      FROM hashtags h
      JOIN post_hashtags ph ON h.hashtag_id = ph.hashtag_id
      JOIN posts po ON ph.post_id = po.post_id
      WHERE po.posted_at >= NOW() - INTERVAL '${days} days'
      GROUP BY h.tag
      HAVING COUNT(DISTINCT ph.post_id) >= 10
      ORDER BY post_count DESC
      LIMIT 50
    `;
    
    const result = await warehouseDB.query(query);
    return result.rows;
  }
  
  async getProfileComparison(usernames, platform) {
    const placeholders = usernames.map((_, i) => `$${i + 2}`).join(', ');
    
    const query = `
      SELECT 
        p.username,
        p.follower_count,
        p.engagement_rate,
        COUNT(DISTINCT po.post_id) as total_posts,
        AVG(po.likes_count) as avg_likes,
        AVG(po.comments_count) as avg_comments
      FROM profiles p
      LEFT JOIN posts po ON p.profile_id = po.profile_id
      WHERE p.platform = $1
        AND p.username IN (${placeholders})
      GROUP BY p.profile_id, p.username, p.follower_count, p.engagement_rate
    `;
    
    const result = await warehouseDB.query(query, [platform, ...usernames]);
    return result.rows;
  }
}

// Usage
const query = new WarehouseQuery();

// Get 30-day growth for a profile
const growth = await query.getProfileGrowth('example', 'instagram', 30);
console.log('Growth data:', growth);

// Get top posts from last week
const topPosts = await query.getTopPosts('tiktok', 7, 10);
console.log('Top posts:', topPosts);

// Get trending hashtags
const trends = await query.getHashtagTrends(7);
console.log('Trending hashtags:', trends);

// Compare multiple profiles
const comparison = await query.getProfileComparison(
  ['user1', 'user2', 'user3'],
  'instagram'
);
console.log('Profile comparison:', comparison);
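
One caveat about the queries above: the day windows are interpolated straight into the SQL string. That is harmless while days is a hard-coded number, but if it ever comes from user input, bind it like every other value. Postgres's make_interval makes that straightforward; here is a standalone variant of getTopPosts written that way (a sketch reusing the warehouseDB pool from the ETL section):

async function getTopPostsSafe(platform, days = 7, limit = 10) {
  const query = `
    SELECT p.username, po.caption, po.posted_at,
           po.likes_count, po.comments_count, po.engagement_rate
    FROM posts po
    JOIN profiles p ON po.profile_id = p.profile_id
    WHERE po.platform = $1
      AND po.posted_at >= NOW() - make_interval(days => $2::int)
    ORDER BY po.engagement_rate DESC
    LIMIT $3
  `;

  const result = await warehouseDB.query(query, [platform, days, limit]);
  return result.rows;
}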

Python Analytics Layer

Use Python for advanced analytics on your warehouse.

import pandas as pd
from sqlalchemy import create_engine
import matplotlib.pyplot as plt

class WarehouseAnalytics:
    def __init__(self, connection_string):
        self.engine = create_engine(connection_string)
    
    def get_growth_trends(self, username, platform, days=90):
        query = f"""
        SELECT 
            stat_date,
            follower_count,
            avg_engagement_rate,
            post_count
        FROM daily_profile_stats dps
        JOIN profiles p ON dps.profile_id = p.profile_id
        WHERE p.username = '{username}' 
          AND p.platform = '{platform}'
          AND stat_date >= NOW() - INTERVAL '{days} days'
        ORDER BY stat_date
        """
        
        df = pd.read_sql(query, self.engine)
        df['stat_date'] = pd.to_datetime(df['stat_date'])
        
        return df
    
    def analyze_posting_patterns(self, profile_id, days=90):
        query = f"""
        SELECT 
            EXTRACT(DOW FROM posted_at) as day_of_week,
            EXTRACT(HOUR FROM posted_at) as hour_of_day,
            AVG(engagement_rate) as avg_engagement,
            COUNT(*) as post_count
        FROM posts
        WHERE profile_id = {profile_id}
          AND posted_at >= NOW() - INTERVAL '{days} days'
        GROUP BY day_of_week, hour_of_day
        ORDER BY day_of_week, hour_of_day
        """
        
        df = pd.read_sql(query, self.engine)
        
        # Find best times
        best_times = df.nlargest(5, 'avg_engagement')
        
        return {
            'data': df,
            'best_times': best_times.to_dict('records')
        }
    
    def visualize_growth(self, username, platform, days=90):
        df = self.get_growth_trends(username, platform, days)
        
        fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8))
        
        # Follower growth
        ax1.plot(df['stat_date'], df['follower_count'], marker='o')
        ax1.set_title(f'{username} Follower Growth')
        ax1.set_xlabel('Date')
        ax1.set_ylabel('Followers')
        ax1.grid(True)
        
        # Engagement rate
        ax2.plot(df['stat_date'], df['avg_engagement_rate'], marker='o', color='green')
        ax2.set_title(f'{username} Engagement Rate')
        ax2.set_xlabel('Date')
        ax2.set_ylabel('Engagement Rate (%)')
        ax2.grid(True)
        
        plt.tight_layout()
        plt.savefig(f'{username}_growth_analysis.png')
        print(f"Saved visualization to {username}_growth_analysis.png")

# Usage
analytics = WarehouseAnalytics('postgresql://user:pass@warehouse-db:5432/warehouse')

# Get growth trends
trends = analytics.get_growth_trends('example', 'instagram', 90)
print(f"Retrieved {len(trends)} days of data")

# Analyze posting patterns
patterns = analytics.analyze_posting_patterns(123, 90)
print("Best posting times:", patterns['best_times'])

# Create visualization
analytics.visualize_growth('example', 'instagram', 90)

Scheduled ETL Jobs

Run your ETL on a schedule.

const cron = require('node-cron');

class ETLScheduler {
  constructor() {
    this.jobs = [];
  }
  
  scheduleDaily(jobName, hour, minute, jobFunction) {
    const cronTime = `${minute} ${hour} * * *`;
    
    const job = cron.schedule(cronTime, async () => {
      console.log(`\n=== Running scheduled job: ${jobName} ===`);
      const startTime = Date.now();
      
      try {
        await jobFunction();
        const duration = (Date.now() - startTime) / 1000;
        console.log(`Job ${jobName} completed in ${duration}s`);
      } catch (error) {
        console.error(`Job ${jobName} failed:`, error);
      }
    });
    
    this.jobs.push({ name: jobName, job, schedule: cronTime });
    console.log(`Scheduled ${jobName} to run at ${String(hour).padStart(2, '0')}:${String(minute).padStart(2, '0')} daily`);
  }
  
  start() {
    console.log(`\nStarting ${this.jobs.length} scheduled jobs...`);
    this.jobs.forEach(j => j.job.start());
  }
  
  stop() {
    this.jobs.forEach(j => j.job.stop());
    console.log('All jobs stopped');
  }
}

// Usage
const scheduler = new ETLScheduler();

// Run profile sync every day at 2 AM
scheduler.scheduleDaily('Profile Sync', 2, 0, async () => {
  const incrementalETL = new IncrementalETL();
  await incrementalETL.runIncrementalETL('profile_sync', async (lastCheckpoint) => {
    const etl = new ETLPipeline();
    return await etl.runProfileETL(lastCheckpoint);
  });
});

// Build aggregates every day at 3 AM
scheduler.scheduleDaily('Build Aggregates', 3, 0, async () => {
  const aggregator = new AggregationBuilder();
  await aggregator.refreshAllAggregates();
});

scheduler.start();
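
One thing this scheduler does not guard against is overlap: if a nightly ETL run takes longer than expected, the next trigger can start while the previous one is still writing. A simple in-process guard looks like the sketch below; for multiple workers you would want a database-level or advisory lock instead, and syncProfiles in the comment is a hypothetical wrapper around the ETL calls above.

const running = new Set();

// Run a job only if a previous invocation is not still in flight
async function runExclusive(jobName, jobFunction) {
  if (running.has(jobName)) {
    console.warn(`Skipping ${jobName}: previous run still in progress`);
    return;
  }

  running.add(jobName);
  try {
    await jobFunction();
  } finally {
    running.delete(jobName);
  }
}

// Usage inside a scheduled job:
// scheduler.scheduleDaily('Profile Sync', 2, 0, () => runExclusive('profile_sync', syncProfiles));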

Real-World Results

Before data warehouse:

  • Queries took 30-60 seconds
  • Could not do historical analysis
  • Database kept crashing
  • Reports were always outdated

After data warehouse:

  • Queries return in under 1 second
  • 2 years of historical data available
  • Never crashes even with 50M records
  • Real-time dashboards always current

My warehouse now handles:

  • 50 million posts
  • 500 million comments
  • 2 years of historical data
  • 1000+ queries per day
  • Sub-second response times

Your Data Warehouse Action Plan

  1. Design schema - Separate dimensions and facts
  2. Build ETL pipeline - Incremental updates only
  3. Create aggregation tables - Pre-compute common queries
  4. Schedule jobs - Run ETL daily automatically
  5. Build query interface - Make data accessible

Get your SociaVault API key and start building your social media data warehouse. We provide the data collection infrastructure so you can focus on analytics.

Stop losing historical data. Start building insights that scale.
