Concurrency
About
This skill explains Effect's fiber-based concurrency model and its primitives for parallel execution. Use it when developers ask about fibers, forking, or concurrent operations like `Effect.all` and `Effect.race`. It covers structured concurrency, safe interruption, and tools like Queue, Semaphore, and Deferred.
Quick Install
Claude Code
Recommended/plugin add https://github.com/majiayu000/claude-skill-registrygit clone https://github.com/majiayu000/claude-skill-registry.git ~/.claude/skills/ConcurrencyCopy and paste this command in Claude Code to install this skill
Documentation
Concurrency in Effect
Overview
Effect provides lightweight fiber-based concurrency:
- Fibers - Lightweight threads managed by Effect runtime
- Structured concurrency - Parent fibers supervise children
- Safe interruption - Clean cancellation with resource cleanup
- Concurrent primitives - Queue, Deferred, Semaphore, PubSub
Basic Parallel Execution
Effect.all with Concurrency
import { Effect } from "effect"
const results = yield* Effect.all(
[fetchUser(1), fetchUser(2), fetchUser(3)],
{ concurrency: "unbounded" }
)
const results = yield* Effect.all(tasks, { concurrency: 5 })
const results = yield* Effect.all(tasks)
Effect.forEach with Concurrency
const users = yield* Effect.forEach(
userIds,
(id) => fetchUser(id),
{ concurrency: 10 }
)
Fibers
Creating Fibers with fork
const program = Effect.gen(function* () {
const fiber = yield* Effect.fork(longRunningTask)
yield* doOtherWork()
const result = yield* Fiber.join(fiber)
})
Fork Variants
const fiber = yield* Effect.fork(task)
const fiber = yield* Effect.forkDaemon(task)
const fiber = yield* Effect.forkIn(scope)(task)
const fiber = yield* Effect.forkWithErrorHandler(task, onError)
Fiber Operations
import { Fiber } from "effect"
const result = yield* Fiber.join(fiber)
const exit = yield* Fiber.await(fiber)
yield* Fiber.interrupt(fiber)
const maybeResult = yield* Fiber.poll(fiber)
Racing
Effect.race - First to Complete
const fastest = yield* Effect.race(
fetchFromServer1(),
fetchFromServer2()
)
Effect.raceAll - Race Many
const fastest = yield* Effect.raceAll([
fetchFromCDN1(),
fetchFromCDN2(),
fetchFromCDN3()
])
Effect.raceFirst - Include Failures
const first = yield* Effect.raceFirst(task1, task2)
Deferred - One-Time Promise
import { Deferred } from "effect"
const program = Effect.gen(function* () {
const deferred = yield* Deferred.make<string, never>()
const fiber = yield* Effect.fork(
Effect.gen(function* () {
const value = yield* Deferred.await(deferred)
yield* Effect.log(`Got: ${value}`)
})
)
yield* Deferred.succeed(deferred, "Hello!")
yield* Fiber.join(fiber)
})
Queue - Concurrent Queue
import { Queue } from "effect"
const program = Effect.gen(function* () {
const queue = yield* Queue.bounded<number>(100)
yield* Effect.fork(
Effect.forEach(
[1, 2, 3, 4, 5],
(n) => Queue.offer(queue, n)
)
)
const items = yield* Effect.forEach(
Array.from({ length: 5 }),
() => Queue.take(queue)
)
})
Queue Variants
const bounded = yield* Queue.bounded<number>(100)
const unbounded = yield* Queue.unbounded<number>()
const dropping = yield* Queue.dropping<number>(100)
const sliding = yield* Queue.sliding<number>(100)
PubSub - Publish/Subscribe
import { PubSub } from "effect"
const program = Effect.gen(function* () {
const pubsub = yield* PubSub.bounded<string>(100)
const sub1 = yield* PubSub.subscribe(pubsub)
const sub2 = yield* PubSub.subscribe(pubsub)
yield* PubSub.publish(pubsub, "Hello!")
const msg1 = yield* Queue.take(sub1)
const msg2 = yield* Queue.take(sub2)
})
Semaphore - Limit Concurrency
import { Effect } from "effect"
const program = Effect.gen(function* () {
const semaphore = yield* Effect.makeSemaphore(3)
yield* Effect.forEach(
tasks,
(task) => semaphore.withPermits(1)(task),
{ concurrency: "unbounded" }
)
})
Latch - Coordination Point
import { Latch } from "effect"
const program = Effect.gen(function* () {
const latch = yield* Latch.make(false)
yield* Effect.fork(
Effect.forEach(
workers,
(worker) =>
Effect.gen(function* () {
yield* Latch.await(latch)
yield* worker.start()
}),
{ concurrency: "unbounded" }
)
)
yield* Latch.open(latch)
})
Interruption
Interrupting Fibers
const fiber = yield* Effect.fork(longTask)
yield* Fiber.interrupt(fiber)
Uninterruptible Regions
const critical = Effect.uninterruptible(
Effect.gen(function* () {
yield* beginTransaction()
yield* performOperations()
yield* commitTransaction()
})
)
Interruptible Within Uninterruptible
const program = Effect.uninterruptible(
Effect.gen(function* () {
yield* criticalSetup()
// This part can be interrupted
yield* Effect.interruptible(longOperation)
yield* criticalTeardown()
})
)
Supervision
Structured concurrency ensures child fibers are managed:
const parent = Effect.gen(function* () {
const child1 = yield* Effect.fork(task1)
const child2 = yield* Effect.fork(task2)
// If parent fails/interrupts, children are interrupted
yield* failingOperation()
})
// child1 and child2 automatically interrupted
Daemon Fibers
Escape supervision with daemon:
const daemon = yield* Effect.forkDaemon(backgroundTask)
Common Patterns
Timeout with Fallback
const withTimeout = task.pipe(
Effect.timeout("5 seconds"),
Effect.map(Option.getOrElse(() => defaultValue))
)
Worker Pool
const workerPool = Effect.gen(function* () {
const semaphore = yield* Effect.makeSemaphore(numWorkers)
return (task: Effect.Effect<A>) =>
semaphore.withPermits(1)(task)
})
Parallel with Error Collection
const results = yield* Effect.all(
tasks,
{
concurrency: "unbounded",
mode: "either" // Collect all results
}
)
Best Practices
- Use Effect.all concurrency for simple parallelism
- Use Semaphore to limit concurrent operations
- Prefer structured concurrency over daemon fibers
- Handle interruption in long-running effects
- Use Queue for producer/consumer patterns
- Use Deferred for one-time coordination
Additional Resources
For comprehensive concurrency documentation, consult ${CLAUDE_PLUGIN_ROOT}/references/llms-full.txt.
Search for these sections:
- "Fibers" for fiber management
- "Basic Concurrency" for parallel execution
- "Deferred" for synchronization primitives
- "Queue" for concurrent queues
- "PubSub" for publish/subscribe
- "Semaphore" for concurrency limiting
GitHub Repository
Related Skills
algorithmic-art
MetaThis Claude Skill creates original algorithmic art using p5.js with seeded randomness and interactive parameters. It generates .md files for algorithmic philosophies, plus .html and .js files for interactive generative art implementations. Use it when developers need to create flow fields, particle systems, or other computational art while avoiding copyright issues.
subagent-driven-development
DevelopmentThis skill executes implementation plans by dispatching a fresh subagent for each independent task, with code review between tasks. It enables fast iteration while maintaining quality gates through this review process. Use it when working on mostly independent tasks within the same session to ensure continuous progress with built-in quality checks.
executing-plans
DesignUse the executing-plans skill when you have a complete implementation plan to execute in controlled batches with review checkpoints. It loads and critically reviews the plan, then executes tasks in small batches (default 3 tasks) while reporting progress between each batch for architect review. This ensures systematic implementation with built-in quality control checkpoints.
cost-optimization
OtherThis Claude Skill helps developers optimize cloud costs through resource rightsizing, tagging strategies, and spending analysis. It provides a framework for reducing cloud expenses and implementing cost governance across AWS, Azure, and GCP. Use it when you need to analyze infrastructure costs, right-size resources, or meet budget constraints.
