dapr-integration

About

This skill helps developers integrate Dapr for pub/sub messaging and scheduled jobs in event-driven architectures. It covers handling CloudEvent formats, setting up subscriptions, and using the Dapr Jobs API while addressing common pitfalls like CloudEvent unwrapping. Use it when implementing Dapr-based microservices in Kubernetes environments.

Quick Install

Claude Code

Plugin command (recommended):
/plugin add https://github.com/majiayu000/claude-skill-registry

Git clone (alternative):
git clone https://github.com/majiayu000/claude-skill-registry.git ~/.claude/skills/dapr-integration

Documentation

Dapr Integration

Integrate the Dapr sidecar for pub/sub messaging, state management, and scheduled jobs in Kubernetes environments.

When to Use

  • Setting up Dapr pub/sub for event-driven microservices
  • Scheduling jobs with Dapr Jobs API (v1.0-alpha1)
  • Handling CloudEvent message formats
  • Implementing subscription handlers in FastAPI
  • Debugging Dapr integration issues

Quick Start

# Install Dapr CLI
curl -fsSL https://raw.githubusercontent.com/dapr/cli/master/install/install.sh | bash

# Initialize Dapr (local)
dapr init

# Run app with Dapr sidecar
dapr run --app-id myapp --app-port 8000 -- uvicorn main:app

Core Patterns

1. Pub/Sub Subscription Handler (FastAPI)

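import logging

from fastapi import APIRouter, Depends, Request
from sqlmodel.ext.asyncio.session import AsyncSession

logger = logging.getLogger(__name__)
# get_session (used by the handlers) is the app's own DB session dependency, not shown here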

router = APIRouter(prefix="/dapr", tags=["Dapr"])

# Topics we subscribe to
SUBSCRIPTIONS = [
    {"pubsubname": "taskflow-pubsub", "topic": "task-events", "route": "/dapr/events/task-events"},
    {"pubsubname": "taskflow-pubsub", "topic": "reminders", "route": "/dapr/events/reminders"},
]

@router.get("/subscribe")
async def get_subscriptions() -> list[dict]:
    """Dapr calls this on startup to discover subscriptions."""
    return SUBSCRIPTIONS
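
Because the router is mounted under the /dapr prefix, every route in SUBSCRIPTIONS has to repeat that prefix by hand. One way to keep the two in sync is to derive the list from the topic names. This is a minimal sketch: build_subscriptions is a hypothetical helper, not part of the original skill, and it assumes every handler lives at {prefix}/events/{topic}.

```python
def build_subscriptions(
    pubsub_name: str,
    topics: list[str],
    prefix: str = "/dapr",  # assumption: must match the APIRouter prefix
) -> list[dict]:
    """Build the list returned by GET /dapr/subscribe (hypothetical helper)."""
    return [
        {
            "pubsubname": pubsub_name,
            "topic": topic,
            # Full path as seen by Dapr, including the router prefix
            "route": f"{prefix}/events/{topic}",
        }
        for topic in topics
    ]


SUBSCRIPTIONS = build_subscriptions("taskflow-pubsub", ["task-events", "reminders"])
```

With the defaults above this produces the same two entries as the hand-written list.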

2. CloudEvent Handling (CRITICAL)

By default, Dapr wraps every pub/sub message in a CloudEvents envelope. Your handler MUST unwrap the "data" field to reach the payload.

@router.post("/events/task-events")
async def handle_task_events(
    request: Request,
    session: AsyncSession = Depends(get_session),
) -> dict:
    try:
        # Step 1: Get raw CloudEvent
        raw_event = await request.json()

        # Step 2: ALWAYS unwrap CloudEvent "data" field
        # CloudEvent structure:
        # {
        #   "data": {              <-- Your payload is HERE
        #     "event_type": "task.created",
        #     "data": {...},
        #     "timestamp": "..."
        #   },
        #   "datacontenttype": "application/json",
        #   "id": "...",
        #   "pubsubname": "taskflow-pubsub",
        #   "source": "myapp",
        #   "topic": "task-events",
        #   ...
        # }
        event = raw_event.get("data", raw_event)  # Unwrap or use as-is

        # Step 3: Now access your payload
        event_type = event.get("event_type")  # "task.created"
        data = event.get("data", {})          # Your actual data

        # Process event...

        return {"status": "SUCCESS"}

    except Exception as e:
        logger.exception("Error handling event: %s", e)
        # Return SUCCESS to prevent Dapr retries for bad events
        return {"status": "SUCCESS"}
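
One caveat with raw_event.get("data", raw_event): the payload in this skill has its own "data" key, so if a message ever arrives unwrapped, that expression would strip one level too many and lose event_type. A slightly stricter variant unwraps only when the body looks like a CloudEvent. This is a sketch under one assumption: the envelope carries the "specversion" attribute that the CloudEvents spec requires. unwrap_cloudevent is a hypothetical name.

```python
def unwrap_cloudevent(body: dict) -> dict:
    """Return the application payload from a pub/sub delivery (hypothetical helper)."""
    payload = body.get("data")
    # Only unwrap when the body is a CloudEvent envelope with a JSON object payload
    if "specversion" in body and isinstance(payload, dict):
        return payload
    # Otherwise assume the body already is the payload
    return body


# Sample envelope with placeholder attribute values
wrapped = {
    "specversion": "1.0",
    "id": "abc",
    "source": "myapp",
    "type": "example",
    "data": {"event_type": "task.created", "data": {"task_id": 7}},
}
# The same payload delivered without an envelope
unwrapped = {"event_type": "task.created", "data": {"task_id": 7}}

event_a = unwrap_cloudevent(wrapped)
event_b = unwrap_cloudevent(unwrapped)
```

Both calls yield a dict whose event_type is "task.created", whereas the plain .get("data", ...) form would return only {"task_id": 7} for the second input.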

3. Publishing Events

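import logging
from datetime import datetime, timezone

import httpx

logger = logging.getLogger(__name__)

DAPR_HTTP_ENDPOINT = "http://localhost:3500"
PUBSUB_NAME = "taskflow-pubsub"

async def publish_event(
    topic: str,
    event_type: str,
    data: dict,
) -> bool:
    """Publish event to Dapr pub/sub."""
    url = f"{DAPR_HTTP_ENDPOINT}/v1.0/publish/{PUBSUB_NAME}/{topic}"

    payload = {
        "event_type": event_type,
        "data": data,
        # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated since Python 3.12)
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }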

    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            response = await client.post(url, json=payload)
            return response.status_code == 204
    except Exception as e:
        logger.error("Failed to publish event: %s", e)
        return False

4. Dapr Jobs API (Scheduled Jobs)

CRITICAL: Dapr Jobs v1.0-alpha1 calls back to /job/{job_name} by default!

# Scheduling a job
from datetime import datetime, timezone

async def schedule_job(
    job_name: str,
    due_time: datetime,
    data: dict,
    dapr_http_endpoint: str = "http://localhost:3500",
) -> bool:
    """Schedule a one-time Dapr job."""
    url = f"{dapr_http_endpoint}/v1.0-alpha1/jobs/{job_name}"

    payload = {
        # RFC 3339 in UTC. Pass a timezone-aware due_time: astimezone()
        # treats a naive datetime as local time.
        "dueTime": due_time.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "data": data,
    }

    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            response = await client.post(url, json=payload)
            return response.status_code == 204
    except Exception as e:
        logger.error("Failed to schedule job: %s", e)
        return False
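
The "Z" suffix in dueTime is only truthful if the datetime is actually in UTC. The conversion can be isolated and checked on its own. This is a minimal sketch: to_rfc3339_utc is a hypothetical helper, and rejecting naive datetimes is a design choice of this sketch, not something the Jobs API requires.

```python
from datetime import datetime, timedelta, timezone


def to_rfc3339_utc(moment: datetime) -> str:
    """Format a timezone-aware datetime as an RFC 3339 UTC string (hypothetical helper)."""
    if moment.tzinfo is None or moment.utcoffset() is None:
        # A naive datetime has no defined offset, so refuse to guess
        raise ValueError("due time must be timezone-aware")
    return moment.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


# 12:00 at UTC+2 is 10:00 UTC
local_noon = datetime(2025, 1, 1, 12, 0, tzinfo=timezone(timedelta(hours=2)))
due_time_str = to_rfc3339_utc(local_noon)  # "2025-01-01T10:00:00Z"
```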

Handling the callback: Dapr calls POST /job/{job_name} on your app, NOT a custom endpoint:

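# WRONG - Dapr won't call this!
@router.post("/api/jobs/trigger")
async def handle_trigger() -> None:
    pass

# CORRECT - This is what Dapr actually calls.
# Register it on the app or on a router WITHOUT a prefix: under a router
# created with prefix="/dapr", the path would become /dapr/job/{job_name}.
@router.post("/job/{job_name}")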
async def handle_dapr_job_callback(
    job_name: str,
    request: Request,
    session: AsyncSession = Depends(get_session),
) -> dict:
    """Handle Dapr Jobs v1.0-alpha1 callback.

    Dapr calls POST /job/{job_name} when a scheduled job fires.
    """
    try:
        body = await request.json()
        job_data = body.get("data", body)  # Unwrap if needed

        task_id = job_data.get("task_id")
        job_type = job_data.get("type")

        logger.info("Job callback: job=%s, type=%s", job_name, job_type)

        if job_type == "reminder":
            return await handle_reminder(session, job_data)
        elif job_type == "spawn":
            return await handle_spawn(session, task_id)

        return {"status": "unknown_type"}

    except Exception as e:
        logger.exception("Error handling job %s: %s", job_name, e)
        return {"status": "error"}
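
As the number of job types grows, the if/elif chain in the callback can be replaced by a lookup table. The sketch below shows the idea with synchronous handlers and no database session, purely to keep it self-contained. JOB_HANDLERS, job_handler and dispatch_job are hypothetical names, and the returned status strings are illustrative.

```python
from typing import Callable

JobHandler = Callable[[dict], dict]

# Registry mapping the job's "type" field to the function that handles it
JOB_HANDLERS: dict[str, JobHandler] = {}


def job_handler(job_type: str) -> Callable[[JobHandler], JobHandler]:
    """Decorator that registers a handler for one job type."""
    def register(func: JobHandler) -> JobHandler:
        JOB_HANDLERS[job_type] = func
        return func
    return register


@job_handler("reminder")
def handle_reminder(job_data: dict) -> dict:
    return {"status": "reminder_sent", "task_id": job_data.get("task_id")}


def dispatch_job(body: dict) -> dict:
    """Unwrap the callback body if needed and route it by its "type" field."""
    job_data = body.get("data", body)
    if not isinstance(job_data, dict):
        job_data = body
    handler = JOB_HANDLERS.get(job_data.get("type"))
    if handler is None:
        return {"status": "unknown_type"}
    return handler(job_data)


result = dispatch_job({"data": {"type": "reminder", "task_id": 42}})
```

In the FastAPI callback, the async equivalent would await the looked-up handler and pass the session along.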

5. Deleting Scheduled Jobs

async def delete_job(
    job_name: str,
    dapr_http_endpoint: str = "http://localhost:3500",
) -> bool:
    """Cancel a scheduled Dapr job."""
    url = f"{dapr_http_endpoint}/v1.0-alpha1/jobs/{job_name}"

    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            response = await client.delete(url)
            # 204 = deleted, 500 = not found (both OK)
            return response.status_code in (204, 500)
    except Exception as e:
        logger.error("Failed to delete job: %s", e)
        return False
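
Deleting a job requires the exact name it was scheduled under, so it helps to derive the name from the entity it belongs to instead of storing it. This is a small sketch of that convention. The reminder-task-{id} format and the reminder_job_name helper are assumptions of this example, not something Dapr prescribes.

```python
def reminder_job_name(task_id: int) -> str:
    """Deterministic job name for a task's reminder (hypothetical naming scheme)."""
    return f"reminder-task-{task_id}"


# The same call yields the same name at schedule time and at cancel time
name_when_scheduling = reminder_job_name(42)
name_when_cancelling = reminder_job_name(42)
```

The name can then be passed to schedule_job when the reminder is created and to delete_job when the task is completed or removed.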

Kubernetes/Helm Configuration

Dapr Pub/Sub Component (Redis)

# dapr-pubsub.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: taskflow-pubsub
  namespace: taskflow
spec:
  type: pubsub.redis
  version: v1
  metadata:
    - name: redisHost
      value: "redis:6379"
    - name: redisPassword
      secretKeyRef:
        name: redis-secret
        key: password
    - name: enableTLS
      value: "true"  # Required for Upstash

Dapr Annotations for Deployment

apiVersion: apps/v1
kind: Deployment
metadata:
  name: taskflow-api
spec:
  template:
    metadata:
      annotations:
        dapr.io/enabled: "true"
        dapr.io/app-id: "taskflow-api"
        dapr.io/app-port: "8000"
        dapr.io/enable-api-logging: "true"

Common Pitfalls

1. Not Unwrapping CloudEvent

# WRONG - event_type will be None!
event = await request.json()
event_type = event.get("event_type")  # None - it's nested in "data"

# CORRECT
raw_event = await request.json()
event = raw_event.get("data", raw_event)  # Unwrap CloudEvent
event_type = event.get("event_type")  # "task.created"

2. Wrong Job Callback URL

# WRONG - Dapr calls /job/{name}, not custom endpoints
@router.post("/api/jobs/trigger")  # Dapr won't call this!

# CORRECT
@router.post("/job/{job_name}")  # Dapr WILL call this

3. Forgetting to Return SUCCESS

# WRONG - Dapr will retry on errors
@router.post("/events/task-events")
async def handle(request: Request):
    try:
        # process...
        return {"status": "SUCCESS"}
    except Exception:
        raise  # Dapr will retry!

# CORRECT - Always return SUCCESS to stop retries
@router.post("/events/task-events")
async def handle(request: Request):
    try:
        ...  # process...
    except Exception as e:
        logger.exception("Error: %s", e)
    return {"status": "SUCCESS"}  # Always, even on error
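
Besides SUCCESS, a subscriber can also answer with RETRY or DROP, which lets the handler separate failures worth retrying from events that can never succeed. The sketch below maps an outcome to a response body. The two exception classes and status_for are hypothetical, and which errors count as transient is an application decision.

```python
from typing import Optional


class TransientEventError(Exception):
    """Hypothetical: a failure that may succeed later (e.g. database unavailable)."""


class PermanentEventError(Exception):
    """Hypothetical: the event itself is bad (e.g. malformed payload)."""


def status_for(error: Optional[Exception]) -> dict:
    """Choose the pub/sub response body for a processing outcome."""
    if error is None:
        return {"status": "SUCCESS"}
    if isinstance(error, TransientEventError):
        # Ask Dapr to redeliver the message
        return {"status": "RETRY"}
    # Anything else is treated as unrecoverable: drop it instead of retrying forever
    return {"status": "DROP"}


ok = status_for(None)
retry = status_for(TransientEventError("db down"))
drop = status_for(PermanentEventError("bad payload"))
```

Returning SUCCESS for every failure, as above, is the simplest option. DROP makes the discard explicit to Dapr.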

4. Using Wrong Dapr HTTP Port

# Local development
DAPR_HTTP_ENDPOINT = "http://localhost:3500"

# In Kubernetes (sidecar)
DAPR_HTTP_ENDPOINT = "http://localhost:3500"  # Same! Sidecar is localhost
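
Dapr also exposes the sidecar's HTTP port to the app through the DAPR_HTTP_PORT environment variable, which matters when the sidecar is started on a non-default port. The sketch below reads it with 3500 as the fallback. dapr_http_endpoint is a hypothetical helper, and the env parameter exists only to make it testable.

```python
import os
from typing import Mapping


def dapr_http_endpoint(env: Mapping[str, str] = os.environ) -> str:
    """Build the sidecar base URL from DAPR_HTTP_PORT, defaulting to 3500."""
    port = env.get("DAPR_HTTP_PORT", "3500")
    # The sidecar always listens on localhost from the app's point of view
    return f"http://localhost:{port}"


default_endpoint = dapr_http_endpoint({})                          # http://localhost:3500
custom_endpoint = dapr_http_endpoint({"DAPR_HTTP_PORT": "3601"})   # http://localhost:3601
```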

Debugging

Check Dapr Sidecar Logs

# Kubernetes
kubectl logs deploy/myapp -c daprd -n mynamespace

# Look for:
# - "Scheduler stream connected" = Jobs API working
# - "HTTP API Called" = API calls to Dapr

Verify Subscriptions

# Call your subscribe endpoint
curl http://localhost:8000/dapr/subscribe

# Should return your subscriptions list

Test Pub/Sub Locally

# Publish test event
curl -X POST http://localhost:3500/v1.0/publish/taskflow-pubsub/task-events \
  -H "Content-Type: application/json" \
  -d '{"event_type": "test", "data": {}}'

References

GitHub Repository

majiayu000/claude-skill-registry
Path: skills/dapr-integration
