Concurrency

majiayu000
Updated Yesterday

About

This skill explains Effect's fiber-based concurrency model and its primitives for parallel execution. Use it when developers ask about fibers, forking, or concurrent operations like `Effect.all` and `Effect.race`. It covers structured concurrency, safe interruption, and tools like Queue, Semaphore, and Deferred.

Quick Install

Claude Code

Plugin command (recommended):
/plugin add https://github.com/majiayu000/claude-skill-registry

Git clone (alternative):
git clone https://github.com/majiayu000/claude-skill-registry.git ~/.claude/skills/Concurrency

Copy and paste one of these commands into Claude Code to install this skill.

Documentation

Concurrency in Effect

Overview

Effect provides lightweight fiber-based concurrency:

  • Fibers - Lightweight threads managed by the Effect runtime
  • Structured concurrency - Parent fibers supervise children
  • Safe interruption - Clean cancellation with resource cleanup
  • Concurrent primitives - Queue, Deferred, Semaphore, PubSub

Basic Parallel Execution

Effect.all with Concurrency

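import { Effect } from "effect"

// Run all effects at once, with no limit on concurrency
const results = yield* Effect.all(
  [fetchUser(1), fetchUser(2), fetchUser(3)],
  { concurrency: "unbounded" }
)

// Run at most 5 effects at a time
const results = yield* Effect.all(tasks, { concurrency: 5 })

// Default: run sequentially, one effect at a time
const results = yield* Effect.all(tasks)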
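
The snippets above assume they sit inside an `Effect.gen` body. A minimal runnable sketch of the same options follows; `fetchUser` here is a hypothetical stand-in (not part of the original) that resolves after a short delay.

```typescript
import { Effect } from "effect"

// Hypothetical stand-in for a real lookup: resolves after a short delay.
const fetchUser = (id: number) =>
  Effect.succeed({ id, name: `user-${id}` }).pipe(Effect.delay("10 millis"))

const program = Effect.gen(function* () {
  // All three lookups run at the same time.
  const unbounded = yield* Effect.all(
    [fetchUser(1), fetchUser(2), fetchUser(3)],
    { concurrency: "unbounded" }
  )

  // No option given: the lookups run one after another.
  const sequential = yield* Effect.all([
    fetchUser(1),
    fetchUser(2),
    fetchUser(3)
  ])

  // Either way, results keep the order of the input array.
  return { unbounded, sequential }
})

Effect.runPromise(program).then(({ unbounded }) =>
  console.log(unbounded.map((u) => u.name))
)
```

The concurrency option changes only how many effects run at once, not the order of the results.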

Effect.forEach with Concurrency

const users = yield* Effect.forEach(
  userIds,
  (id) => fetchUser(id),
  { concurrency: 10 }
)

Fibers

Creating Fibers with fork

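import { Effect, Fiber } from "effect"

const program = Effect.gen(function* () {
  // Start the task on a new fiber without blocking the current one
  const fiber = yield* Effect.fork(longRunningTask)

  yield* doOtherWork()

  // Wait for the forked fiber and get its result
  const result = yield* Fiber.join(fiber)
  return result
})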

Fork Variants

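// Child of the current fiber: interrupted when the parent finishes
const fiber = yield* Effect.fork(task)

// Runs in the global scope: outlives the parent fiber
const fiber = yield* Effect.forkDaemon(task)

// Lifetime is tied to the given scope
const fiber = yield* Effect.forkIn(scope)(task)

// Like fork, but a failure of the fiber is passed to the handler
const fiber = yield* Effect.forkWithErrorHandler(task, onError)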

Fiber Operations

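import { Fiber } from "effect"

// Wait for the fiber and return its result (a failure is re-raised)
const result = yield* Fiber.join(fiber)

// Wait for the fiber and return its Exit (success or failure) as a value
const exit = yield* Fiber.await(fiber)

// Interrupt the fiber and wait for it to stop
yield* Fiber.interrupt(fiber)

// Non-blocking check: Option.none() while the fiber is still running
const maybeResult = yield* Fiber.poll(fiber)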
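
The operations above can be seen together in a small sketch. The two forked effects are hypothetical stand-ins chosen for illustration: one finishes after a short delay, the other never finishes unless interrupted.

```typescript
import { Effect, Exit, Fiber, Option } from "effect"

const program = Effect.gen(function* () {
  // A fiber that finishes on its own after a short delay.
  const quick = yield* Effect.fork(
    Effect.succeed("done").pipe(Effect.delay("20 millis"))
  )
  // A fiber that would run forever unless interrupted.
  const endless = yield* Effect.fork(Effect.never)

  // poll does not wait: the quick fiber has not finished yet.
  const early = yield* Fiber.poll(quick)

  // join waits for the result.
  const joined = yield* Fiber.join(quick)

  // interrupt stops the fiber and returns its Exit.
  const exit = yield* Fiber.interrupt(endless)

  return {
    stillRunning: Option.isNone(early),
    joined,
    interrupted: Exit.isInterrupted(exit)
  }
})

Effect.runPromise(program).then((summary) => console.log(summary))
```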

Racing

Effect.race - First to Complete

// The first effect to succeed wins; the other one is interrupted
const fastest = yield* Effect.race(
  fetchFromServer1(),
  fetchFromServer2()
)

Effect.raceAll - Race Many

const fastest = yield* Effect.raceAll([
  fetchFromCDN1(),
  fetchFromCDN2(),
  fetchFromCDN3()
])

Effect.raceFirst - Include Failures

// The first effect to complete wins, whether it succeeds or fails
const first = yield* Effect.raceFirst(task1, task2)
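
The difference between `Effect.race` and `Effect.raceFirst` shows up when the faster effect fails. A minimal sketch, with two hypothetical tasks (`fastFailure` and `slowSuccess` are illustrative names, not from the original):

```typescript
import { Effect, Either } from "effect"

// Hypothetical tasks: one fails quickly, the other succeeds slowly.
const fastFailure = Effect.fail("boom").pipe(Effect.delay("10 millis"))
const slowSuccess = Effect.succeed("slow").pipe(Effect.delay("50 millis"))

const program = Effect.gen(function* () {
  // race waits for the first success, so the quick failure is ignored.
  const viaRace = yield* Effect.race(fastFailure, slowSuccess)

  // raceFirst takes whichever finishes first, here the failure.
  // Effect.either turns that failure into a Left value.
  const viaRaceFirst = yield* Effect.either(
    Effect.raceFirst(fastFailure, slowSuccess)
  )

  return { viaRace, viaRaceFirst }
})

Effect.runPromise(program).then((result) => console.log(result))
```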

Deferred - One-Time Promise

import { Deferred, Effect, Fiber } from "effect"

const program = Effect.gen(function* () {
  const deferred = yield* Deferred.make<string, never>()

  const fiber = yield* Effect.fork(
    Effect.gen(function* () {
      const value = yield* Deferred.await(deferred)
      yield* Effect.log(`Got: ${value}`)
    })
  )

  yield* Deferred.succeed(deferred, "Hello!")

  yield* Fiber.join(fiber)
})

Queue - Concurrent Queue

import { Effect, Queue } from "effect"

const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(100)

  yield* Effect.fork(
    Effect.forEach(
      [1, 2, 3, 4, 5],
      (n) => Queue.offer(queue, n)
    )
  )

  const items = yield* Effect.forEach(
    Array.from({ length: 5 }),
    () => Queue.take(queue)
  )
})

Queue Variants

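// Back-pressure: offer suspends while the queue is full
const bounded = yield* Queue.bounded<number>(100)

// No capacity limit
const unbounded = yield* Queue.unbounded<number>()

// When full, new items are dropped
const dropping = yield* Queue.dropping<number>(100)

// When full, the oldest items are dropped to make room
const sliding = yield* Queue.sliding<number>(100)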
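
The dropping and sliding variants differ only in which items they discard once the queue is full. A small sketch, assuming a capacity of 2 purely for illustration:

```typescript
import { Chunk, Effect, Queue } from "effect"

const program = Effect.gen(function* () {
  const dropping = yield* Queue.dropping<number>(2)
  const sliding = yield* Queue.sliding<number>(2)

  // Offer three items to queues that hold only two.
  for (const n of [1, 2, 3]) {
    yield* Queue.offer(dropping, n)
    yield* Queue.offer(sliding, n)
  }

  // takeAll drains whatever each queue kept.
  const kept = yield* Queue.takeAll(dropping)
  const slid = yield* Queue.takeAll(sliding)

  return {
    dropping: Chunk.toReadonlyArray(kept),
    sliding: Chunk.toReadonlyArray(slid)
  }
})

Effect.runPromise(program).then((result) => console.log(result))
```

The dropping queue keeps the earliest items and discards the newest one, while the sliding queue discards the oldest item to make room.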

PubSub - Publish/Subscribe

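import { Effect, PubSub, Queue } from "effect"

// PubSub.subscribe needs a Scope; Effect.scoped closes the
// subscriptions when the program finishes
const program = Effect.scoped(
  Effect.gen(function* () {
    const pubsub = yield* PubSub.bounded<string>(100)

    // Subscribe before publishing: only current subscribers receive a message
    const sub1 = yield* PubSub.subscribe(pubsub)
    const sub2 = yield* PubSub.subscribe(pubsub)

    yield* PubSub.publish(pubsub, "Hello!")

    // Each subscriber gets its own copy of the message
    const msg1 = yield* Queue.take(sub1)
    const msg2 = yield* Queue.take(sub2)
  })
)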

Semaphore - Limit Concurrency

import { Effect } from "effect"

const program = Effect.gen(function* () {
  const semaphore = yield* Effect.makeSemaphore(3)

  yield* Effect.forEach(
    tasks,
    (task) => semaphore.withPermits(1)(task),
    { concurrency: "unbounded" }
  )
})
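
To check that the semaphore really caps concurrency, the sketch below records how many tasks run at once. The counting task is a hypothetical stand-in for real work.

```typescript
import { Effect } from "effect"

const program = Effect.gen(function* () {
  let running = 0
  let peak = 0

  // Hypothetical task that records how many copies run at once.
  const task = Effect.gen(function* () {
    running += 1
    peak = Math.max(peak, running)
    yield* Effect.sleep("10 millis")
    running -= 1
  })

  const semaphore = yield* Effect.makeSemaphore(3)

  // Ten tasks are started together, but only three hold a permit at a time.
  yield* Effect.forEach(
    Array.from({ length: 10 }, () => task),
    (t) => semaphore.withPermits(1)(t),
    { concurrency: "unbounded", discard: true }
  )

  return peak
})

Effect.runPromise(program).then((peak) =>
  console.log(`peak concurrency: ${peak}`)
)
```

Unlike the `concurrency` option, a semaphore can be shared across unrelated parts of a program, so the limit applies to all of them together.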

Latch - Coordination Point

import { Effect } from "effect"

const program = Effect.gen(function* () {
  // Start with the latch closed
  const latch = yield* Effect.makeLatch(false)

  yield* Effect.fork(
    Effect.forEach(
      workers,
      (worker) =>
        Effect.gen(function* () {
          // Each worker waits here until the latch is opened
          yield* latch.await
          yield* worker.start()
        }),
      { concurrency: "unbounded" }
    )
  )

  // Release all waiting workers at once
  yield* latch.open
})

Interruption

Interrupting Fibers

const fiber = yield* Effect.fork(longTask)

yield* Fiber.interrupt(fiber)

Uninterruptible Regions

const critical = Effect.uninterruptible(
  Effect.gen(function* () {
    yield* beginTransaction()
    yield* performOperations()
    yield* commitTransaction()
  })
)

Interruptible Within Uninterruptible

const program = Effect.uninterruptible(
  Effect.gen(function* () {
    yield* criticalSetup()

    // This part can be interrupted
    yield* Effect.interruptible(longOperation)

    yield* criticalTeardown()
  })
)
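
The effect of an uninterruptible region can be observed by interrupting the same work with and without the wrapper. This is a sketch under stated assumptions: `finishesDespiteInterrupt` is a hypothetical helper, and the sleeps stand in for real work.

```typescript
import { Effect, Fiber } from "effect"

// Forks `wrap(work)`, interrupts it shortly after it starts, and
// reports whether `work` was able to finish.
const finishesDespiteInterrupt = (
  wrap: (work: Effect.Effect<void>) => Effect.Effect<void>
) =>
  Effect.gen(function* () {
    let finished = false
    const work = Effect.sleep("50 millis").pipe(
      Effect.andThen(Effect.sync(() => { finished = true }))
    )

    const fiber = yield* Effect.fork(wrap(work))
    yield* Effect.sleep("10 millis") // let the fiber start
    yield* Fiber.interrupt(fiber) // returns once the fiber has stopped
    return finished
  })

const program = Effect.all({
  plain: finishesDespiteInterrupt((work) => work),
  protectedRegion: finishesDespiteInterrupt(Effect.uninterruptible)
})

Effect.runPromise(program).then((result) => console.log(result))
```

Interruption of the protected work is deferred until the region ends, so `Fiber.interrupt` takes longer to return in that case.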

Supervision

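Structured concurrency ties each child fiber's lifetime to its parent: when the parent finishes, fails, or is interrupted, any children still running are interrupted.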

const parent = Effect.gen(function* () {
  const child1 = yield* Effect.fork(task1)
  const child2 = yield* Effect.fork(task2)

  // If parent fails/interrupts, children are interrupted
  yield* failingOperation()
})
// child1 and child2 automatically interrupted
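
A minimal sketch of this behavior, assuming a hypothetical child that never finishes on its own and records when it is interrupted:

```typescript
import { Effect, Fiber } from "effect"

const program = Effect.gen(function* () {
  let childInterrupted = false

  // Hypothetical child: runs forever and records its interruption.
  const child = Effect.never.pipe(
    Effect.onInterrupt(() =>
      Effect.sync(() => { childInterrupted = true })
    )
  )

  const parent = Effect.gen(function* () {
    yield* Effect.fork(child)
    yield* Effect.sleep("10 millis") // give the child time to start
    return "parent done"
  })

  // Run the parent on its own fiber and wait for it to finish.
  const fiber = yield* Effect.fork(parent)
  const result = yield* Fiber.join(fiber)

  return { result, childInterrupted }
})

Effect.runPromise(program).then((outcome) => console.log(outcome))
```

The parent never interrupts the child explicitly; the runtime does it when the parent fiber completes.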

Daemon Fibers

Use Effect.forkDaemon to start a fiber that is not tied to its parent and keeps running after the parent finishes:

const daemon = yield* Effect.forkDaemon(backgroundTask)

Common Patterns

Timeout with Fallback

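import { Effect, Option } from "effect"

// timeoutOption yields Option.none() when the time limit is hit
// (Effect.timeout fails with a TimeoutException instead)
const withTimeout = task.pipe(
  Effect.timeoutOption("5 seconds"),
  Effect.map(Option.getOrElse(() => defaultValue))
)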

Worker Pool

const workerPool = Effect.gen(function* () {
  const semaphore = yield* Effect.makeSemaphore(numWorkers)

  // Returns a function that runs any task under one of the permits
  return <A, E, R>(task: Effect.Effect<A, E, R>) =>
    semaphore.withPermits(1)(task)
})

Parallel with Error Collection

const results = yield* Effect.all(
  tasks,
  {
    concurrency: "unbounded",
    mode: "either" // Wrap each result in an Either instead of failing on the first error
  }
)
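
With `mode: "either"`, every task's outcome is returned as an `Either`, so one failure does not hide the other results. A minimal sketch with three hypothetical tasks, the second of which fails:

```typescript
import { Effect, Either } from "effect"

// Hypothetical tasks: the second one fails.
const tasks: Array<Effect.Effect<number, string>> = [
  Effect.succeed(1),
  Effect.fail("task 2 failed"),
  Effect.succeed(3)
]

const program = Effect.all(tasks, {
  concurrency: "unbounded",
  mode: "either"
})

// Each entry is an Either: Right for a success, Left for a failure.
Effect.runPromise(program).then((results) =>
  console.log(results.map((r) => (Either.isRight(r) ? r.right : r.left)))
)
```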

Best Practices

  1. Use Effect.all concurrency for simple parallelism
  2. Use Semaphore to limit concurrent operations
  3. Prefer structured concurrency over daemon fibers
  4. Handle interruption in long-running effects
  5. Use Queue for producer/consumer patterns
  6. Use Deferred for one-time coordination

Additional Resources

For comprehensive concurrency documentation, consult ${CLAUDE_PLUGIN_ROOT}/references/llms-full.txt.

Search for these sections:

  • "Fibers" for fiber management
  • "Basic Concurrency" for parallel execution
  • "Deferred" for synchronization primitives
  • "Queue" for concurrent queues
  • "PubSub" for publish/subscribe
  • "Semaphore" for concurrency limiting

GitHub Repository

majiayu000/claude-skill-registry
Path: skills/concurrency
