sinks

This skill should be used when the user asks about "Effect Sink", "Sink.collectAll", "Sink.sum", "Sink.fold", "stream consumers", "Sink.forEach", "creating sinks", "sink operations", "sink leftovers", "sink concurrency", "Stream.run with Sink", or needs to understand how Effect Sinks consume stream data.

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "sinks" with this command: npx skills add andrueandersoncs/claude-skill-effect-ts/andrueandersoncs-claude-skill-effect-ts-sinks

Sinks in Effect

Overview

A Sink is a consumer of stream elements that produces a result:

Sink<A, In, L, E, R>;
// A  - Result type (what sink produces)
// In - Input element type (what sink consumes)
// L  - Leftover type (unconsumed elements)
// E  - Error type
// R  - Required environment

Sinks are the counterpart to Streams - while Streams produce data, Sinks consume it.

Built-in Sinks

Collecting Elements

import { Stream, Sink } from "effect";

const all = yield * Stream.make(1, 2, 3, 4, 5).pipe(Stream.run(Sink.collectAll()));

const array = yield * Stream.make(1, 2, 3).pipe(Stream.run(Sink.collectAllToArray()));

const firstThree = yield * Stream.range(1, 100).pipe(Stream.run(Sink.collectAllN(3)));

const whileSmall = yield * Stream.iterate(1, (n) => n + 1).pipe(Stream.run(Sink.collectAllWhile((n) => n < 5)));

Aggregation Sinks

const total = yield * Stream.make(1, 2, 3, 4, 5).pipe(Stream.run(Sink.sum));

const count = yield * Stream.make("a", "b", "c").pipe(Stream.run(Sink.count));

const first = yield * Stream.make(1, 2, 3).pipe(Stream.run(Sink.head));

const last = yield * Stream.make(1, 2, 3).pipe(Stream.run(Sink.last));

const taken = yield * Stream.make(1, 2, 3, 4, 5).pipe(Stream.run(Sink.take(3)));

Folding Sinks

const product = yield * Stream.make(1, 2, 3, 4, 5).pipe(Stream.run(Sink.foldLeft(1, (acc, n) => acc * n)));

const sumUntil100 =
  yield *
  Stream.iterate(1, (n) => n + 1).pipe(
    Stream.run(
      Sink.fold(
        0,
        (sum) => sum < 100,
        (sum, n) => sum + n,
      ),
    ),
  );

const foldWithLog = Sink.foldEffect(
  0,
  (sum) => sum < 100,
  (sum, n) =>
    Effect.gen(function* () {
      yield* Effect.log(`Adding ${n} to ${sum}`);
      return sum + n;
    }),
);

Side Effect Sinks

yield * Stream.make(1, 2, 3).pipe(Stream.run(Sink.forEach((n) => Effect.log(`Got: ${n}`))));

yield * Stream.make(1, 2, 3).pipe(Stream.run(Sink.drain));

Creating Custom Sinks

Sink.make

const maxSink = Sink.make<number, number, never, never, never>(
  // Initial state
  Number.NEGATIVE_INFINITY,
  // Process each element
  (max, n) => (n > max ? n : max),
  // Extract result
  (max) => max,
);

const max = yield * Stream.make(3, 1, 4, 1, 5, 9).pipe(Stream.run(maxSink)); // 9

Sink.fromEffect

const logAndReturn = <A>(label: string) =>
  Sink.fromEffect(
    Effect.gen(function* () {
      yield* Effect.log(`Starting ${label}`);
      return [] as A[];
    }),
  );

Sink.fromPush

For more control over the sink lifecycle:

const customSink = Sink.fromPush<number, number, never, never>((input) =>
  Effect.sync(() =>
    Option.match(input, {
      onNone: () => Either.left(finalResult), // Stream ended
      onSome: (chunk) => {
        // Process chunk
        // Return Either.right to continue, Either.left to finish
        return Either.right(undefined);
      },
    }),
  ),
);

Sink Operations

Transforming Sinks

const doubledSum = Sink.sum.pipe(Sink.map((sum) => sum * 2));

const lengthSum = Sink.sum.pipe(Sink.contramap((s: string) => s.length));

const processStrings = Sink.sum.pipe(
  Sink.dimap(
    (s: string) => s.length,
    (sum) => `Total length: ${sum}`,
  ),
);

Combining Sinks

const sumAndCount = Sink.zip(Sink.sum, Sink.count);

const [sum, count] = yield * Stream.make(1, 2, 3, 4, 5).pipe(Stream.run(sumAndCount));

const firstOrSum = Sink.race(Sink.head, Sink.sum.pipe(Sink.map(Option.some)));

Filtering

const sumPositive = Sink.sum.pipe(Sink.filterInput((n: number) => n > 0));

const result = yield * Stream.make(-1, 2, -3, 4, -5).pipe(Stream.run(sumPositive));

Leftovers

Sinks can leave unconsumed elements:

const takeThree = Sink.take<number>(3);

const [first, rest] =
  yield * Stream.make(1, 2, 3, 4, 5).pipe(Stream.run(Sink.take<number>(3).pipe(Sink.collectLeftover)));

Sink Concurrency

Parallel Sinks

const parallelSinks = Sink.zipPar(Sink.sum, Sink.count, Sink.collectAll<number>());

const [sum, count, all] = yield * Stream.make(1, 2, 3, 4, 5).pipe(Stream.run(parallelSinks));

Chunked Processing

const chunkedSum = Sink.foldChunks(
  0,
  () => true,
  (sum, chunk: Chunk.Chunk<number>) => sum + Chunk.reduce(chunk, 0, (a, b) => a + b),
);

Common Patterns

Batched Database Insert

const batchInsert = (batchSize: number) =>
  Sink.collectAllN<Record>(batchSize).pipe(
    Sink.mapEffect((batch) => Effect.tryPromise(() => db.insertMany(Chunk.toArray(batch)))),
  );

yield * recordStream.pipe(Stream.run(batchInsert(100)));

Aggregation Pipeline

const stats = Sink.zip(Sink.sum, Sink.zip(Sink.count, Sink.zip(Sink.head, Sink.last))).pipe(
  Sink.map(([sum, [count, [first, last]]]) => ({
    sum,
    count,
    average: count > 0 ? sum / count : 0,
    first,
    last,
  })),
);

Write to File

const writeToFile = (path: string) =>
  Sink.forEach((line: string) =>
    Effect.gen(function* () {
      const fs = yield* FileSystem;
      yield* fs.appendFileString(path, line + "\n");
    }),
  );

Best Practices

  1. Use built-in sinks when possible - Optimized and tested
  2. Combine sinks with zip - Run multiple aggregations in one pass
  3. Use foldChunks for efficiency - Process chunks, not elements
  4. Handle leftovers - Consider unconsumed elements
  5. Prefer Sink over forEach - More composable

Additional Resources

For comprehensive sink documentation, consult ${CLAUDE_PLUGIN_ROOT}/references/llms-full.txt.

Search for these sections:

  • "Creating Sinks" for sink construction
  • "Sink Operations" for transformations
  • "Sink Concurrency" for parallel processing
  • "Leftovers" for handling unconsumed elements

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

General

testing

No summary provided by upstream source.

Repository SourceNeeds Review
General

traits

No summary provided by upstream source.

Repository SourceNeeds Review
General

configuration

No summary provided by upstream source.

Repository SourceNeeds Review
General

schema

No summary provided by upstream source.

Repository SourceNeeds Review