.NET Streaming (System.IO.Pipelines)
A guide for System.IO.Pipelines API for high-performance I/O pipelines.
Quick Reference: See QUICKREF.md for essential patterns at a glance.
- Core Concepts
Concept Description
Pipe
Memory buffer-based read/write pipe
PipeReader
Read data from pipe
PipeWriter
Write data to pipe
ReadOnlySequence<T>
Non-contiguous memory sequence
- Advantages
-
Zero-copy: Minimizes unnecessary memory copying
-
Backpressure control: Speed regulation between producer and consumer
-
Memory pooling: Automatic buffer reuse
-
Async I/O: Efficient asynchronous processing
- Basic Usage
using System.IO.Pipelines;
public sealed class PipelineProcessor { public async Task ProcessAsync(Stream stream) { var pipe = new Pipe();
// Run Writer and Reader concurrently
var writing = FillPipeAsync(stream, pipe.Writer);
var reading = ReadPipeAsync(pipe.Reader);
await Task.WhenAll(writing, reading);
}
private async Task FillPipeAsync(Stream stream, PipeWriter writer)
{
const int minimumBufferSize = 512;
while (true)
{
// Acquire buffer from memory pool
Memory<byte> memory = writer.GetMemory(minimumBufferSize);
int bytesRead = await stream.ReadAsync(memory);
if (bytesRead == 0)
break;
// Notify bytes written
writer.Advance(bytesRead);
// Flush data and notify Reader
FlushResult result = await writer.FlushAsync();
if (result.IsCompleted)
break;
}
// Signal write completion
await writer.CompleteAsync();
}
private async Task ReadPipeAsync(PipeReader reader)
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
// Process buffer
ProcessBuffer(buffer);
// Notify consumption up to processed position
reader.AdvanceTo(buffer.End);
if (result.IsCompleted)
break;
}
// Signal read completion
await reader.CompleteAsync();
}
}
- Line-by-Line Parsing
private async Task ReadLinesAsync(PipeReader reader) { while (true) { ReadResult result = await reader.ReadAsync(); ReadOnlySequence<byte> buffer = result.Buffer;
while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
{
ProcessLine(line);
}
// Notify unprocessed data position
reader.AdvanceTo(buffer.Start, buffer.End);
if (result.IsCompleted)
break;
}
await reader.CompleteAsync();
}
private bool TryReadLine( ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line) { // Find newline SequencePosition? position = buffer.PositionOf((byte)'\n');
if (position is null)
{
line = default;
return false;
}
// Slice up to newline
line = buffer.Slice(0, position.Value);
// Move buffer past newline
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
return true;
}
- Processing ReadOnlySequence
private void ProcessBuffer(ReadOnlySequence<byte> buffer) { if (buffer.IsSingleSegment) { // Single segment - direct access ProcessSpan(buffer.FirstSpan); } else { // Multiple segments - iteration required foreach (var segment in buffer) { ProcessSpan(segment.Span); } } }
- Network I/O Integration
public async Task ProcessSocketAsync(Socket socket) { var pipe = new Pipe();
var writing = ReceiveAsync(socket, pipe.Writer);
var reading = ProcessAsync(pipe.Reader);
await Task.WhenAll(writing, reading);
}
private async Task ReceiveAsync(Socket socket, PipeWriter writer) { while (true) { Memory<byte> memory = writer.GetMemory(4096);
int bytesReceived = await socket.ReceiveAsync(
memory,
SocketFlags.None);
if (bytesReceived == 0)
break;
writer.Advance(bytesReceived);
FlushResult result = await writer.FlushAsync();
if (result.IsCompleted)
break;
}
await writer.CompleteAsync();
}
- PipeOptions Configuration
var pipeOptions = new PipeOptions( pool: MemoryPool<byte>.Shared, // Memory pool readerScheduler: PipeScheduler.ThreadPool, // Reader scheduler writerScheduler: PipeScheduler.ThreadPool, // Writer scheduler pauseWriterThreshold: 64 * 1024, // Writer pause threshold resumeWriterThreshold: 32 * 1024, // Writer resume threshold minimumSegmentSize: 4096, // Minimum segment size useSynchronizationContext: false );
var pipe = new Pipe(pipeOptions);
- Required NuGet Package
<ItemGroup> <!-- Included in BCL for .NET Core 3.0+ --> <PackageReference Include="System.IO.Pipelines" Version="9.0.*" /> </ItemGroup>
- Important Notes
AdvanceTo Call Required
// Must call AdvanceTo after ReadAsync ReadResult result = await reader.ReadAsync(); // ... processing ... reader.AdvanceTo(consumed, examined);
Buffer Lifetime
// ❌ Bad example: Saving buffer after ReadAsync ReadOnlySequence<byte> saved; var result = await reader.ReadAsync(); saved = result.Buffer; // Dangerous! Invalidated after AdvanceTo
// ✅ Good example: Copy needed data var copy = result.Buffer.ToArray(); reader.AdvanceTo(result.Buffer.End);
Completion Calls
// Must call CompleteAsync for both Writer and Reader await writer.CompleteAsync(); await reader.CompleteAsync();
- References
-
System.IO.Pipelines
-
High-performance I/O