Data Incident Response

majiayu000

About

This skill provides structured playbooks for responding to data incidents like pipeline failures, data corruption, or loss. It helps developers quickly triage, contain, and resolve issues to minimize impact on analytics and business decisions. Key features include severity classification, clear response procedures, and a focus on prevention.

Quick Install

Claude Code

Plugin Command (Recommended)

/plugin add https://github.com/majiayu000/claude-skill-registry

Git Clone (Alternative)

git clone https://github.com/majiayu000/claude-skill-registry.git ~/.claude/skills/data-incident-response

Copy and paste one of these commands in Claude Code to install this skill.

Documentation

Data Incident Response

Overview

Data Incident Response is the process of detecting, triaging, and resolving issues related to data quality, availability, or integrity. Unlike application incidents, data incidents can have long-lasting impacts on analytics, ML models, and business decisions.

Core Principle: "Bad data is worse than no data. Detect fast, respond faster, prevent recurrence."


1. Types of Data Incidents

| Type | Description | Example | Severity |
|------|-------------|---------|----------|
| Data Loss | Data deleted or not captured | Accidental DROP TABLE | P0 |
| Data Corruption | Data modified incorrectly | ETL bug multiplies prices by 100 | P0-P1 |
| Data Breach | Unauthorized data access | PII exposed in logs | P0 |
| Pipeline Failure | ETL/ELT pipeline stops | Airflow DAG fails | P1-P2 |
| Schema Breaking Change | Upstream schema change breaks pipeline | Column renamed without notice | P1 |
| Data Quality Degradation | Increasing nulls, duplicates, anomalies | 20% of orders have null customer_id | P2 |
| Freshness Violation | Data not updated within SLA | Dashboard showing yesterday's data | P2-P3 |
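
As a sketch of how this classification might be wired into tooling, the table above can be encoded as a default-severity lookup used when opening incidents. The `trigger_incident` helper referenced later in this skill is assumed, and the defaults are starting points for triage, not final severities.

# Hypothetical mapping from incident type to default severity.
# Ranges (e.g. P0-P1) default to the more severe end; triage can downgrade.
DEFAULT_SEVERITY = {
    'data_loss': 'P0',
    'data_corruption': 'P0',
    'data_breach': 'P0',
    'pipeline_failure': 'P1',
    'schema_breaking_change': 'P1',
    'data_quality_degradation': 'P2',
    'freshness_violation': 'P2',
}

def default_severity(incident_type: str) -> str:
    """Return the default severity for an incident type, falling back to P2."""
    return DEFAULT_SEVERITY.get(incident_type, 'P2')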

2. Data Incident Severity Levels

P0 (Critical)

  • Definition: Data breach, major data loss, or corruption affecting production decisions
  • Examples:
    • PII exposed publicly
    • Financial data deleted
    • ML model making wrong predictions due to corrupt training data
  • Response Time: Immediate (< 15 minutes)
  • Notification: Page on-call + executives
  • Postmortem: Required within 48 hours

P1 (High)

  • Definition: Pipeline down, critical data corrupt, major quality degradation
  • Examples:
    • Daily ETL failed, no fresh data
    • Revenue reporting showing incorrect numbers
    • Customer-facing dashboard broken
  • Response Time: < 1 hour
  • Notification: Page on-call data team
  • Postmortem: Required within 1 week

P2 (Medium)

  • Definition: Data quality issue affecting internal reports
  • Examples:
    • 10% of records have validation errors
    • Non-critical dashboard stale
    • Schema drift detected but not breaking
  • Response Time: < 4 hours
  • Notification: Slack alert to data team
  • Postmortem: Optional

P3 (Low)

  • Definition: Minor data inconsistency, no immediate impact
  • Examples:
    • Duplicate records in non-critical table
    • Formatting inconsistency
    • Deprecated field still populated
  • Response Time: < 1 business day
  • Notification: Ticket created
  • Postmortem: Not required
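
The response-time and notification rules above can be kept next to the alerting code so the two never drift apart. The sketch below uses hypothetical field names and encodes "< 1 business day" as 8 working hours; adjust to your own policy.

# Hypothetical severity policy derived from the level definitions above.
SEVERITY_POLICY = {
    'P0': {'response_minutes': 15,  'notify': ['page_oncall', 'page_executives'], 'postmortem': 'required_within_48h'},
    'P1': {'response_minutes': 60,  'notify': ['page_oncall_data_team'],          'postmortem': 'required_within_1_week'},
    'P2': {'response_minutes': 240, 'notify': ['slack_data_team'],                'postmortem': 'optional'},
    'P3': {'response_minutes': 480, 'notify': ['create_ticket'],                  'postmortem': 'not_required'},
}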

3. Incident Detection

Automated Detection

# Data quality monitoring
def monitor_data_quality():
    """Continuously monitor data quality metrics"""
    
    checks = {
        'null_rate': check_null_rate('orders', 'customer_id', threshold=0.05),
        'duplicate_rate': check_duplicates('orders', 'order_id', threshold=0.01),
        'freshness': check_freshness('orders', 'created_at', max_age_minutes=60),
        'row_count': check_row_count_anomaly('orders', expected_range=(1000, 10000))
    }
    
    for check_name, result in checks.items():
        if not result['passed']:
            trigger_incident(
                severity=result['severity'],
                title=f"Data quality check failed: {check_name}",
                details=result
            )

Pipeline Failure Alerts

# Airflow callback
from airflow.operators.python import PythonOperator

def on_failure_callback(context):
    """Trigger incident on DAG failure"""
    trigger_incident(
        severity='P1',
        title=f"Pipeline failed: {context['dag'].dag_id}",
        details={
            'task': context['task'].task_id,
            'execution_date': context['execution_date'],
            'error': str(context['exception'])
        }
    )

task = PythonOperator(
    task_id='process_data',
    python_callable=process_data,
    on_failure_callback=on_failure_callback
)

User Reports

User report channels:
- Support tickets
- Slack #data-issues channel
- Email to data-team@company.com
- Dashboard "Report Issue" button
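
User reports can feed the same incident flow as automated checks. A minimal sketch, assuming the `trigger_incident` helper from the monitoring example and a hypothetical normalized report payload:

def handle_user_report(report: dict):
    """Convert a user-submitted report (Slack, ticket, email) into an incident.

    `report` is a hypothetical normalized payload, e.g.
    {'source': 'slack', 'reporter': '@bob', 'table': 'orders', 'description': '...'}
    """
    trigger_incident(
        severity='P3',  # default low; triage upgrades after assessment
        title=f"User-reported data issue: {report.get('table', 'unknown table')}",
        details=report
    )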

4. Incident Triage

Initial Assessment Questions

  1. What data is affected? (table, time range, row count)
  2. Who is impacted? (internal teams, customers, ML models)
  3. When did it start? (timestamp, duration)
  4. Is it still happening? (ongoing vs. resolved)
  5. What's the business impact? (revenue, compliance, reputation)

Triage Decision Tree

Is data breach or PII exposed?
  YES → P0, page security team immediately
  NO → Continue

Is production decision-making affected?
  YES → P0/P1, page data team
  NO → Continue

Is critical pipeline down?
  YES → P1, page data team
  NO → Continue

Is data quality degraded?
  YES → P2, alert data team
  NO → P3, create ticket
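
The decision tree can be implemented as a straight chain of checks. This is a sketch, assuming boolean fields on the incident record and hypothetical paging/alerting helpers (`page_security_team`, `page_data_team`, `alert_data_team`, `create_ticket`):

def triage(incident: dict) -> str:
    """Walk the decision tree above and return a severity."""
    if incident.get('pii_exposed') or incident.get('is_breach'):
        page_security_team(incident)
        return 'P0'
    if incident.get('affects_production_decisions'):
        page_data_team(incident)
        # P0 if data is lost or corrupt, otherwise P1 (the tree allows either)
        return 'P0' if incident.get('data_lost_or_corrupt') else 'P1'
    if incident.get('critical_pipeline_down'):
        page_data_team(incident)
        return 'P1'
    if incident.get('quality_degraded'):
        alert_data_team(incident)
        return 'P2'
    create_ticket(incident)
    return 'P3'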

5. Response Procedures

Step 1: Stop the Bleeding

def stop_the_bleeding(incident_type: str):
    """Immediate actions to prevent further damage"""
    
    if incident_type == 'pipeline_failure':
        # Pause downstream pipelines
        pause_dependent_dags()
        
    elif incident_type == 'data_corruption':
        # Stop writes to affected table
        revoke_write_permissions('corrupted_table')
        
    elif incident_type == 'data_breach':
        # Immediately restrict access
        revoke_all_access('sensitive_table')
        notify_security_team()

Step 2: Assess Damage

-- Assess extent of data corruption
SELECT 
    COUNT(*) as total_rows,
    COUNT(*) FILTER (WHERE price < 0) as corrupt_rows,
    MIN(created_at) as first_corrupt_timestamp,
    MAX(created_at) as last_corrupt_timestamp
FROM orders
WHERE created_at > '2024-01-15 10:00:00';

Step 3: Restore from Backup (if needed)

# Restore the affected table from a dump taken before the corruption
# (clear or quarantine the corrupted rows first; --clean cannot be combined with --data-only)
pg_restore \
  --dbname=production \
  --table=orders \
  --data-only \
  backup_before_corruption.dump

# Verify restoration
psql -c "SELECT COUNT(*) FROM orders WHERE created_at > '2024-01-15 09:00:00';"

Step 4: Fix Root Cause

# Example: Fix ETL bug that caused corruption
def fixed_transform(df):
    """Corrected transformation logic"""
    # OLD (buggy): df['price'] = df['price'] * 100
    # NEW (fixed): df['price'] = df['price']  # Already in cents
    return df

# Reprocess affected data
reprocess_date_range(
    start_date='2024-01-15',
    end_date='2024-01-16',
    transform_fn=fixed_transform
)

Step 5: Validate Fix

def validate_fix():
    """Verify data is correct after fix"""
    
    # Check row counts match
    assert get_row_count('orders') == expected_count
    
    # Check no corrupt data remains
    corrupt_count = db.execute("""
        SELECT COUNT(*) FROM orders WHERE price < 0
    """).fetchone()[0]
    assert corrupt_count == 0
    
    # Check data quality metrics
    quality_score = run_data_quality_checks('orders')
    assert quality_score > 95

Step 6: Resume Operations

def resume_operations():
    """Resume normal operations"""
    
    # Restore write permissions
    grant_write_permissions('orders')
    
    # Resume downstream pipelines
    resume_dependent_dags()
    
    # Monitor closely for 24 hours
    enable_enhanced_monitoring('orders', duration_hours=24)

6. Data Recovery Strategies

Point-in-Time Recovery (PITR)

-- PostgreSQL: create a named restore point before risky operations
SELECT pg_create_restore_point('before_corruption');

-- To recover, restore a base backup and replay WAL up to the target,
-- using recovery settings such as:
--   restore_command = 'cp /path/to/wal_archive/%f %p'
--   recovery_target_time = '2024-01-15 09:55:00'
--   (or recovery_target_name = 'before_corruption')

Replay from Source

from kafka import KafkaConsumer, TopicPartition

def replay_from_kafka(topic: str, start_offset: int, end_offset: int):
    """Replay events from Kafka to rebuild state"""

    # Create the consumer without subscribing; the partition is assigned manually below
    consumer = KafkaConsumer(
        bootstrap_servers=['localhost:9092'],
        auto_offset_reset='earliest'
    )

    # Assign the partition and seek to the start offset
    partition = TopicPartition(topic, 0)
    consumer.assign([partition])
    consumer.seek(partition, start_offset)
    
    for message in consumer:
        if message.offset > end_offset:
            break
        
        # Reprocess event
        process_event(message.value)

Manual Correction

-- Identify and fix corrupt records
UPDATE orders
SET price = price / 100  -- Undo the bug that multiplied by 100
WHERE created_at BETWEEN '2024-01-15 10:00:00' AND '2024-01-15 12:00:00'
  AND price > 1000000;  -- Only fix obviously wrong prices

Reprocessing Pipelines

# Airflow: Backfill specific date range
airflow dags backfill \
  --start-date 2024-01-15 \
  --end-date 2024-01-16 \
  --reset-dagruns \
  daily_etl_dag

7. Communication During Data Incidents

Internal Communication Template

🚨 **DATA INCIDENT** - P1

**Affected Data**: orders table
**Impact**: Revenue dashboard showing incorrect numbers
**Started**: 2024-01-15 10:13 UTC
**Status**: Investigating

**What we know**:
- ETL bug multiplied all prices by 100
- Affects orders from 10:00-12:00 UTC (2 hours)
- ~5,000 orders impacted

**What we're doing**:
- Stopped downstream pipelines
- Restoring from backup
- Fixing ETL bug

**Next update**: 30 minutes

**Incident Commander**: @alice
**War Room**: #incident-data-001

Stakeholder Notification

def notify_stakeholders(incident):
    """Notify affected teams"""
    
    affected_teams = identify_affected_teams(incident['table'])
    
    for team in affected_teams:
        send_notification(
            channel=team['slack_channel'],
            message=f"""
            ⚠️ Data incident affecting {incident['table']}
            
            Impact: {incident['impact']}
            ETA for resolution: {incident['eta']}
            
            Please avoid using this data until resolved.
            Updates in #incident-{incident['id']}
            """
        )

8. Common Data Incident Scenarios

Scenario 1: Accidental DELETE

-- Incident: Developer ran DELETE without WHERE clause
DELETE FROM users;  -- ❌ Deleted all users!

-- Response:
-- 1. Immediately stop application writes
-- 2. Restore from most recent backup
-- 3. Replay transactions from WAL (Write-Ahead Log)
-- 4. Implement safeguards (require WHERE clause, read-only by default)
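
One of the safeguards listed above (requiring a WHERE clause) can be enforced with an application-level guard before statements reach the database. This is a deliberately crude sketch, not a substitute for backups or read-only-by-default access:

import re

def guard_destructive_sql(sql: str) -> None:
    """Raise if a DELETE or UPDATE statement has no WHERE clause (simple heuristic)."""
    statement = sql.strip().rstrip(';')
    if re.match(r'(?i)^\s*(delete|update)\b', statement) and not re.search(r'(?i)\bwhere\b', statement):
        raise ValueError("Refusing to run DELETE/UPDATE without a WHERE clause")

# guard_destructive_sql("DELETE FROM users")                -> raises
# guard_destructive_sql("DELETE FROM users WHERE id = 42")  -> passes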

Scenario 2: Bad Data from Upstream

# Incident: Upstream API started sending null customer_ids

# Detection
if df['customer_id'].isna().sum() > len(df) * 0.01:  # > 1% nulls
    raise DataQualityError("Too many null customer_ids")

# Response
# 1. Reject the batch
# 2. Alert upstream team
# 3. Use previous day's data as fallback
# 4. Implement validation before ingestion
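
Steps 1 and 3 above (reject the batch, fall back to the previous day's data) can live in the ingestion step itself. A sketch, where `load_snapshot` and `publish` are hypothetical helpers and the DataFrame is pandas:

from datetime import timedelta

def ingest_batch(df, batch_date):
    """Reject a bad batch and serve the previous day's snapshot instead."""
    null_rate = df['customer_id'].isna().mean()
    if null_rate > 0.01:  # > 1% nulls: reject, alert, fall back
        trigger_incident(
            severity='P2',
            title="Upstream batch rejected: too many null customer_ids",
            details={'null_rate': float(null_rate), 'date': str(batch_date)}
        )
        fallback = load_snapshot(batch_date - timedelta(days=1))  # hypothetical helper
        publish(fallback, as_of=batch_date)                       # hypothetical helper
        return
    publish(df, as_of=batch_date)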

Scenario 3: Pipeline Bug Corrupting Data

# Incident: ETL bug converted all timestamps to UTC incorrectly

# Detection
anomaly_count = db.execute("""
    SELECT COUNT(*) FROM events
    WHERE event_time > NOW()  -- Future timestamps = bug
""").fetchone()[0]

# Response
# 1. Identify affected date range
# 2. Pause pipeline
# 3. Fix transformation logic
# 4. Reprocess affected dates
# 5. Add validation for timestamp sanity

Scenario 4: Schema Change Breaking Pipeline

# Incident: Upstream renamed 'user_id' to 'customer_id'

# Detection
df = spark.read.parquet("s3://data/users/")
if "user_id" not in df.columns:  # column renamed upstream; check before selecting
    trigger_incident("Schema drift detected")

# Response
# 1. Update pipeline to handle both column names
# 2. Coordinate with upstream for future changes
# 3. Implement schema validation before processing
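
Step 1 of the response (handling both column names during the transition) might look like the following PySpark sketch:

def normalize_user_id(df):
    """Accept either the old or the new upstream column name."""
    if 'customer_id' in df.columns and 'user_id' not in df.columns:
        # Map the renamed column back to the name downstream code expects
        df = df.withColumnRenamed('customer_id', 'user_id')
    return df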

9. Prevention Strategies

Immutable Data Lakes

# Never modify data in place; always append new versions
import boto3

# Enable S3 versioning on the data lake bucket
s3 = boto3.client('s3')
s3.put_bucket_versioning(
    Bucket='data-lake',
    VersioningConfiguration={'Status': 'Enabled'}
)

# Write new partition instead of overwriting
df.write.partitionBy('date').mode('append').parquet('s3://data-lake/orders/')

Strong Data Validation

# Validate before loading
@validate_schema(expected_schema)
@validate_quality(min_quality_score=95)
def load_to_warehouse(df):
    df.write.jdbc(url, table, mode='append')
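
The `validate_schema` and `validate_quality` decorators are not defined in this skill; one possible shape is sketched below. It assumes a PySpark DataFrame, an `expected_schema` dict of column name to type string, and that `run_data_quality_checks` (used earlier during validation) accepts a DataFrame:

from functools import wraps

def validate_schema(expected_schema):
    """Fail fast if the DataFrame's columns and types don't match the expected schema."""
    def decorator(load_fn):
        @wraps(load_fn)
        def wrapper(df, *args, **kwargs):
            actual = {f.name: f.dataType.simpleString() for f in df.schema.fields}
            if actual != expected_schema:
                raise ValueError(f"Schema mismatch: expected {expected_schema}, got {actual}")
            return load_fn(df, *args, **kwargs)
        return wrapper
    return decorator

def validate_quality(min_quality_score):
    """Block the load if the data quality score is below the threshold."""
    def decorator(load_fn):
        @wraps(load_fn)
        def wrapper(df, *args, **kwargs):
            score = run_data_quality_checks(df)  # assumed to accept a DataFrame here
            if score < min_quality_score:
                raise ValueError(f"Quality score {score} below threshold {min_quality_score}")
            return load_fn(df, *args, **kwargs)
        return wrapper
    return decorator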

Backup and Restore Testing

# Monthly backup restore drill
# 1. Restore backup to test environment
pg_restore --dbname=test_db production_backup.dump

# 2. Verify data integrity
python verify_data_integrity.py --db test_db

# 3. Measure restore time
# 4. Document any issues

Schema Change Management

# Data contract with upstream
contract:
  table: users
  schema_changes_require:
    - 2 weeks notice
    - Backward compatibility
    - Coordination meeting
  breaking_changes_forbidden:
    - Column removal
    - Column rename
    - Type change
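
A contract is only useful if it is checked. A minimal sketch that compares a table's live columns against the contract before processing, assuming the contract is stored as YAML at a hypothetical path and includes a hypothetical required_columns field:

import yaml

def check_contract(df, contract_path='contracts/users.yaml'):
    """Fail ingestion if columns required by the data contract are missing."""
    with open(contract_path) as f:
        contract = yaml.safe_load(f)['contract']
    required = set(contract.get('required_columns', []))  # hypothetical contract field
    missing = required - set(df.columns)
    if missing:
        trigger_incident(
            severity='P1',
            title=f"Data contract violation on {contract['table']}",
            details={'missing_columns': sorted(missing)}
        )
        raise ValueError(f"Contract violation: missing columns {sorted(missing)}")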

10. Data Incident Playbooks

Playbook: Data Loss

## Data Loss Incident Response

### Immediate Actions (0-15 min)
- [ ] Confirm scope of data loss (tables, time range, row count)
- [ ] Stop any processes that might overwrite backups
- [ ] Page on-call data engineer + DBA
- [ ] Create war room (#incident-XXX)

### Assessment (15-30 min)
- [ ] Identify last known good backup
- [ ] Estimate recovery time
- [ ] Identify affected downstream systems
- [ ] Notify stakeholders

### Recovery (30 min - X hours)
- [ ] Restore from backup to staging environment
- [ ] Validate restored data
- [ ] Restore to production
- [ ] Verify row counts and data quality
- [ ] Resume dependent pipelines

### Prevention
- [ ] Implement soft deletes
- [ ] Add confirmation prompts for destructive operations
- [ ] Enable database audit logging
- [ ] Schedule backup restore drills

Playbook: Data Corruption

## Data Corruption Incident Response

### Immediate Actions
- [ ] Identify extent of corruption (affected rows, columns, time range)
- [ ] Pause downstream pipelines to prevent propagation
- [ ] Quarantine corrupt data

### Root Cause Analysis
- [ ] Review recent code changes
- [ ] Check for upstream data issues
- [ ] Examine pipeline logs for errors

### Remediation
- [ ] Fix root cause (code bug, config error)
- [ ] Choose recovery method:
  - [ ] Restore from backup
  - [ ] Replay from source
  - [ ] Manual correction
  - [ ] Reprocess pipeline
- [ ] Validate fix with data quality checks

### Prevention
- [ ] Add data quality checks before and after transformation
- [ ] Implement idempotency in pipelines
- [ ] Add integration tests for edge cases

11. Incident Response Checklist

  • Detection: Do we have automated monitoring for data quality?
  • Alerting: Are alerts routed to the right people?
  • Runbooks: Do we have playbooks for common scenarios?
  • Backups: Are backups tested and restore time known?
  • Communication: Do we have templates for stakeholder updates?
  • War Room: Is there a dedicated channel for incidents?
  • Postmortem: Do we conduct blameless postmortems?
  • Prevention: Are action items from postmortems tracked?

Related Skills

  • 41-incident-management/incident-triage
  • 41-incident-management/incident-retrospective
  • 43-data-reliability/data-quality-checks
  • 43-data-reliability/schema-drift
  • 40-system-resilience/disaster-recovery

GitHub Repository

majiayu000/claude-skill-registry
Path: skills/data-incident-response
