effect-streams-pipelines

Streams & Pipelines

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.


Install skill "effect-streams-pipelines" with this command: npx skills add mepuka/effect-ontology/mepuka-effect-ontology-effect-streams-pipelines


When to use

  • You’re building data pipelines with batching/backpressure

  • You need controlled concurrency per element

  • You must process large inputs with constant memory

Create

const s = Stream.fromIterable(items)

Transform

const out = s.pipe(
  Stream.mapEffect(processItem, { concurrency: 4 }),
  Stream.filter((a) => a.valid),
  Stream.grouped(100)
)

Consume

yield* Stream.runDrain(out)
// or: const all = yield* Stream.runCollect(out)
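Putting Create, Transform, and Consume together: a minimal sketch, assuming the `effect` package is installed. `Item`, `processItem`, and the sample `items` are hypothetical stand-ins, and the batch size is reduced to 2 so the grouping is easy to see.

```typescript
import { Chunk, Effect, Stream } from "effect"

// Hypothetical item shape and processing step, for illustration only.
interface Item {
  readonly id: number
  readonly valid: boolean
}

const processItem = (n: number): Effect.Effect<Item> =>
  Effect.succeed({ id: n, valid: n % 2 === 0 })

// Sample input: the integers 1..10.
const items = Array.from({ length: 10 }, (_, i) => i + 1)

const out = Stream.fromIterable(items).pipe(
  // Up to 4 items are processed at once; output order follows input order.
  Stream.mapEffect(processItem, { concurrency: 4 }),
  Stream.filter((a) => a.valid),
  // Emit batches (Chunks) of up to 2 items.
  Stream.grouped(2)
)

// Collecting is acceptable here only because the input is tiny.
const program = Stream.runCollect(out).pipe(
  Effect.map((batches) =>
    Chunk.toReadonlyArray(batches).map((batch) =>
      Chunk.toReadonlyArray(batch).map((item) => item.id)
    )
  )
)
```

Running it with `Effect.runPromise(program)` resolves to the ids of the valid items, batched in pairs, with a shorter final batch when the count does not divide evenly.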

Resource-Safe

const fileLines = Stream.acquireRelease(open(), close).pipe(
  Stream.flatMap(readLines)
)
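To see when acquire and release run relative to the elements, here is a small sketch that swaps the file for a hypothetical in-memory `Handle` and records each step in an `events` array. `open`, `close`, and `readLines` are illustrative names, not a real file API.

```typescript
import { Chunk, Effect, Stream } from "effect"

// Hypothetical in-memory handle standing in for a real file descriptor.
interface Handle {
  readonly lines: ReadonlyArray<string>
}

// Records the order in which lifecycle steps happen.
const events: Array<string> = []

const open = Effect.sync((): Handle => {
  events.push("open")
  return { lines: ["a", "b", "c"] }
})

const close = (_handle: Handle) =>
  Effect.sync(() => {
    events.push("close")
  })

const readLines = (handle: Handle) => Stream.fromIterable(handle.lines)

const fileLines = Stream.acquireRelease(open, close).pipe(
  Stream.flatMap(readLines),
  Stream.tap((line) =>
    Effect.sync(() => {
      events.push(`read ${line}`)
    })
  )
)

const program = Stream.runCollect(fileLines).pipe(
  Effect.map((lines) => Chunk.toReadonlyArray(lines))
)
```

After `Effect.runPromise(program)` completes, `events` shows the handle opened once, every line read, and the handle closed last. The release step also runs if the stream fails or is interrupted, which is the reason to prefer `acquireRelease` over manual cleanup.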

Resilience

const resilient = s.pipe(
  Stream.mapEffect((x) => op(x).pipe(Effect.retry(retry)))
)
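A sketch of per-element retry, assuming a hypothetical flaky `op` that fails its first two calls for each input. The `retryPolicy` here is `Schedule.recurs(3)` (up to three retries, no delay) purely to keep the example fast; a real pipeline would more likely use a backoff schedule.

```typescript
import { Chunk, Effect, Schedule, Stream } from "effect"

// Hypothetical bookkeeping: how many times op has been called per input.
const attempts = new Map<number, number>()

// Hypothetical flaky operation: fails on the first two calls for each input.
const op = (x: number): Effect.Effect<number, Error> =>
  Effect.suspend((): Effect.Effect<number, Error> => {
    const n = (attempts.get(x) ?? 0) + 1
    attempts.set(x, n)
    return n < 3
      ? Effect.fail(new Error(`transient failure for ${x}`))
      : Effect.succeed(x * 10)
  })

// Assumed policy for this sketch: retry up to 3 times, without delay.
const retryPolicy = Schedule.recurs(3)

const resilient = Stream.fromIterable([1, 2, 3]).pipe(
  // The retry wraps each element's effect, so one bad element
  // does not restart the whole stream.
  Stream.mapEffect((x) => op(x).pipe(Effect.retry(retryPolicy)))
)

const program = Stream.runCollect(resilient).pipe(
  Effect.map((values) => Chunk.toReadonlyArray(values))
)
```

Because the retry is applied inside `mapEffect`, elements that already succeeded are not reprocessed when a later element fails temporarily.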

Real-world snippet: Stream to S3 with progress and scoped background ticker

let downloadedBytes = 0

yield* Effect.gen(function* () {
  // background progress ticker
  yield* Effect.repeat(
    Effect.gen(function* () {
      const bytes = yield* Effect.succeed(downloadedBytes)
      yield* Effect.log(`Downloaded ${bytes}/${contentLength} bytes`)
    }),
    Schedule.forever.pipe(Schedule.delayed(() => "2 seconds"))
  ).pipe(Effect.delay("100 millis"), Effect.forkScoped)

  yield* s3.putObject(
    key,
    resp.stream.pipe(
      Stream.tap((chunk) => {
        downloadedBytes += chunk.length
        return Effect.void
      })
    ),
    { contentLength }
  )
}).pipe(Effect.scoped)

Guidance

  • Prefer Stream.mapEffect with concurrency to control parallel work

  • Use grouped(n) for batching network/DB operations

  • Always model resource acquisition with acquireRelease

Pitfalls

  • Collecting massive streams into memory → prefer runDrain or chunked writes

  • Doing blocking IO in transformations → keep operations effectful and non-blocking
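The first pitfall can be avoided by writing each batch as it arrives instead of collecting the whole stream. A sketch, assuming a hypothetical `writeBatch` that stands in for a bulk insert or upload; the counters exist only to make the behaviour observable.

```typescript
import { Chunk, Effect, Stream } from "effect"

// Counters used only to observe what the sketch did.
let written = 0
let batchCount = 0

// Hypothetical sink: stands in for a bulk DB insert or network write.
const writeBatch = (batch: ReadonlyArray<number>) =>
  Effect.sync(() => {
    written += batch.length
    batchCount += 1
  })

const program = Stream.iterate(1, (n) => n + 1).pipe(
  // Lazily generated input, cut off at 1000 elements.
  Stream.take(1000),
  // Batch in groups of 100 and write each batch as soon as it is full.
  Stream.grouped(100),
  Stream.mapEffect((batch) => writeBatch(Chunk.toReadonlyArray(batch))),
  // runDrain discards results, so nothing accumulates downstream.
  Stream.runDrain
)
```

With this shape, memory use tracks the batch size rather than the total stream length, since no step retains earlier batches.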


Local Source Reference

CRITICAL: Search local Effect source before implementing

The full Effect source code is available at docs/effect-source/. Always search the actual implementation before writing Effect code.

Key Source Files

  • Stream: docs/effect-source/effect/src/Stream.ts

  • Sink: docs/effect-source/effect/src/Sink.ts

  • Channel: docs/effect-source/effect/src/Channel.ts

Example Searches

Find Stream creation patterns

grep -F "fromIterable" docs/effect-source/effect/src/Stream.ts
grep -F "make" docs/effect-source/effect/src/Stream.ts
grep -F "fromEffect" docs/effect-source/effect/src/Stream.ts

Study Stream transformations

grep -F "mapEffect" docs/effect-source/effect/src/Stream.ts
grep -F "filter" docs/effect-source/effect/src/Stream.ts
grep -F "grouped" docs/effect-source/effect/src/Stream.ts

Find Stream consumption

grep -F "runDrain" docs/effect-source/effect/src/Stream.ts
grep -F "runCollect" docs/effect-source/effect/src/Stream.ts

Look at Stream test examples

grep -F "Stream." docs/effect-source/effect/test/Stream.test.ts

Workflow

  • Identify the Stream API you need (e.g., mapEffect, grouped)

  • Search docs/effect-source/effect/src/Stream.ts for the implementation

  • Study the types and pipeline patterns

  • Look at test files for usage examples

  • Write your code based on real implementations

Real source code > documentation > assumptions


Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.
