dapr-integration
About
This skill helps developers integrate Dapr for pub/sub messaging and scheduled jobs in event-driven architectures. It covers handling CloudEvent formats, setting up subscriptions, and using the Dapr Jobs API while addressing common pitfalls like CloudEvent unwrapping. Use it when implementing Dapr-based microservices in Kubernetes environments.
Quick Install
Claude Code
Recommended (run in Claude Code): /plugin add https://github.com/majiayu000/claude-skill-registry
Or clone manually: git clone https://github.com/majiayu000/claude-skill-registry.git ~/.claude/skills/dapr-integration
Documentation
Dapr Integration
Integrate the Dapr sidecar for pub/sub messaging and scheduled jobs in Kubernetes environments.
When to Use
- Setting up Dapr pub/sub for event-driven microservices
- Scheduling jobs with Dapr Jobs API (v1.0-alpha1)
- Handling CloudEvent message formats
- Implementing subscription handlers in FastAPI
- Debugging Dapr integration issues
Quick Start
# Install Dapr CLI
curl -fsSL https://raw.githubusercontent.com/dapr/cli/master/install/install.sh | bash
# Initialize Dapr (local)
dapr init
# Run app with Dapr sidecar
dapr run --app-id myapp --app-port 8000 -- uvicorn main:app
Core Patterns
1. Pub/Sub Subscription Handler (FastAPI)
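import logging

from fastapi import APIRouter, Depends, Request
from sqlmodel.ext.asyncio.session import AsyncSession

logger = logging.getLogger(__name__)
router = APIRouter(prefix="/dapr", tags=["Dapr"])

# get_session (used by the handlers in later patterns) is your app's own
# DB session dependency; it is not defined in this skill.

# Topics we subscribe to. Each route is the full path of a POST handler,
# including the router prefix.
SUBSCRIPTIONS = [
    {"pubsubname": "taskflow-pubsub", "topic": "task-events", "route": "/dapr/events/task-events"},
    {"pubsubname": "taskflow-pubsub", "topic": "reminders", "route": "/dapr/events/reminders"},
]

@router.get("/subscribe")
async def get_subscriptions() -> list[dict]:
    """Dapr calls GET /dapr/subscribe on startup to discover subscriptions."""
    return SUBSCRIPTIONS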
2. CloudEvent Handling (CRITICAL)
By default, Dapr wraps pub/sub messages in a CloudEvent envelope, with your payload in its "data" field. You MUST unwrap it before reading your own fields.
@router.post("/events/task-events")
async def handle_task_events(
    request: Request,
    session: AsyncSession = Depends(get_session),
) -> dict:
    try:
        # Step 1: Get raw CloudEvent
        raw_event = await request.json()

        # Step 2: ALWAYS unwrap CloudEvent "data" field
        # CloudEvent structure:
        # {
        #   "data": {                      <-- Your payload is HERE
        #     "event_type": "task.created",
        #     "data": {...},
        #     "timestamp": "..."
        #   },
        #   "datacontenttype": "application/json",
        #   "id": "...",
        #   "pubsubname": "taskflow-pubsub",
        #   "source": "myapp",
        #   "topic": "task-events",
        #   ...
        # }
        event = raw_event.get("data", raw_event)  # Unwrap or use as-is

        # Step 3: Now access your payload
        event_type = event.get("event_type")  # "task.created"
        data = event.get("data", {})  # Your actual data

        # Process event...

        return {"status": "SUCCESS"}
    except Exception as e:
        logger.exception("Error handling event: %s", e)
        # Return SUCCESS to prevent Dapr retries for bad events
        return {"status": "SUCCESS"}
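One subtlety with the unwrap step: the payload in this skill has its own "data" key, so `raw_event.get("data", raw_event)` would also strip a body that arrives without an envelope. A minimal sketch of a stricter check follows, assuming the envelope can be recognized by the CloudEvents `specversion` attribute; the helper name `unwrap_cloud_event` is hypothetical and not part of Dapr.

```python
from typing import Any


def unwrap_cloud_event(raw_event: dict[str, Any]) -> dict[str, Any]:
    """Return the application payload from a pub/sub delivery body.

    Assumption: a CloudEvent envelope always carries "specversion".
    Bodies without it are treated as the payload itself.
    """
    is_envelope = "specversion" in raw_event and "data" in raw_event
    payload = raw_event["data"] if is_envelope else raw_event
    # A non-dict "data" (e.g. a plain string) cannot hold event_type,
    # so return an empty payload instead of failing later on .get().
    return payload if isinstance(payload, dict) else {}


# Illustrative envelope; field values are made up for the example.
envelope = {
    "specversion": "1.0",
    "id": "abc-123",
    "topic": "task-events",
    "pubsubname": "taskflow-pubsub",
    "datacontenttype": "application/json",
    "data": {"event_type": "task.created", "data": {"task_id": 42}},
}
bare = {"event_type": "task.created", "data": {"task_id": 42}}

print(unwrap_cloud_event(envelope)["event_type"])  # task.created
print(unwrap_cloud_event(bare)["event_type"])      # task.created
```

Both calls yield the same payload, so the handler can read `event_type` without caring how the message arrived.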
3. Publishing Events
import logging
from datetime import datetime, timezone

import httpx

logger = logging.getLogger(__name__)

DAPR_HTTP_ENDPOINT = "http://localhost:3500"
PUBSUB_NAME = "taskflow-pubsub"

async def publish_event(
    topic: str,
    event_type: str,
    data: dict,
) -> bool:
    """Publish event to Dapr pub/sub."""
    url = f"{DAPR_HTTP_ENDPOINT}/v1.0/publish/{PUBSUB_NAME}/{topic}"
    payload = {
        "event_type": event_type,
        "data": data,
        # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated since Python 3.12)
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            response = await client.post(url, json=payload)
            # Dapr answers 204 No Content when the message is accepted
            return response.status_code == 204
    except Exception as e:
        logger.error("Failed to publish event: %s", e)
        return False
4. Dapr Jobs API (Scheduled Jobs)
CRITICAL: Dapr Jobs v1.0-alpha1 calls back to /job/{job_name} by default!
# Scheduling a job
from datetime import datetime, timezone

async def schedule_job(
    job_name: str,
    due_time: datetime,
    data: dict,
    dapr_http_endpoint: str = "http://localhost:3500",
) -> bool:
    """Schedule a one-time Dapr job."""
    url = f"{dapr_http_endpoint}/v1.0-alpha1/jobs/{job_name}"
    payload = {
        # RFC 3339 in UTC. Convert first so the trailing "Z" is accurate;
        # pass a timezone-aware due_time (astimezone() treats naive values as local time).
        "dueTime": due_time.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "data": data,
    }
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            response = await client.post(url, json=payload)
            return response.status_code == 204
    except Exception as e:
        logger.error("Failed to schedule job: %s", e)
        return False
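The `dueTime` value is only correct if the datetime really is in UTC when the "Z" suffix is appended. A small sketch of the payload construction on its own, which rejects naive datetimes rather than guessing their timezone; the helper names `to_rfc3339_utc` and `build_job_payload` are hypothetical, and only the `dueTime` and `data` fields used above are shown.

```python
from datetime import datetime, timedelta, timezone
from typing import Any


def to_rfc3339_utc(moment: datetime) -> str:
    """Format an aware datetime as RFC 3339 in UTC with a trailing 'Z'."""
    if moment.tzinfo is None:
        # Refuse naive values: guessing their timezone would shift the job.
        raise ValueError("due_time must be timezone-aware")
    return moment.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def build_job_payload(due_time: datetime, data: dict[str, Any]) -> dict[str, Any]:
    """Build the JSON body for a one-time job."""
    return {"dueTime": to_rfc3339_utc(due_time), "data": data}


# 09:30 at UTC+05:30 is 04:00 UTC.
ist = timezone(timedelta(hours=5, minutes=30))
payload = build_job_payload(
    datetime(2025, 1, 15, 9, 30, tzinfo=ist),
    {"type": "reminder", "task_id": 42},
)
print(payload["dueTime"])  # 2025-01-15T04:00:00Z
```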
Handling the callback - Dapr calls POST /job/{job_name} at the app root, NOT a custom endpoint. The router above carries the /dapr prefix, so register this handler on a router without a prefix (or on the app itself); otherwise the path becomes /dapr/job/{job_name} and Dapr never reaches it:
# WRONG - Dapr won't call this!
@router.post("/api/jobs/trigger")
async def handle_trigger(...):
    pass

# CORRECT - This is what Dapr actually calls
job_router = APIRouter()  # no prefix; include it with app.include_router(job_router)

@job_router.post("/job/{job_name}")
async def handle_dapr_job_callback(
    job_name: str,
    request: Request,
    session: AsyncSession = Depends(get_session),
) -> dict:
    """Handle Dapr Jobs v1.0-alpha1 callback.

    Dapr calls POST /job/{job_name} when a scheduled job fires.
    """
    try:
        body = await request.json()
        job_data = body.get("data", body)  # Unwrap if needed
        task_id = job_data.get("task_id")
        job_type = job_data.get("type")
        logger.info("Job callback: job=%s, type=%s", job_name, job_type)

        # handle_reminder / handle_spawn are app-specific handlers (not shown)
        if job_type == "reminder":
            return await handle_reminder(session, job_data)
        elif job_type == "spawn":
            return await handle_spawn(session, task_id)
        return {"status": "unknown_type"}
    except Exception as e:
        logger.exception("Error handling job %s: %s", job_name, e)
        return {"status": "error"}
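As the number of job types grows, the if/elif chain in the callback can be replaced by a lookup table. The sketch below shows only that dispatch step, without FastAPI or a database session; the two handlers are hypothetical stand-ins for the app's real `handle_reminder` and `handle_spawn`, and their return values are invented for illustration.

```python
import asyncio
from typing import Any, Awaitable, Callable

JobHandler = Callable[[dict[str, Any]], Awaitable[dict[str, Any]]]


async def handle_reminder(job_data: dict[str, Any]) -> dict[str, Any]:
    # Stand-in: a real handler would send the reminder.
    return {"status": "reminder_sent", "task_id": job_data.get("task_id")}


async def handle_spawn(job_data: dict[str, Any]) -> dict[str, Any]:
    # Stand-in: a real handler would create the next task occurrence.
    return {"status": "spawned", "task_id": job_data.get("task_id")}


# Maps the "type" field of the job data to its handler.
JOB_HANDLERS: dict[str, JobHandler] = {
    "reminder": handle_reminder,
    "spawn": handle_spawn,
}


async def dispatch_job(body: dict[str, Any]) -> dict[str, Any]:
    """Unwrap the callback body and route it by job type."""
    job_data = body.get("data", body)  # same unwrap as the callback above
    handler = JOB_HANDLERS.get(job_data.get("type"))
    if handler is None:
        return {"status": "unknown_type"}
    return await handler(job_data)


result = asyncio.run(dispatch_job({"data": {"type": "reminder", "task_id": 7}}))
print(result)  # {'status': 'reminder_sent', 'task_id': 7}
```

Adding a job type then means adding one entry to `JOB_HANDLERS` rather than another branch.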
5. Deleting Scheduled Jobs
async def delete_job(
    job_name: str,
    dapr_http_endpoint: str = "http://localhost:3500",
) -> bool:
    """Cancel a scheduled Dapr job."""
    url = f"{dapr_http_endpoint}/v1.0-alpha1/jobs/{job_name}"
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            response = await client.delete(url)
            # 204 = deleted, 500 = not found (both treated as OK).
            # Caveat: 500 is also a generic server error, so this can mask real failures.
            return response.status_code in (204, 500)
    except Exception as e:
        logger.error("Failed to delete job: %s", e)
        return False
Kubernetes/Helm Configuration
Dapr Pub/Sub Component (Redis)
# dapr-pubsub.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: taskflow-pubsub
  namespace: taskflow
spec:
  type: pubsub.redis
  version: v1
  metadata:
    - name: redisHost
      value: "redis:6379"
    - name: redisPassword
      secretKeyRef:
        name: redis-secret
        key: password
    - name: enableTLS
      value: "true"  # Required for Upstash
Dapr Annotations for Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: taskflow-api
spec:
  template:
    metadata:
      annotations:
        dapr.io/enabled: "true"
        dapr.io/app-id: "taskflow-api"
        dapr.io/app-port: "8000"
        dapr.io/enable-api-logging: "true"
Common Pitfalls
1. Not Unwrapping CloudEvent
# WRONG - event_type will be None!
event = await request.json()
event_type = event.get("event_type") # None - it's nested in "data"
# CORRECT
raw_event = await request.json()
event = raw_event.get("data", raw_event) # Unwrap CloudEvent
event_type = event.get("event_type") # "task.created"
2. Wrong Job Callback URL
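# WRONG - Dapr calls /job/{name}, not custom endpoints
@router.post("/api/jobs/trigger")  # Dapr won't call this!
# CORRECT - the full path must be exactly /job/{job_name}, so use a router
# without the /dapr prefix (job_router = APIRouter())
@job_router.post("/job/{job_name}")  # Dapr WILL call this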
3. Forgetting to Return SUCCESS
# WRONG - Dapr will retry on errors
@router.post("/events/task-events")
async def handle(request: Request):
    try:
        ...  # process
        return {"status": "SUCCESS"}
    except Exception:
        raise  # Dapr will retry!

# CORRECT - Always return SUCCESS to stop retries
@router.post("/events/task-events")
async def handle(request: Request):
    try:
        ...  # process
    except Exception as e:
        logger.exception("Error: %s", e)
    return {"status": "SUCCESS"}  # Always, even on error
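Always returning SUCCESS is the simplest way to stop retry loops, but Dapr's subscriber response also accepts RETRY and DROP. If some failures are worth retrying, a handler could pick the status per error class. The following is a rough sketch under that assumption; `TransientError`, `process_with_status`, and the two callbacks are hypothetical names, and which errors count as transient is an application decision.

```python
import json
import logging
from typing import Any, Callable

logger = logging.getLogger(__name__)


class TransientError(Exception):
    """Hypothetical marker for failures worth retrying (e.g. a DB timeout)."""


def process_with_status(
    raw_body: bytes,
    process: Callable[[dict[str, Any]], None],
) -> dict[str, str]:
    """Run `process` on the decoded body and choose a Dapr status."""
    try:
        event = json.loads(raw_body)
    except ValueError:
        # Malformed JSON will never succeed, so do not ask for a retry.
        logger.warning("Dropping undecodable message")
        return {"status": "DROP"}
    try:
        process(event)
    except TransientError:
        return {"status": "RETRY"}  # ask Dapr to redeliver
    except Exception:
        logger.exception("Dropping message after unexpected error")
        return {"status": "DROP"}
    return {"status": "SUCCESS"}


def ok(event: dict[str, Any]) -> None:
    pass  # stand-in for successful processing


def flaky(event: dict[str, Any]) -> None:
    raise TransientError("database timeout")


print(process_with_status(b'{"event_type": "test"}', ok))     # {'status': 'SUCCESS'}
print(process_with_status(b'{"event_type": "test"}', flaky))  # {'status': 'RETRY'}
print(process_with_status(b"not json", ok))                   # {'status': 'DROP'}
```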
4. Using Wrong Dapr HTTP Port
# Local development
DAPR_HTTP_ENDPOINT = "http://localhost:3500"
# In Kubernetes (sidecar)
DAPR_HTTP_ENDPOINT = "http://localhost:3500" # Same! Sidecar is localhost
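The sidecar port is 3500 by default but can be changed, and Dapr exposes the actual value to the app through the `DAPR_HTTP_PORT` environment variable. A minimal sketch that reads it instead of hardcoding the port; the function name `dapr_http_endpoint` and its `env` parameter (there to make the example easy to run without a sidecar) are assumptions for illustration.

```python
import os
from typing import Mapping, Optional


def dapr_http_endpoint(env: Optional[Mapping[str, str]] = None) -> str:
    """Build the sidecar base URL from DAPR_HTTP_PORT, defaulting to 3500."""
    env = os.environ if env is None else env
    port = env.get("DAPR_HTTP_PORT", "3500")
    # The sidecar shares the pod's network namespace, so the host is localhost.
    return f"http://localhost:{port}"


print(dapr_http_endpoint({}))                          # http://localhost:3500
print(dapr_http_endpoint({"DAPR_HTTP_PORT": "3501"}))  # http://localhost:3501
```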
Debugging
Check Dapr Sidecar Logs
# Kubernetes
kubectl logs deploy/myapp -c daprd -n mynamespace
# Look for:
# - "Scheduler stream connected" = Jobs API working
# - "HTTP API Called" = API calls to Dapr
Verify Subscriptions
# Call your subscribe endpoint
curl http://localhost:8000/dapr/subscribe
# Should return your subscriptions list
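A subscription whose route has no matching POST handler is easy to miss when reading the list by eye. The sketch below compares the subscription routes against a set of handler paths; `unrouted_subscriptions` is a hypothetical helper, and `handler_paths` is written out by hand here (in a FastAPI app it could presumably be collected from the registered routes).

```python
from typing import Iterable

SUBSCRIPTIONS = [
    {"pubsubname": "taskflow-pubsub", "topic": "task-events", "route": "/dapr/events/task-events"},
    {"pubsubname": "taskflow-pubsub", "topic": "reminders", "route": "/dapr/events/reminders"},
]


def unrouted_subscriptions(
    subscriptions: list[dict[str, str]],
    handler_paths: Iterable[str],
) -> list[dict[str, str]]:
    """Return subscriptions whose route has no registered handler path."""
    known = set(handler_paths)
    return [sub for sub in subscriptions if sub["route"] not in known]


# Only the task-events handler is registered in this example.
handler_paths = {"/dapr/events/task-events"}
missing = unrouted_subscriptions(SUBSCRIPTIONS, handler_paths)
print([sub["topic"] for sub in missing])  # ['reminders']
```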
Test Pub/Sub Locally
# Publish test event
curl -X POST http://localhost:3500/v1.0/publish/taskflow-pubsub/task-events \
-H "Content-Type: application/json" \
-d '{"event_type": "test", "data": {}}'
