Chapter 8: System Design Interview Mastery

8.19 Design a metrics/monitoring system (Datadog)

1. Problem Definition and Scope

We are designing a large-scale metrics monitoring platform. This system enables engineers to collect numerical data from their infrastructure and applications, visualize it on dashboards, and receive alerts when things go wrong.

  • User Groups: DevOps Engineers, SREs, and Application Developers.

  • Main Actions: Ingesting metrics (writing), Querying for graphs (reading), and Alerting on thresholds.

  • Scope:

    • We will focus on Numerical Time Series Data (e.g., CPU Usage, Request Count).

    • Out of Scope: We will not cover Distributed Tracing (APM) or centralized Log Aggregation (ELK), as these require different storage engines.

2. Clarify functional requirements

Must Have:

  • Ingestion: Support for pushing metrics via an API or Agent with tags (e.g., host:web-01, env:prod).

  • Querying: Users can fetch aggregated data over time ranges (e.g., "Avg CPU last 1 hour").

  • Visualization: Return data formatted for line charts and gauges.

  • Alerting: Users can define rules (e.g., "If error rate > 5% for 2 mins") and receive notifications.

  • Tagging: Data must be multi-dimensional (query by any tag).

Nice to Have:

  • Downsampling: Automatically reduce the resolution of older data (e.g., 10-second data becomes 1-minute averages after a week) to cut storage costs.

  • Multi-tenancy: Logical isolation between different teams or customers.

Functional Requirements

3. Clarify non-functional requirements

  • Scale: Large Enterprise.
    • 100,000 Monitored Hosts.
    • Write Heavy: The system is 99% writes (servers report constantly) and 1% reads (humans check occasionally).
  • Latency:
    • Writes: Fast acknowledgment (< 50ms) to the agent.
    • Reads: Dashboard loads within 500ms.
    • Alerting: Alerts should trigger within ~1 minute of the event.
  • Availability: High (99.9%). Monitoring must be up even if the monitored app is down.
  • Consistency: Eventual consistency is acceptable. It is okay if a graph is 5 seconds behind real-time.
  • Data Retention:
    • Raw data (10s): 7 days.
    • Medium resolution (1m): 30 days.
    • Low resolution (1h): 1 year.

Non-Functional Requirements

4. Back of the envelope estimates

  • Traffic Estimation:
    • Hosts: 100,000.
    • Metrics per host: 100 active metrics.
    • Reporting Interval: Every 10 seconds.
    • Total Active Series: $100,000 \times 100 = 10 \text{ Million series}$.
    • Write QPS: $10,000,000 / 10 = \mathbf{1 \text{ Million writes/sec}}$.
  • Storage Estimation:
    • Raw Data Point: Timestamp (8 bytes) + Value (8 bytes) = 16 bytes.
    • Compression: Many TSDBs use Gorilla-style compression (delta-of-delta encoding for timestamps, XOR encoding for values), shrinking a point to ~2 bytes on average.
    • Ingest Rate: $1M \text{ points/sec} \times 2 \text{ bytes} = 2 \text{ MB/sec}$.
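    • Per Day: $2 \text{ MB/sec} \times 86,400 \text{ sec} \approx \mathbf{173 \text{ GB/day}}$.
    • Per Year (Raw): $\approx 63 \text{ TB}$ if raw 10-second data were kept for a full year.
    • Note: Including replication (x3) and indexes, keeping a full year of raw data would need ~200 TB to 300 TB of cluster storage. With the retention tiers from Section 3 (7 days raw, 30 days at 1m, 1 year at 1h), the compressed data shrinks to roughly 2 TB before replication, which is why downsampling matters.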
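
The arithmetic above can be reproduced with a short script. This is a rough sketch that assumes the figures used in this section (100,000 hosts, 100 metrics per host, ~2 compressed bytes per point) and the retention tiers from Section 3; the function and variable names are illustrative only.

```python
SECONDS_PER_DAY = 86_400
BYTES_PER_POINT = 2          # compressed size assumed in the text
HOSTS = 100_000
METRICS_PER_HOST = 100
SERIES = HOSTS * METRICS_PER_HOST   # 10 million active series


def tier_bytes(interval_s: int, retention_days: int) -> int:
    """Compressed bytes needed to keep every series at one resolution."""
    points_per_series = retention_days * SECONDS_PER_DAY // interval_s
    return SERIES * points_per_series * BYTES_PER_POINT


writes_per_sec = SERIES // 10                 # one point per series every 10s
raw_per_day = tier_bytes(10, 1)               # raw data for a single day
raw_per_year = tier_bytes(10, 365)            # if raw data were never downsampled

# Retention tiers from Section 3.
tiers = {
    "raw_10s_7d": tier_bytes(10, 7),
    "medium_1m_30d": tier_bytes(60, 30),
    "low_1h_365d": tier_bytes(3600, 365),
}
tiered_total = sum(tiers.values())

if __name__ == "__main__":
    print(f"writes/sec:        {writes_per_sec:,}")
    print(f"raw per day:       {raw_per_day / 1e9:.1f} GB")
    print(f"raw per year:      {raw_per_year / 1e12:.1f} TB")
    print(f"tiered total:      {tiered_total / 1e12:.2f} TB (before replication)")
```

The gap between the full-year raw figure and the tiered total is the main argument for rollups: most of the year is stored at 1-hour resolution only.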

Back-of-the-envelope estimation

5. API design

We will use a REST API for the ingestion and control plane.

1. Ingest Metrics

  • POST /v1/metrics
  • Body:
    {
      "series": [
        {
          "metric": "system.cpu.idle",
          "points": [[16700000, 95.5]],
          "tags": ["host:web-01", "region:us-east"],
          "type": "gauge"
        }
      ]
    }
  • Response: 202 Accepted (Indicates async processing).
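
Before accepting a batch, the Gateway would typically sanity-check its shape. The following is a minimal sketch of such a check for the body shown above. The set of allowed metric types and the exact error messages are assumptions for illustration; the text only shows the "gauge" type.

```python
from typing import Any

# Assumption: only "gauge" appears in the text; the other types are illustrative.
ALLOWED_TYPES = {"gauge", "count", "rate"}


def _is_number(x: Any) -> bool:
    # bool is a subclass of int in Python, so exclude it explicitly.
    return isinstance(x, (int, float)) and not isinstance(x, bool)


def validate_series(series: Any) -> list[str]:
    """Return a list of problems found in one series entry (empty if valid)."""
    if not isinstance(series, dict):
        return ["series entry must be an object"]
    errors: list[str] = []
    metric = series.get("metric")
    if not isinstance(metric, str) or not metric:
        errors.append("metric must be a non-empty string")
    points = series.get("points")
    if not isinstance(points, list) or not points:
        errors.append("points must be a non-empty list")
    else:
        for point in points:
            if not (isinstance(point, list) and len(point) == 2
                    and _is_number(point[0]) and _is_number(point[1])):
                errors.append(f"invalid point: {point!r}")
    for tag in series.get("tags") or []:
        key, sep, value = tag.partition(":") if isinstance(tag, str) else ("", "", "")
        if not (key and sep and value):
            errors.append(f"invalid tag (expected key:value): {tag!r}")
    if series.get("type") not in ALLOWED_TYPES:
        errors.append(f"unsupported type: {series.get('type')!r}")
    return errors


def validate_payload(payload: Any) -> list[str]:
    """Validate a whole POST /v1/metrics body."""
    series_list = payload.get("series") if isinstance(payload, dict) else None
    if not isinstance(series_list, list) or not series_list:
        return ["series must be a non-empty list"]
    errors: list[str] = []
    for series in series_list:
        errors.extend(validate_series(series))
    return errors
```

An empty result means the batch can be handed to the queue and acknowledged with 202; a non-empty result would map to a 400 response.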

2. Query Metrics

  • GET /v1/query
  • Parameters:
    • from: Timestamp.
    • to: Timestamp.
    • query: avg:system.cpu.idle{region:us-east} by {host}.
  • Response: JSON object containing arrays of [time, value] pairs.
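
To make the query parameter concrete, here is a small parser for the query string format shown above. It is a sketch that handles only the `aggregator:metric{filters} by {group}` shape from the example; the grammar of a real query language would be richer, and the function name is hypothetical.

```python
import re
from typing import Optional

# Matches e.g. "avg:system.cpu.idle{region:us-east} by {host}".
QUERY_RE = re.compile(
    r"^(?P<agg>\w+):(?P<metric>[\w.]+)\{(?P<filters>[^}]*)\}"
    r"(?:\s+by\s+\{(?P<group>[^}]*)\})?$"
)


def _split(csv: Optional[str]) -> list[str]:
    """Split a comma-separated list, dropping empty entries."""
    return [part.strip() for part in (csv or "").split(",") if part.strip()]


def parse_query(query: str) -> dict:
    """Break a query string into aggregator, metric, tag filters, and group-by keys."""
    match = QUERY_RE.match(query.strip())
    if match is None:
        raise ValueError(f"unsupported query: {query!r}")
    return {
        "aggregator": match.group("agg"),
        "metric": match.group("metric"),
        "filters": _split(match.group("filters")),
        "group_by": _split(match.group("group")),
    }
```

The `filters` list feeds the tag lookup, and `group_by` decides whether the response contains one aggregated series or one series per host.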

3. Create Alert Rule

  • POST /v1/alerts
  • Body: {"query": "avg(last_5m):cpu > 90", "message": "High CPU", "channel": "pagerduty"}

6. High level architecture

We need a decoupled architecture to handle 1 million writes per second without blocking.

Agent -> Load Balancer -> Metrics Gateway -> Kafka -> Stream Processor -> TSDB

  • Client Agent: Runs on the user's server. Collects metrics locally and flushes them in batches every 10s.

  • Metrics Gateway: Stateless API servers. They validate API keys and sanitize data. They do not write to the DB directly; they push to Kafka.

  • Kafka (Message Queue): The buffer. It absorbs the 1M events/sec. Partitioned by SeriesID so that every point for a given series lands on the same partition.

  • Stream Processors (Consumers): Read from Kafka. One consumer group (the TSDB Writer) writes to storage, and a second (the Alert Processor) evaluates alert rules in real time.

  • Time Series Database (TSDB): Specialized storage optimized for time-series data.

  • Query Service: APIs that fetch data from the TSDB for dashboards.

High-level Architecture

7. Data model

Relational databases (MySQL) struggle with this write volume. We need a Time Series Database (TSDB) model.

1. Metadata Index (Inverted Index):

  • We need to find "Which Series IDs match the tag env:prod?".
  • Structure:
    • Key: Tag (env:prod)
    • Value: List[SeriesID_1, SeriesID_2, ...]
  • Why: This allows fast lookups for queries like "Show me CPU for all prod servers".
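
A minimal in-memory sketch of this inverted index is shown here. The class and method names are hypothetical, and a production index would live on disk and use compressed posting lists; the point is only that a multi-tag query becomes a set intersection.

```python
from collections import defaultdict


class TagIndex:
    """Maps each tag (e.g. "env:prod") to the set of Series IDs carrying it."""

    def __init__(self) -> None:
        self._postings: dict[str, set[int]] = defaultdict(set)

    def add_series(self, series_id: int, tags: list[str]) -> None:
        """Register a series under every one of its tags."""
        for tag in tags:
            self._postings[tag].add(series_id)

    def lookup(self, tags: list[str]) -> set[int]:
        """Return the Series IDs that carry ALL of the given tags."""
        if not tags:
            return set()
        # .get avoids creating empty entries for unknown tags.
        posting_sets = [self._postings.get(tag, set()) for tag in tags]
        return set.intersection(*posting_sets)
```

For example, after registering series 1 with `["env:prod", "host:web-01"]` and series 2 with `["env:prod", "host:web-02"]`, `lookup(["env:prod"])` returns both IDs while `lookup(["env:prod", "host:web-02"])` narrows to series 2.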

2. Time Series Data Store:

  • Stores the actual timestamp-value pairs.
  • Structure:
    • Row Key: SeriesID + TimeBucket (e.g., cpu:web01 + 2023-12-01:10am)
    • Value: Compressed Blob (Using Gorilla/XOR compression).
  • Why: Compressing 1 hour of points into one row reduces disk I/O significantly.
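
The sketch below illustrates the two ideas in this structure: deriving a row key from a Series ID and an hourly bucket, and the transformations that make the blob compressible. It shows only the delta-of-delta and XOR steps; a real Gorilla-style encoder then packs the results into variable-length bit fields, which is omitted here. The key format and function names are assumptions.

```python
import struct

BUCKET_SECONDS = 3600  # one row per series per hour, as in the text


def row_key(series_id: str, ts: int) -> str:
    """Row key = series ID plus the start of the hour the timestamp falls in."""
    bucket_start = ts - ts % BUCKET_SECONDS
    return f"{series_id}#{bucket_start}"


def delta_of_delta(timestamps: list[int]) -> list[int]:
    """Encode as: first timestamp, first delta, then changes between deltas."""
    if len(timestamps) < 2:
        return list(timestamps)
    first_delta = timestamps[1] - timestamps[0]
    out = [timestamps[0], first_delta]
    prev_delta = first_delta
    for prev, cur in zip(timestamps[1:], timestamps[2:]):
        delta = cur - prev
        out.append(delta - prev_delta)   # 0 whenever the interval is steady
        prev_delta = delta
    return out


def float_bits(x: float) -> int:
    """Reinterpret a float64 as its 64-bit integer pattern."""
    return struct.unpack(">Q", struct.pack(">d", x))[0]


def xor_values(values: list[float]) -> list[int]:
    """Encode as: first value's bits, then XOR of each value with its predecessor."""
    bits = [float_bits(v) for v in values]
    return bits[:1] + [prev ^ cur for prev, cur in zip(bits, bits[1:])]
```

With a steady 10-second reporting interval and a flat metric, both encoded streams are mostly zeros, and zeros are what the bit-packing stage stores in a single bit each.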

8. Core flows end to end

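In a system like Datadog or Prometheus, the challenges are very specific: writes are massive (1M/sec), while reads are comparatively rare. If every incoming point were written synchronously to the database, ingestion latency would be tied to database latency, and any slowdown or outage in the database would back up into the agents.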

Here is a simple, detailed explanation of the three flows, using analogies to make them stick.

Flow 1: Metric Ingestion (The Write Path)

The goal: Catch the water coming from a firehose without spilling a drop.

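This flow explains how we handle 1 million data points per second. We cannot issue 1 million individual synchronous disk writes per second; that is too slow. We need a buffer that accepts points quickly and lets the storage layer write them in large, sequential batches.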

  1. Collection (The Agent):
    Imagine a small worker program installed on every single server (Host A, Host B, etc.). This is the Agent. It doesn't send data instantly. It collects data for 10 seconds (e.g., CPU usage) and bundles it into a package (a batch).
    • Why? Sending one big package is much more efficient than sending 100 tiny envelopes.
  2. Gateway Reception (The Bouncer):
    The batch arrives at the Metrics Gateway (an API server). The Gateway checks the ID (API Key) to make sure the user is allowed in.
    • Crucial Step: The Gateway does not write to the database. It only does one thing: it hands the package to Kafka.
  3. Buffering (Kafka - The Shock Absorber):
    Kafka is a message queue. Think of it as a massive, high-speed conveyor belt.
    • The "Ack": As soon as the Gateway puts the data on the Kafka conveyor belt, it tells the Agent: "Success (202 Accepted)! I got it."
    • Why this matters: The Agent is happy and goes back to work. It doesn't care that the data hasn't been saved to the database yet. This makes the API extremely fast (< 50ms).
  4. Persistence (The Writer):
    Now, a separate worker called the TSDB Writer picks the data off the conveyor belt at its own pace. It organizes the data and saves it to the Time Series Database (TSDB) on the disk.
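
The four steps can be sketched in a few lines. Here a bounded `queue.Queue` from the standard library stands in for Kafka and a plain dict stands in for the TSDB, so this only illustrates the control flow (acknowledge after enqueue, persist later), not durability or throughput. The API key set, the 503 response on a full buffer, and the function names are assumptions.

```python
import queue

VALID_KEYS = {"demo-key"}  # hypothetical API keys


def handle_ingest(api_key: str, batch: dict, buffer: queue.Queue) -> int:
    """Gateway: authenticate, enqueue, and acknowledge without touching the DB."""
    if api_key not in VALID_KEYS:
        return 401
    try:
        buffer.put_nowait(batch)
    except queue.Full:
        return 503   # assumption: signal backpressure so the agent retries later
    return 202       # accepted; persistence happens asynchronously


def drain(buffer: queue.Queue, store: dict, max_batches: int = 100) -> int:
    """Writer: pull batches at its own pace and append points per series."""
    written = 0
    while written < max_batches:
        try:
            batch = buffer.get_nowait()
        except queue.Empty:
            break
        for series in batch["series"]:
            key = (series["metric"], tuple(sorted(series.get("tags", []))))
            store.setdefault(key, []).extend(series["points"])
        written += 1
    return written
```

The important property is that `handle_ingest` returns as soon as the batch is in the buffer, so the agent's latency does not depend on how fast `drain` runs.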

Flow 1

Flow 2: Dashboard Query (The Read Path)

The goal: Find a needle in a haystack, instantly.

When a user opens a dashboard to see "Average CPU for all Production Servers," they are asking a complex question. The database doesn't know which servers are "Production" just by looking at the raw data. It needs a map.

  1. The Request:
    The user asks: "Give me the CPU usage for env:prod for the last hour."

  2. Metadata Lookup (The Phonebook):
    The database cannot scan all 10 million series to see which ones are tagged "prod." That would take far too long.
    Instead, it goes to an Inverted Index (think of the index at the back of a textbook).

    • It looks up env:prod.
    • The Index replies: "The series you want are Series IDs 101, 102, and 105."
  3. Data Fetch:
    Now the system knows exactly which IDs to look for. It goes to the TSDB Storage and grabs the data blocks specifically for IDs 101, 102, and 105 for the last hour.

  4. Aggregation (The Math):
    The system unzips (decompresses) the data and does the math. It calculates the average across those three series at each point in time and sends the final result to the dashboard to draw the line chart.
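
The read path can be sketched as three steps in one function: tag lookup, fetch, and per-timestamp averaging. Plain dicts stand in for the inverted index and the (already decompressed) TSDB blocks, and the sketch assumes all series report on aligned timestamps; real systems bucket points into intervals first. All names are illustrative.

```python
from collections import defaultdict


def run_query(index: dict, store: dict, tag: str, start: int, end: int) -> list:
    """Average all series carrying `tag` over [start, end), per timestamp."""
    # Step 2: metadata lookup (tag -> Series IDs).
    series_ids = index.get(tag, set())

    sums: dict[int, float] = defaultdict(float)
    counts: dict[int, int] = defaultdict(int)

    # Step 3: fetch only the matching series, restricted to the time range.
    for series_id in series_ids:
        for ts, value in store.get(series_id, []):
            if start <= ts < end:
                sums[ts] += value
                counts[ts] += 1

    # Step 4: aggregate into [time, value] pairs for the chart.
    return [(ts, sums[ts] / counts[ts]) for ts in sorted(sums)]
```

Series that do not carry the tag are never read, which is what keeps the query cost proportional to the result rather than to the total number of series.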

Flow 2

Flow 3: Real-Time Alerting (The Async Path)

The goal: Tap the river, don't check the lake.

This is the most misunderstood flow. Junior engineers often design alerts by querying the database every minute (e.g., SELECT * FROM db WHERE cpu > 90). At this scale, that approach breaks down.

If you run complex queries across 10 million series every minute, the alerting load competes with dashboards and ingestion and can overwhelm the database.

Instead, we use Stream Processing.

  1. Tapping the Stream:
    Remember the Kafka conveyor belt from Flow 1? We add a second worker to watch that same belt. This is the Alert Processor.

  2. Windowing (Short-term Memory):
    The Alert Processor holds a small amount of data in RAM, usually a "sliding window" (e.g., the last 5 minutes of data). It doesn't touch the hard drive.

  3. Evaluation:
    As each new data point arrives on the conveyor belt (Kafka), the processor updates the window for that series and re-checks the matching rules in memory.

    • Rule: "Is average CPU over the last 5 minutes > 90%?"
    • Data: "The window average is now 95%."
    • Result: TRIGGER!
  4. Notification:
    Because evaluation happens in memory on the stream, the alert fires within moments of the data point arriving. The processor sends a message to PagerDuty or Slack immediately. The database may not even have finished persisting the point yet, but the engineer is already being paged.
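
A sliding-window rule of this kind can be sketched with a deque per series. This is a simplified illustration: it evaluates one threshold on one series, uses the window average as the condition, and reports only the transition into the firing state so that a sustained breach pages once. The class name and these design choices are assumptions, not a description of any particular product.

```python
from collections import deque


class ThresholdRule:
    """Fires when the average over the last `window_s` seconds exceeds `threshold`."""

    def __init__(self, threshold: float, window_s: int) -> None:
        self.threshold = threshold
        self.window_s = window_s
        self._points: deque = deque()   # (timestamp, value), oldest first
        self.firing = False

    def observe(self, ts: int, value: float) -> bool:
        """Add a point; return True only when the rule newly starts firing."""
        self._points.append((ts, value))
        # Evict points that have fallen out of the window.
        while self._points[0][0] <= ts - self.window_s:
            self._points.popleft()
        average = sum(v for _, v in self._points) / len(self._points)
        was_firing = self.firing
        self.firing = average > self.threshold
        return self.firing and not was_firing
```

Because the window lives in RAM and each point is examined once as it streams past, the cost per point is small and independent of how much history the TSDB holds.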

Flow 3

9. Caching and read performance

  • Metadata Cache: The mapping of Tags -> SeriesIDs changes rarely (only when new hosts are added). We cache this in Redis. This avoids hitting the expensive Inverted Index for every query.

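  • Query Result Cache: If a user asks for "Last 7 days", almost all of that data is historical and no longer changes; only the most recent few minutes are still being written. We cache the calculated result for the closed portion and recompute only the newest slice. If they refresh the page, most of the response is served from cache.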

  • Time-Aligned Cache: We cache "Aligned Time Blocks". For example, the data block from 10:00 to 11:00 is immutable once the hour is over. We cache this block so future queries don't need to re-read it from disk.
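
The time-aligned idea can be sketched as follows: split a query range into hour-aligned blocks and cache only the blocks whose hour has already ended. The `load_block` callback is a hypothetical stand-in for a disk read, and the in-process dict stands in for whatever cache tier is actually used.

```python
BLOCK_SECONDS = 3600


def aligned_blocks(start: int, end: int) -> list[int]:
    """Start timestamps of every hourly block overlapping [start, end), start < end."""
    first = start - start % BLOCK_SECONDS
    return list(range(first, end, BLOCK_SECONDS))


class BlockCache:
    """Caches closed (immutable) blocks; open blocks are always re-read."""

    def __init__(self, load_block) -> None:
        self._load_block = load_block       # callable(series_id, block_start) -> data
        self._cache: dict = {}

    def get(self, series_id: int, block_start: int, now: int):
        key = (series_id, block_start)
        if key in self._cache:
            return self._cache[key]
        data = self._load_block(series_id, block_start)
        if block_start + BLOCK_SECONDS <= now:
            # The hour is over, so this block will not change again.
            self._cache[key] = data
        return data
```

Aligning on block boundaries means two dashboards with slightly different time ranges still share the same cache entries.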

10. Storage, indexing and media

  • Compression (Gorilla/XOR):
    • Metrics are repetitive (e.g., 100, 100, 100).
    • We store timestamps as delta-of-delta values and each value as the XOR with the previous value, so unchanged or slowly changing data encodes to a few bits.
    • This reduces storage from 16 bytes/point to ~2 bytes/point, saving TBs of space.
  • Downsampling (Rollups):
    • Querying 1 year of raw data (10s intervals) requires reading ~3.15 million points per series. This is too slow.
    • Compactor Service: A background job reads raw data and generates 1-minute and 1-hour aggregated blocks.
    • When a user queries "Last 1 Year", the API automatically routes the query to the 1-hour table, reading only ~8,760 points per series.
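
A rollup and the matching query routing can be sketched like this. The averaging function is the simplest possible compactor, and the routing thresholds are an assumption that mirrors the retention tiers from Section 3 (7 days raw, 30 days at 1 minute, 1 year at 1 hour).

```python
from collections import defaultdict

DAY = 86_400


def rollup(points: list, bucket_s: int) -> list:
    """Average raw (timestamp, value) points into fixed-width buckets."""
    sums: dict[int, float] = defaultdict(float)
    counts: dict[int, int] = defaultdict(int)
    for ts, value in points:
        bucket_start = ts - ts % bucket_s
        sums[bucket_start] += value
        counts[bucket_start] += 1
    return [(b, sums[b] / counts[b]) for b in sorted(sums)]


def pick_resolution(range_s: int) -> int:
    """Choose the finest resolution (in seconds) that covers the query range."""
    if range_s <= 7 * DAY:
        return 10        # raw data
    if range_s <= 30 * DAY:
        return 60        # 1-minute rollups
    return 3600          # 1-hour rollups


def points_to_read(range_s: int) -> int:
    """Points per series the query has to read at the chosen resolution."""
    return range_s // pick_resolution(range_s)
```

Storing only the average loses information about spikes, so a rollup would often keep min, max, sum, and count per bucket as well; that extension is left out to keep the sketch short.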

11. Scaling strategies

  • Sharding:
    • We shard the TSDB and Kafka partitions by SeriesID.
    • This ensures all data for cpu:web-01 lands on the same shard, making compression efficient and queries fast.
  • Write Path Scaling:
    • The Gateway and Kafka scale horizontally. If load increases to 2M QPS, we simply add more Gateway nodes and Kafka brokers.
  • Tenant Isolation:
    • If one large customer floods the system, we can route their tenant_id to a dedicated Kafka topic and DB cluster so they don't degrade performance for others.
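
Sharding by SeriesID needs a hash that is stable across processes and machines. The sketch below builds a canonical series key from the metric name and sorted tags, then hashes it with `hashlib`; Python's built-in `hash()` is avoided because it is randomized per process for strings. The key format and the plain modulo placement are assumptions.

```python
import hashlib


def series_key(metric: str, tags: list[str]) -> str:
    """Canonical series identity: metric name plus tags in sorted order."""
    return metric + "{" + ",".join(sorted(tags)) + "}"


def shard_for(key: str, num_shards: int) -> int:
    """Map a series key to a shard (or Kafka partition) deterministically."""
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_shards
```

Plain modulo placement moves most series when `num_shards` changes; consistent hashing or a fixed number of virtual shards would reduce that movement, at the cost of a slightly more involved lookup.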

12. Reliability, failure handling and backpressure

  • Data Loss Prevention:
    • Kafka acts as a persistent buffer (retention 24h). If the TSDB crashes, data piles up in Kafka. Once the DB is fixed, the consumers catch up.
    • WAL (Write Ahead Log): The TSDB appends each write to a sequential on-disk log before applying it to its in-memory buffer and acknowledging it, so buffered data survives a crash or power failure.
  • Replication:
    • The TSDB uses a Replication Factor of 3. We can lose 1 node and still have a majority of replicas available, and lose up to 2 nodes without losing data.
  • Rate Limiting:
    • We implement rate limits per API Key. If a buggy agent sends 100x traffic, we return 429 Too Many Requests to protect the system.
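
Per-key rate limiting is commonly implemented as a token bucket. The following sketch keeps one bucket per API key in memory and takes the current time as an argument so the behavior is easy to reason about; in a multi-node Gateway the counters would need to be shared or approximated per node. The class names and the rate and capacity values are illustrative.

```python
class TokenBucket:
    """Refills at `rate` tokens/sec up to `capacity`; each request costs one token."""

    def __init__(self, rate: float, capacity: float, now: float) -> None:
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.last = now

    def allow(self, now: float) -> bool:
        elapsed = max(0.0, now - self.last)
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last = max(self.last, now)
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False


class RateLimiter:
    """Maps each API key to its own bucket and returns an HTTP status code."""

    def __init__(self, rate: float, capacity: float) -> None:
        self.rate = rate
        self.capacity = capacity
        self._buckets: dict[str, TokenBucket] = {}

    def check(self, api_key: str, now: float) -> int:
        bucket = self._buckets.get(api_key)
        if bucket is None:
            bucket = TokenBucket(self.rate, self.capacity, now)
            self._buckets[api_key] = bucket
        return 202 if bucket.allow(now) else 429
```

The capacity allows short bursts (an agent flushing a backlog after a network blip) while the refill rate caps sustained traffic from a misbehaving key.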

13. Security, privacy and abuse

  • Authentication: Agents use API Keys. We validate these against a fast in-memory cache at the Gateway.

  • Encryption: TLS (HTTPS) for all transit.

  • Cardinality Abuse:

    • Risk: A user attaches an unbounded tag such as a UUID to a metric (e.g., request_id:123). Every distinct value creates a new Series ID, so the index can grow by millions of entries ("Cardinality Explosion").
    • Defense: We implement limits. If a metric has > 1000 unique tag combinations, we reject new combinations or blocklist the metric.
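
A cardinality limit can be sketched as a per-metric set of tag combinations that stops growing once it reaches the cap. This in-memory version is for illustration only; tracking exact sets for every metric is itself expensive, so a real system might use an approximate counter instead. The class name is hypothetical, and the default of 1000 comes from the text.

```python
class CardinalityGuard:
    """Admits a series only if its metric is still under the combination limit."""

    def __init__(self, max_combinations: int = 1000) -> None:
        self.max_combinations = max_combinations
        self._seen: dict[str, set] = {}

    def admit(self, metric: str, tags: list[str]) -> bool:
        combo = tuple(sorted(tags))          # tag order must not create new series
        seen = self._seen.setdefault(metric, set())
        if combo in seen:
            return True                      # existing series keep flowing
        if len(seen) >= self.max_combinations:
            return False                     # new combination over the limit
        seen.add(combo)
        return True
```

Existing series are always admitted, so hitting the limit stops the explosion without interrupting metrics that were already healthy.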

14. Bottlenecks and next steps

  • Bottleneck: High Cardinality. The Inverted Index is the most fragile part of a TSDB.

    • Next Step: Use Bloom Filters at the Gateway to quickly reject known "bad" high-cardinality tags before they hit the system.
  • Bottleneck: Long-term Storage Cost.

    • Next Step: Implement Tiered Storage. Keep "Hot" data (last 2 days) on SSD, and move "Cold" data (older than 30 days) to S3/Object Storage in a columnar format such as Parquet, which can cut storage cost substantially (on the order of 80%, depending on provider pricing).
  • Bottleneck: Alerting Lag.

    • Next Step: Create a "Fast Lane" Kafka topic for metrics referenced by critical alert rules (e.g., cpu > 99), so they are evaluated ahead of the bulk metric traffic.
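
For the Bloom filter idea, a minimal sketch is given here. It uses a bit array and a few hash positions derived from SHA-256; the sizes are arbitrary and would be tuned to the expected number of blocked tag keys. A Bloom filter can return false positives but never false negatives, so a "maybe blocked" answer should be confirmed against the authoritative blocklist before rejecting data.

```python
import hashlib


class BloomFilter:
    """Probabilistic set: no false negatives, tunable false-positive rate."""

    def __init__(self, num_bits: int = 1024, num_hashes: int = 3) -> None:
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self._bits = bytearray((num_bits + 7) // 8)

    def _positions(self, item: str):
        # Derive independent positions by salting the item with the hash index.
        for i in range(self.num_hashes):
            digest = hashlib.sha256(f"{i}:{item}".encode("utf-8")).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, item: str) -> None:
        for pos in self._positions(item):
            self._bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, item: str) -> bool:
        return all(self._bits[pos // 8] & (1 << (pos % 8))
                   for pos in self._positions(item))
```

At the Gateway, the tag keys of known offenders (for example `request_id`) would be added to the filter, and each incoming tag key checked with `might_contain` before the batch is enqueued.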

Summary

  1. Massive Writes: Handled 1M QPS using Kafka buffers and a specialized TSDB with LSM-trees.

  2. Decoupling: Separated Ingestion, Query, and Alerting into independent paths to prevent read/write contention.

  3. Efficiency: Used Gorilla compression and Downsampling (Rollups) to make storage costs feasible.

  4. Protection: Secured the system against Cardinality Abuse using limits and Bloom Filters.