Streaming Patterns
Primary Primitive: iter.Seq2[T, error]
    func (m *Model) Stream(ctx context.Context, msgs []schema.Message) iter.Seq2[schema.StreamChunk, error] {
    	return func(yield func(schema.StreamChunk, error) bool) {
    		stream, err := m.client.Stream(ctx, msgs)
    		if err != nil {
    			yield(schema.StreamChunk{}, err)
    			return
    		}
    		defer stream.Close()
    		for {
    			select {
    			case <-ctx.Done():
    				yield(schema.StreamChunk{}, ctx.Err())
    				return
    			default:
    			}
    			chunk, err := stream.Recv()
    			if err == io.EOF {
    				return
    			}
    			if err != nil {
    				yield(schema.StreamChunk{}, err)
    				return
    			}
    			if !yield(convertChunk(chunk), nil) {
    				return // consumer stopped
    			}
    		}
    	}
    }
Composition
- Pipe: func Pipe[A, B any](first iter.Seq2[A, error], transform func(A) (B, error)) iter.Seq2[B, error]
- Collect: drains a stream into a slice. func Collect[T any](stream iter.Seq2[T, error]) ([]T, error)
- Invoke from Stream: stream, collect, and return the last value.
- Fan-out: use iter.Pull2() to get next/stop, then broadcast each value to N consumers.
- BufferedStream: a channel-backed buffer for backpressure.
Rules
- Public API: iter.Seq2[T, error], never <-chan.
- Internal goroutine communication: channels are fine.
- Always check context cancellation in producers.
- yield returning false means the consumer stopped: return immediately.
- Use iter.Pull2 only when pull semantics are genuinely needed.