
configuring-dapr-pubsub

majiayu000

About

This skill configures Dapr pub/sub components for event-driven microservices using Kafka or Redis backends. It helps developers wire agent-to-agent communication, set up event subscriptions, and integrate Dapr sidecars with proper component configuration and Kubernetes deployment. Use it when implementing Dapr-based messaging patterns, but not for direct Kafka clients or non-Dapr messaging approaches.

Quick Install

Claude Code

Plugin command (recommended):
/plugin add https://github.com/majiayu000/claude-skill-registry

Git clone (alternative):
git clone https://github.com/majiayu000/claude-skill-registry.git ~/.claude/skills/configuring-dapr-pubsub

Copy and paste the plugin command into Claude Code to install this skill.

Documentation

Configuring Dapr Pub/Sub

Wire event-driven microservices using Dapr pub/sub with Kafka or Redis backends.

Quick Start

# components/pubsub.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.kafka
  version: v1
  metadata:
    - name: brokers
      value: "my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092"
    - name: authType
      value: "none"
    - name: disableTls
      value: "true"
# Apply the component
kubectl apply -f components/pubsub.yaml

# Publish a test event with the Dapr CLI (requires an app with a Dapr sidecar running)
dapr publish --publish-app-id publisher --pubsub pubsub --topic test --data '{"msg":"hello"}'

Component Configurations

Kafka (Production)

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub
spec:
  type: pubsub.kafka
  version: v1
  metadata:
    # Required
    - name: brokers
      value: "my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092"
    - name: authType
      value: "none"

    # Consumer settings
    - name: consumerGroup
      value: "{namespace}-{appId}"  # Templated per deployment
    - name: consumeRetryInterval
      value: "100ms"
    - name: heartbeatInterval
      value: "3s"
    - name: sessionTimeout
      value: "10s"

    # Performance
    - name: maxMessageBytes
      value: "1048576"  # 1MB
    - name: channelBufferSize
      value: "256"

Kafka with SASL Authentication

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub-secure
spec:
  type: pubsub.kafka
  version: v1
  metadata:
    - name: brokers
      value: "kafka.example.com:9093"
    - name: authType
      value: "password"
    - name: saslUsername
      value: "dapr-user"
    - name: saslPassword
      secretKeyRef:
        name: kafka-secrets
        key: password
    - name: saslMechanism
      value: "SCRAM-SHA-256"

Redis (Development/Simple)

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: redis-pubsub
spec:
  type: pubsub.redis
  version: v1
  metadata:
    - name: redisHost
      value: "redis-master.redis.svc.cluster.local:6379"
    - name: redisPassword
      secretKeyRef:
        name: redis-secrets
        key: password

Subscription Patterns

Declarative Subscription (Recommended)

# subscriptions/task-events.yaml
apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
  name: task-created-subscription
spec:
  pubsubname: pubsub
  topic: task-created
  routes:
    default: /dapr/task-created
  scopes:
    - triage-agent
    - concepts-agent
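
With a declarative Subscription the subscribing apps do not need a /dapr/subscribe discovery endpoint; each scoped app only has to serve the route declared under routes.default. A minimal FastAPI handler for the subscription above (handler and variable names are illustrative):

from fastapi import FastAPI, Request

app = FastAPI()

# Dapr reads the topic and route from the Subscription resource,
# so the app only exposes the declared route.
@app.post("/dapr/task-created")
async def on_task_created(request: Request):
    event = await request.json()                  # CloudEvent envelope
    print("task-created:", event.get("data", event))
    return {"status": "SUCCESS"}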

Programmatic Subscription (FastAPI)

from fastapi import FastAPI, Request

app = FastAPI()

@app.get("/dapr/subscribe")
async def subscribe():
    """Dapr calls this to discover subscriptions."""
    return [
        {
            "pubsubname": "pubsub",
            "topic": "task-created",
            "route": "/dapr/task-created"
        },
        {
            "pubsubname": "pubsub",
            "topic": "task-completed",
            "route": "/dapr/task-completed"
        }
    ]

@app.post("/dapr/task-created")
async def handle_task_created(request: Request):
    """Handle incoming CloudEvent."""
    event = await request.json()

    # CloudEvent wrapper - data is nested
    task_data = event.get("data", event)
    task_id = task_data.get("task_id")

    # Process event
    print(f"Task created: {task_id}")

    return {"status": "SUCCESS"}
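
Besides SUCCESS, Dapr also honors RETRY and DROP in the handler's response, which controls what happens to a message the app cannot process. A sketch of a handler that asks Dapr to redeliver on transient failures (the downstream URL is a placeholder):

from fastapi import FastAPI, Request
import httpx

app = FastAPI()

@app.post("/dapr/task-completed")
async def handle_task_completed(request: Request):
    """Return RETRY for transient failures, DROP for unprocessable events."""
    event = await request.json()
    data = event.get("data", event)

    try:
        # Forward to a downstream service (placeholder URL).
        async with httpx.AsyncClient() as client:
            resp = await client.post("http://localhost:9000/tasks", json=data)
            resp.raise_for_status()
    except httpx.TransportError:
        return {"status": "RETRY"}   # network hiccup: ask Dapr to redeliver
    except httpx.HTTPStatusError:
        return {"status": "DROP"}    # downstream rejected it: discard the event
    return {"status": "SUCCESS"}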

Publishing Events

From FastAPI Service

import httpx

DAPR_URL = "http://localhost:3500"

async def publish_event(topic: str, data: dict):
    """Publish event through Dapr sidecar."""
    async with httpx.AsyncClient() as client:
        response = await client.post(
            f"{DAPR_URL}/v1.0/publish/pubsub/{topic}",
            json=data,
            headers={"Content-Type": "application/json"}
        )
        response.raise_for_status()

# Usage
await publish_event("task-created", {
    "task_id": "123",
    "title": "Learn Python",
    "user_id": "user-456"
})
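
In Kubernetes the app container can come up before its sidecar is ready, so the first publish may fail with a connection error. A simple retry wrapper around the call above (a generic pattern, not a Dapr API; names are illustrative):

import asyncio
import httpx

DAPR_URL = "http://localhost:3500"

async def publish_with_retry(topic: str, data: dict, attempts: int = 3) -> None:
    """Retry briefly in case the sidecar has not finished starting."""
    for attempt in range(attempts):
        try:
            async with httpx.AsyncClient() as client:
                resp = await client.post(f"{DAPR_URL}/v1.0/publish/pubsub/{topic}", json=data)
                resp.raise_for_status()
            return
        except httpx.HTTPError:
            if attempt == attempts - 1:
                raise
            await asyncio.sleep(2 ** attempt)  # 1s, 2s, ... simple backoff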

With CloudEvent Metadata

import uuid

async def publish_cloudevent(topic: str, data: dict, event_type: str):
    """Publish with explicit CloudEvent fields."""
    async with httpx.AsyncClient() as client:
        await client.post(
            f"{DAPR_URL}/v1.0/publish/pubsub/{topic}",
            json=data,
            headers={
                "Content-Type": "application/cloudevents+json",
                "ce-specversion": "1.0",
                "ce-type": event_type,
                "ce-source": "triage-agent",
                "ce-id": str(uuid.uuid4())
            }
        )

Kubernetes Deployment

Component Scoping

Limit component access to specific apps:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.kafka
  version: v1
  metadata:
    - name: brokers
      value: "kafka:9092"
scopes:
  - triage-agent
  - concepts-agent
  - debug-agent

App Deployment with Dapr Sidecar

apiVersion: apps/v1
kind: Deployment
metadata:
  name: triage-agent
spec:
  replicas: 2
  selector:
    matchLabels:
      app: triage-agent
  template:
    metadata:
      labels:
        app: triage-agent
      annotations:
        dapr.io/enabled: "true"
        dapr.io/app-id: "triage-agent"
        dapr.io/app-port: "8000"
        dapr.io/enable-api-logging: "true"
    spec:
      containers:
        - name: triage-agent
          image: myapp/triage-agent:latest
          ports:
            - containerPort: 8000
          env:
            - name: DAPR_HTTP_PORT
              value: "3500"

Multi-Agent Routing Pattern

Triage Agent → Specialist Agents

# triage_agent.py
from fastapi import FastAPI, Request
import httpx

app = FastAPI()
DAPR_URL = "http://localhost:3500"

@app.post("/api/question")
async def handle_question(request: Request):
    data = await request.json()
    question = data["question"]

    # Route based on content
    if "python" in question.lower() or "code" in question.lower():
        topic = "concepts-request"
    elif "error" in question.lower() or "bug" in question.lower():
        topic = "debug-request"
    else:
        topic = "concepts-request"  # Default

    # Publish to appropriate agent
    async with httpx.AsyncClient() as client:
        await client.post(
            f"{DAPR_URL}/v1.0/publish/pubsub/{topic}",
            json={
                "question": question,
                "user_id": data["user_id"],
                "session_id": data["session_id"]
            }
        )

    return {"status": "routed", "topic": topic}

Specialist Agent Handler

# concepts_agent.py
from fastapi import FastAPI, Request
import httpx

app = FastAPI()
DAPR_URL = "http://localhost:3500"

@app.get("/dapr/subscribe")
async def subscribe():
    return [{"pubsubname": "pubsub", "topic": "concepts-request", "route": "/dapr/handle"}]

@app.post("/dapr/handle")
async def handle_concepts_request(request: Request):
    event = await request.json()
    data = event.get("data", event)

    # Process with LLM
    response = await process_with_llm(data["question"])

    # Publish response
    async with httpx.AsyncClient() as client:
        await client.post(
            f"{DAPR_URL}/v1.0/publish/pubsub/response-ready",
            json={
                "session_id": data["session_id"],
                "response": response,
                "agent": "concepts"
            }
        )

    return {"status": "SUCCESS"}

Local Development

Run with Dapr CLI

# Start subscriber first
dapr run --app-id concepts-agent --app-port 8001 --dapr-http-port 3501 \
  --resources-path ./components -- uvicorn concepts_agent:app --port 8001

# Start publisher
dapr run --app-id triage-agent --app-port 8000 --dapr-http-port 3500 \
  --resources-path ./components -- uvicorn triage_agent:app --port 8000

Docker Compose with Dapr

version: "3.8"
services:
  triage-agent:
    build: ./services/triage
    ports:
      - "8000:8000"

  triage-agent-dapr:
    image: daprio/daprd:latest
    command: ["./daprd",
      "--app-id", "triage-agent",
      "--app-port", "8000",
      "--dapr-http-port", "3500",
      "--resources-path", "/components"
    ]
    volumes:
      - ./components:/components
    network_mode: "service:triage-agent"
    depends_on:
      - triage-agent

  kafka:
    image: confluentinc/cp-kafka:latest
    # ... kafka config

Troubleshooting

Check Dapr Sidecar

# View sidecar logs
kubectl logs deploy/triage-agent -c daprd

# Check component registration
curl http://localhost:3500/v1.0/metadata

Common Issues

Error                      Cause                   Fix
component not found        Component not loaded    Check --resources-path or K8s namespace
connection refused         Kafka not reachable     Verify broker address in component
consumer group rebalance   Multiple instances      Use unique consumerGroup per app
event not received         Wrong topic/route       Check subscription config

Debug Event Flow

# Publish a test event (my-app must be running with a Dapr sidecar)
dapr publish --publish-app-id my-app --pubsub pubsub --topic test --data '{"test": true}'

# Check consumer logs
kubectl logs deploy/my-app -c daprd | grep -i subscribe
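
The component check can also be scripted; a sketch against the sidecar's metadata endpoint (assumes the default HTTP port and that the metadata response lists components by name):

import httpx

# Query the sidecar's metadata API and confirm the pubsub component loaded.
meta = httpx.get("http://localhost:3500/v1.0/metadata").json()
names = [c.get("name") for c in meta.get("components", [])]
print("registered components:", names)
if "pubsub" not in names:
    print("pubsub component not loaded: check the resources path or namespace")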

Verification

Run: python scripts/verify.py

Related Skills

  • deploying-kafka-k8s - Kafka cluster setup with Strimzi
  • scaffolding-fastapi-dapr - FastAPI services with Dapr
  • scaffolding-openai-agents - Agent orchestration patterns

GitHub Repository

majiayu000/claude-skill-registry
Path: skills/configuring-dapr-pubsub
