Data Incident Response
About
This skill provides structured playbooks for responding to data incidents like pipeline failures, data corruption, or loss. It helps developers quickly triage, contain, and resolve issues to minimize impact on analytics and business decisions. Key features include severity classification, clear response procedures, and a focus on prevention.
Quick Install
Claude Code
Recommended: /plugin add https://github.com/majiayu000/claude-skill-registry
Alternative: git clone https://github.com/majiayu000/claude-skill-registry.git ~/.claude/skills/"Data Incident Response"
Copy and paste this command in Claude Code to install this skill.
Documentation
Data Incident Response
Overview
Data Incident Response is the process of detecting, triaging, and resolving issues related to data quality, availability, or integrity. Unlike application incidents, data incidents can have long-lasting impacts on analytics, ML models, and business decisions.
Core Principle: "Bad data is worse than no data. Detect fast, respond faster, prevent recurrence."
1. Types of Data Incidents
| Type | Description | Example | Severity |
|---|---|---|---|
| Data Loss | Data deleted or not captured | Accidental DROP TABLE | P0 |
| Data Corruption | Data modified incorrectly | ETL bug multiplies prices by 100 | P0-P1 |
| Data Breach | Unauthorized data access | PII exposed in logs | P0 |
| Pipeline Failure | ETL/ELT pipeline stops | Airflow DAG fails | P1-P2 |
| Schema Breaking Change | Upstream schema change breaks pipeline | Column renamed without notice | P1 |
| Data Quality Degradation | Increasing nulls, duplicates, anomalies | 20% of orders have null customer_id | P2 |
| Freshness Violation | Data not updated within SLA | Dashboard showing yesterday's data | P2-P3 |
2. Data Incident Severity Levels
P0 (Critical)
- Definition: Data breach, major data loss, or corruption affecting production decisions
- Examples:
- PII exposed publicly
- Financial data deleted
- ML model making wrong predictions due to corrupt training data
- Response Time: Immediate (< 15 minutes)
- Notification: Page on-call + executives
- Postmortem: Required within 48 hours
P1 (High)
- Definition: Pipeline down, critical data corrupt, major quality degradation
- Examples:
- Daily ETL failed, no fresh data
- Revenue reporting showing incorrect numbers
- Customer-facing dashboard broken
- Response Time: < 1 hour
- Notification: Page on-call data team
- Postmortem: Required within 1 week
P2 (Medium)
- Definition: Data quality issue affecting internal reports
- Examples:
- 10% of records have validation errors
- Non-critical dashboard stale
- Schema drift detected but not breaking
- Response Time: < 4 hours
- Notification: Slack alert to data team
- Postmortem: Optional
P3 (Low)
- Definition: Minor data inconsistency, no immediate impact
- Examples:
- Duplicate records in non-critical table
- Formatting inconsistency
- Deprecated field still populated
- Response Time: < 1 business day
- Notification: Ticket created
- Postmortem: Not required
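The severity policy above can also be kept as data so that alerting code and humans share one source of truth. A minimal sketch; the class and field names are illustrative, not part of any existing tooling:

from dataclasses import dataclass

@dataclass(frozen=True)
class SeverityPolicy:
    response_time: str   # target time to first response
    notification: str    # who is alerted, and how
    postmortem: str      # postmortem requirement

# Encodes the P0-P3 definitions from this section.
SEVERITY_POLICIES = {
    'P0': SeverityPolicy('< 15 minutes', 'Page on-call + executives', 'Required within 48 hours'),
    'P1': SeverityPolicy('< 1 hour', 'Page on-call data team', 'Required within 1 week'),
    'P2': SeverityPolicy('< 4 hours', 'Slack alert to data team', 'Optional'),
    'P3': SeverityPolicy('< 1 business day', 'Ticket created', 'Not required'),
}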
3. Incident Detection
Automated Detection
# Data quality monitoring
def monitor_data_quality():
    """Continuously monitor data quality metrics"""
    checks = {
        'null_rate': check_null_rate('orders', 'customer_id', threshold=0.05),
        'duplicate_rate': check_duplicates('orders', 'order_id', threshold=0.01),
        'freshness': check_freshness('orders', 'created_at', max_age_minutes=60),
        'row_count': check_row_count_anomaly('orders', expected_range=(1000, 10000))
    }
    for check_name, result in checks.items():
        if not result['passed']:
            trigger_incident(
                severity=result['severity'],
                title=f"Data quality check failed: {check_name}",
                details=result
            )
Pipeline Failure Alerts
# Airflow callback
from airflow.operators.python import PythonOperator
def on_failure_callback(context):
    """Trigger incident on DAG failure"""
    trigger_incident(
        severity='P1',
        title=f"Pipeline failed: {context['dag'].dag_id}",
        details={
            'task': context['task'].task_id,
            'execution_date': context['execution_date'],
            'error': str(context['exception'])
        }
    )

task = PythonOperator(
    task_id='process_data',
    python_callable=process_data,
    on_failure_callback=on_failure_callback
)
User Reports
User report channels:
- Support tickets
- Slack #data-issues channel
- Email to data-team@company.com
- Dashboard "Report Issue" button
4. Incident Triage
Initial Assessment Questions
- What data is affected? (table, time range, row count)
- Who is impacted? (internal teams, customers, ML models)
- When did it start? (timestamp, duration)
- Is it still happening? (ongoing vs. resolved)
- What's the business impact? (revenue, compliance, reputation)
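The answers to these questions form the initial incident record. A minimal sketch of one way to capture them in a structured form (the field names are illustrative):

from dataclasses import dataclass
from datetime import datetime

@dataclass
class IncidentAssessment:
    """Initial triage answers captured as a structured record."""
    affected_data: str           # table, time range, row count
    impacted_parties: list[str]  # internal teams, customers, ML models
    started_at: datetime         # when the issue began
    ongoing: bool                # still happening vs. resolved
    business_impact: str         # revenue, compliance, reputation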
Triage Decision Tree
Is a data breach or PII exposure involved?
  YES → P0, page security team immediately
  NO  → Continue

Is production decision-making affected?
  YES → P0/P1, page data team
  NO  → Continue

Is a critical pipeline down?
  YES → P1, page data team
  NO  → Continue

Is data quality degraded?
  YES → P2, alert data team
  NO  → P3, create ticket
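The decision tree maps directly onto a small triage helper. A sketch assuming boolean answers gathered during the initial assessment (the function and parameter names are illustrative):

def triage_severity(
    breach_or_pii_exposed: bool,
    production_decisions_affected: bool,
    critical_pipeline_down: bool,
    quality_degraded: bool,
) -> str:
    """Apply the triage decision tree and return a severity level."""
    if breach_or_pii_exposed:
        return 'P0'  # also page the security team immediately
    if production_decisions_affected:
        return 'P1'  # escalate to P0 if severe; page the data team
    if critical_pipeline_down:
        return 'P1'  # page the data team
    if quality_degraded:
        return 'P2'  # alert the data team
    return 'P3'      # create a ticket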
5. Response Procedures
Step 1: Stop the Bleeding
def stop_the_bleeding(incident_type: str):
    """Immediate actions to prevent further damage"""
    if incident_type == 'pipeline_failure':
        # Pause downstream pipelines
        pause_dependent_dags()
    elif incident_type == 'data_corruption':
        # Stop writes to affected table
        revoke_write_permissions('corrupted_table')
    elif incident_type == 'data_breach':
        # Immediately restrict access
        revoke_all_access('sensitive_table')
        notify_security_team()
Step 2: Assess Damage
-- Assess extent of data corruption
SELECT
COUNT(*) as total_rows,
COUNT(*) FILTER (WHERE price < 0) as corrupt_rows,
MIN(created_at) as first_corrupt_timestamp,
MAX(created_at) as last_corrupt_timestamp
FROM orders
WHERE created_at > '2024-01-15 10:00:00';
Step 3: Restore from Backup (if needed)
# PostgreSQL: restore the orders table from a dump taken before the corruption
# (clear the corrupted rows first; --data-only only re-inserts data)
pg_restore \
  --dbname=production \
  --table=orders \
  --data-only \
  backup_before_corruption.dump
# Verify restoration
psql -c "SELECT COUNT(*) FROM orders WHERE created_at > '2024-01-15 09:00:00';"
Step 4: Fix Root Cause
# Example: Fix ETL bug that caused corruption
def fixed_transform(df):
    """Corrected transformation logic"""
    # OLD (buggy): df['price'] = df['price'] * 100
    # NEW (fixed): df['price'] = df['price']  # Already in cents
    return df

# Reprocess affected data
reprocess_date_range(
    start_date='2024-01-15',
    end_date='2024-01-16',
    transform_fn=fixed_transform
)
Step 5: Validate Fix
def validate_fix():
    """Verify data is correct after fix"""
    # Check row counts match
    assert get_row_count('orders') == expected_count
    # Check no corrupt data remains
    corrupt_count = db.execute("""
        SELECT COUNT(*) FROM orders WHERE price < 0
    """).fetchone()[0]
    assert corrupt_count == 0
    # Check data quality metrics
    quality_score = run_data_quality_checks('orders')
    assert quality_score > 95
Step 6: Resume Operations
def resume_operations():
    """Resume normal operations"""
    # Restore write permissions
    grant_write_permissions('orders')
    # Resume downstream pipelines
    resume_dependent_dags()
    # Monitor closely for 24 hours
    enable_enhanced_monitoring('orders', duration_hours=24)
6. Data Recovery Strategies
Point-in-Time Recovery (PITR)
-- PostgreSQL: create a named restore point before risky operations
SELECT pg_create_restore_point('before_corruption');

-- To recover: restore a base backup (taken earlier with pg_basebackup) into a fresh
-- data directory, set recovery_target_time = '2024-01-15 09:55:00' in postgresql.conf,
-- create recovery.signal, and start the server to replay WAL up to that point.
Replay from Source
from kafka import KafkaConsumer, TopicPartition

def replay_from_kafka(topic: str, start_offset: int, end_offset: int):
    """Replay events from Kafka to rebuild state"""
    # Use manual partition assignment (not the topic argument) so we can seek to an offset
    consumer = KafkaConsumer(
        bootstrap_servers=['localhost:9092'],
        auto_offset_reset='earliest'
    )
    # Seek to start offset on partition 0
    partition = TopicPartition(topic, 0)
    consumer.assign([partition])
    consumer.seek(partition, start_offset)
    for message in consumer:
        if message.offset > end_offset:
            break
        # Reprocess event
        process_event(message.value)
Manual Correction
-- Identify and fix corrupt records
UPDATE orders
SET price = price / 100 -- Undo the bug that multiplied by 100
WHERE created_at BETWEEN '2024-01-15 10:00:00' AND '2024-01-15 12:00:00'
AND price > 1000000; -- Only fix obviously wrong prices
Reprocessing Pipelines
# Airflow: Backfill specific date range
airflow dags backfill \
--start-date 2024-01-15 \
--end-date 2024-01-16 \
--reset-dagruns \
daily_etl_dag
7. Communication During Data Incidents
Internal Communication Template
🚨 **DATA INCIDENT** - P1
**Affected Data**: orders table
**Impact**: Revenue dashboard showing incorrect numbers
**Started**: 2024-01-15 10:13 UTC
**Status**: Investigating
**What we know**:
- ETL bug multiplied all prices by 100
- Affects orders from 10:00-12:00 UTC (2 hours)
- ~5,000 orders impacted
**What we're doing**:
- Stopped downstream pipelines
- Restoring from backup
- Fixing ETL bug
**Next update**: 30 minutes
**Incident Commander**: @alice
**War Room**: #incident-data-001
Stakeholder Notification
def notify_stakeholders(incident):
    """Notify affected teams"""
    affected_teams = identify_affected_teams(incident['table'])
    for team in affected_teams:
        send_notification(
            channel=team['slack_channel'],
            message=f"""
            ⚠️ Data incident affecting {incident['table']}
            Impact: {incident['impact']}
            ETA for resolution: {incident['eta']}
            Please avoid using this data until resolved.
            Updates in #incident-{incident['id']}
            """
        )
8. Common Data Incident Scenarios
Scenario 1: Accidental DELETE
-- Incident: Developer ran DELETE without WHERE clause
DELETE FROM users; -- ❌ Deleted all users!
-- Response:
-- 1. Immediately stop application writes
-- 2. Restore from most recent backup
-- 3. Replay transactions from WAL (Write-Ahead Log)
-- 4. Implement safeguards (require WHERE clause, read-only by default)
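The "require WHERE clause" safeguard from step 4 can be enforced in application code before a statement reaches the database. A rough sketch using a string check; a production version would rely on a real SQL parser or database-side protections:

import re

class UnsafeStatementError(Exception):
    pass

def guard_destructive_sql(sql: str) -> str:
    """Refuse DELETE/UPDATE statements that carry no WHERE clause."""
    normalized = ' '.join(sql.split()).upper()
    if re.match(r'^(DELETE|UPDATE)\b', normalized) and ' WHERE ' not in f' {normalized} ':
        raise UnsafeStatementError('Destructive statement without WHERE clause refused')
    return sql

# guard_destructive_sql("DELETE FROM users") raises; adding a WHERE clause lets it through.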
Scenario 2: Bad Data from Upstream
# Incident: Upstream API started sending null customer_ids
# Detection
if df['customer_id'].isna().sum() > len(df) * 0.01:  # > 1% nulls
    raise DataQualityError("Too many null customer_ids")
# Response
# 1. Reject the batch
# 2. Alert upstream team
# 3. Use previous day's data as fallback
# 4. Implement validation before ingestion
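The response steps above (reject the batch, alert upstream, fall back to known-good data) can be combined into a small ingestion guard. A sketch assuming a pandas DataFrame and hypothetical alert_upstream_team / load_previous_day helpers:

import pandas as pd

class DataQualityError(Exception):
    pass

def ingest_with_fallback(df: pd.DataFrame, max_null_rate: float = 0.01) -> pd.DataFrame:
    """Reject batches with too many null customer_ids and fall back to yesterday's data."""
    null_rate = df['customer_id'].isna().mean()
    if null_rate > max_null_rate:
        # Both helpers below are hypothetical placeholders for your own alerting and fallback logic.
        alert_upstream_team(f"customer_id null rate {null_rate:.1%} exceeds {max_null_rate:.1%}")
        return load_previous_day()
    return df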
Scenario 3: Pipeline Bug Corrupting Data
# Incident: ETL bug converted all timestamps to UTC incorrectly
# Detection
anomaly_count = db.execute("""
    SELECT COUNT(*) FROM events
    WHERE event_time > NOW()  -- Future timestamps = bug
""").fetchone()[0]
# Response
# 1. Identify affected date range
# 2. Pause pipeline
# 3. Fix transformation logic
# 4. Reprocess affected dates
# 5. Add validation for timestamp sanity
Scenario 4: Schema Change Breaking Pipeline
# Incident: Upstream renamed 'user_id' to 'customer_id'
# Detection
from pyspark.sql.utils import AnalysisException

try:
    df = spark.read.parquet("s3://data/users/")
    df.select("user_id")  # Fails: column was renamed upstream
except AnalysisException:
    trigger_incident("Schema drift detected")
# Response
# 1. Update pipeline to handle both column names
# 2. Coordinate with upstream for future changes
# 3. Implement schema validation before processing
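Step 1 of the response, accepting both the old and new column names, can be handled with a small normalization pass at read time. A PySpark sketch; the rename map is an assumption for this particular incident:

from pyspark.sql import DataFrame

# Known upstream renames: new upstream name -> name the pipeline expects.
COLUMN_ALIASES = {'customer_id': 'user_id'}

def normalize_columns(df: DataFrame) -> DataFrame:
    """Rename known upstream aliases back to the columns the pipeline expects."""
    for upstream_name, expected_name in COLUMN_ALIASES.items():
        if upstream_name in df.columns and expected_name not in df.columns:
            df = df.withColumnRenamed(upstream_name, expected_name)
    return df

# Usage: df = normalize_columns(spark.read.parquet("s3://data/users/"))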
9. Prevention Strategies
Immutable Data Lakes
# Never modify data in place; always append new versions
# S3 versioning enabled
import boto3

s3 = boto3.client('s3')
s3.put_bucket_versioning(
    Bucket='data-lake',
    VersioningConfiguration={'Status': 'Enabled'}
)
# Write new partitions instead of overwriting existing data
df.write.partitionBy('date').mode('append').parquet('s3://data-lake/orders/')
Strong Data Validation
# Validate before loading
@validate_schema(expected_schema)
@validate_quality(min_quality_score=95)
def load_to_warehouse(df):
    df.write.jdbc(url, table, mode='append')
Backup and Restore Testing
# Monthly backup restore drill
# 1. Restore backup to test environment
pg_restore --dbname=test_db production_backup.dump
# 2. Verify data integrity
python verify_data_integrity.py --db test_db
# 3. Measure restore time
# 4. Document any issues
Schema Change Management
# Data contract with upstream
contract:
  table: users
  schema_changes_require:
    - 2 weeks notice
    - Backward compatibility
    - Coordination meeting
  breaking_changes_forbidden:
    - Column removal
    - Column rename
    - Type change
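A contract like this only helps if it is checked automatically before each load. A minimal sketch that compares an incoming schema against the previous one and reports the forbidden changes; the schema dictionaries (column name -> type) are an assumed representation:

def check_contract(previous_schema: dict, incoming_schema: dict) -> list[str]:
    """Return contract violations: removed/renamed columns and type changes."""
    violations = []
    for column, dtype in previous_schema.items():
        if column not in incoming_schema:
            violations.append(f"Column removed or renamed: {column}")
        elif incoming_schema[column] != dtype:
            violations.append(f"Type change on {column}: {dtype} -> {incoming_schema[column]}")
    return violations

# Fail the pipeline run (and notify the upstream team) if this list is non-empty.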
10. Data Incident Playbooks
Playbook: Data Loss
## Data Loss Incident Response
### Immediate Actions (0-15 min)
- [ ] Confirm scope of data loss (tables, time range, row count)
- [ ] Stop any processes that might overwrite backups
- [ ] Page on-call data engineer + DBA
- [ ] Create war room (#incident-XXX)
### Assessment (15-30 min)
- [ ] Identify last known good backup
- [ ] Estimate recovery time
- [ ] Identify affected downstream systems
- [ ] Notify stakeholders
### Recovery (30 min - X hours)
- [ ] Restore from backup to staging environment
- [ ] Validate restored data
- [ ] Restore to production
- [ ] Verify row counts and data quality
- [ ] Resume dependent pipelines
### Prevention
- [ ] Implement soft deletes (see the sketch below)
- [ ] Add confirmation prompts for destructive operations
- [ ] Enable database audit logging
- [ ] Schedule backup restore drills
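Soft deletes, mentioned in the prevention list above, replace physical DELETEs with a marker column so "deleted" rows stay recoverable. A self-contained sketch using SQLite for illustration; the table layout is hypothetical:

import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, deleted_at TEXT)")

def soft_delete_user(user_id: int) -> None:
    """Mark the row as deleted instead of removing it."""
    conn.execute("UPDATE users SET deleted_at = datetime('now') WHERE id = ?", (user_id,))

# Reads filter out soft-deleted rows; recovery is just clearing deleted_at.
active_users = conn.execute("SELECT * FROM users WHERE deleted_at IS NULL").fetchall()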
Playbook: Data Corruption
## Data Corruption Incident Response
### Immediate Actions
- [ ] Identify extent of corruption (affected rows, columns, time range)
- [ ] Pause downstream pipelines to prevent propagation
- [ ] Quarantine corrupt data
### Root Cause Analysis
- [ ] Review recent code changes
- [ ] Check for upstream data issues
- [ ] Examine pipeline logs for errors
### Remediation
- [ ] Fix root cause (code bug, config error)
- [ ] Choose recovery method:
- [ ] Restore from backup
- [ ] Replay from source
- [ ] Manual correction
- [ ] Reprocess pipeline
- [ ] Validate fix with data quality checks
### Prevention
- [ ] Add data quality checks before and after transformation
- [ ] Implement idempotency in pipelines (see the sketch below)
- [ ] Add integration tests for edge cases
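Idempotent pipelines make reprocessing safe: re-running a date replaces that date's output instead of duplicating it. A sketch using Spark's dynamic partition overwrite, assuming a date-partitioned table:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# Overwrite only the partitions present in the output; other dates are left untouched.
spark.conf.set('spark.sql.sources.partitionOverwriteMode', 'dynamic')

def write_partition(df, path: str) -> None:
    """Idempotent write: re-running a date replaces that date's partition in place."""
    df.write.partitionBy('date').mode('overwrite').parquet(path)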
11. Incident Response Checklist
- Detection: Do we have automated monitoring for data quality?
- Alerting: Are alerts routed to the right people?
- Runbooks: Do we have playbooks for common scenarios?
- Backups: Are backups tested and restore time known?
- Communication: Do we have templates for stakeholder updates?
- War Room: Is there a dedicated channel for incidents?
- Postmortem: Do we conduct blameless postmortems?
- Prevention: Are action items from postmortems tracked?
Related Skills
- 41-incident-management/incident-triage
- 41-incident-management/incident-retrospective
- 43-data-reliability/data-quality-checks
- 43-data-reliability/schema-drift
- 40-system-resilience/disaster-recovery
