.NET Pub-Sub Pattern
A guide for Pub-Sub patterns for event-based asynchronous communication.
Quick Reference: See QUICKREF.md for essential patterns at a glance.
- Core APIs
API Purpose NuGet
System.Reactive (Rx.NET) Reactive event streams System.Reactive
System.Threading.Channels
Async Producer-Consumer BCL
IObservable<T>
Observable sequence BCL
- System.Threading.Channels
2.1 Basic Usage
using System.Threading.Channels;
public sealed class MessageProcessor { private readonly Channel<Message> _channel = Channel.CreateUnbounded<Message>();
// Producer - Send message
public async Task SendAsync(Message message)
{
await _channel.Writer.WriteAsync(message);
}
// Consumer - Process message
public async Task ProcessAsync(CancellationToken ct)
{
await foreach (var message in _channel.Reader.ReadAllAsync(ct))
{
await HandleMessage(message);
}
}
// Channel completion signal
public void Complete() => _channel.Writer.Complete();
}
2.2 Bounded Channel (Backpressure Control)
// Backpressure control with buffer size limit var options = new BoundedChannelOptions(capacity: 100) { FullMode = BoundedChannelFullMode.Wait, // Wait when full SingleReader = true, SingleWriter = false };
var channel = Channel.CreateBounded<Message>(options);
// Writer waits until space is available await channel.Writer.WriteAsync(message);
2.3 Multiple Consumer Pattern
public sealed class WorkerPool { private readonly Channel<WorkItem> _channel; private readonly int _workerCount;
public WorkerPool(int workerCount = 4)
{
_workerCount = workerCount;
_channel = Channel.CreateUnbounded<WorkItem>();
}
public async Task StartAsync(CancellationToken ct)
{
var workers = Enumerable.Range(0, _workerCount)
.Select(_ => ProcessAsync(ct));
await Task.WhenAll(workers);
}
private async Task ProcessAsync(CancellationToken ct)
{
await foreach (var item in _channel.Reader.ReadAllAsync(ct))
{
await ProcessItem(item);
}
}
public ValueTask EnqueueAsync(WorkItem item) =>
_channel.Writer.WriteAsync(item);
}
- System.Reactive (Rx.NET)
3.1 EventAggregator Pattern
using System.Reactive.Linq; using System.Reactive.Subjects;
public sealed class EventAggregator : IDisposable { private readonly Subject<object> _subject = new();
// Subscribe to specific event type
public IObservable<T> GetEvent<T>() =>
_subject.OfType<T>().AsObservable();
// Publish event
public void Publish<T>(T @event) =>
_subject.OnNext(@event!);
public void Dispose() => _subject.Dispose();
}
3.2 Usage Example
// Event definitions public record UserLoggedIn(string UserId); public record OrderPlaced(int OrderId);
// Subscription var aggregator = new EventAggregator();
aggregator.GetEvent<UserLoggedIn>() .Subscribe(e => Console.WriteLine($"User logged in: {e.UserId}"));
aggregator.GetEvent<OrderPlaced>() .Where(e => e.OrderId > 100) .Subscribe(e => Console.WriteLine($"Large order: {e.OrderId}"));
// Publish aggregator.Publish(new UserLoggedIn("user123")); aggregator.Publish(new OrderPlaced(150));
3.3 Rx Operators
// Debounce - Process only the last event in a sequence searchInput .Throttle(TimeSpan.FromMilliseconds(300)) .DistinctUntilChanged() .Subscribe(query => Search(query));
// Buffer - Collect events for a period and process as batch events .Buffer(TimeSpan.FromSeconds(5)) .Subscribe(batch => ProcessBatch(batch));
// Retry - Retry on failure observable .Retry(3) .Subscribe( onNext: data => Process(data), onError: ex => LogError(ex) );
- Comparison: Channels vs Rx
Feature Channels Rx.NET
Purpose Producer-Consumer Event streams
Backpressure Built-in (Bounded) Separate implementation
Operators Basic Rich
Learning curve Low High
Dependency BCL NuGet
- DI Integration
// Program.cs services.AddSingleton(Channel.CreateUnbounded<Message>()); services.AddSingleton(sp => sp.GetRequiredService<Channel<Message>>().Reader); services.AddSingleton(sp => sp.GetRequiredService<Channel<Message>>().Writer);
// Producer public sealed class Producer(ChannelWriter<Message> writer) { public ValueTask SendAsync(Message msg) => writer.WriteAsync(msg); }
// Consumer public sealed class Consumer(ChannelReader<Message> reader) { public async Task ProcessAsync(CancellationToken ct) { await foreach (var msg in reader.ReadAllAsync(ct)) { await Handle(msg); } } }
- Required NuGet Package
<ItemGroup> <PackageReference Include="System.Reactive" Version="6.0.*" /> </ItemGroup>
- Important Notes
Memory Leaks
// Subscription disposal is required var subscription = observable.Subscribe(handler);
// After use subscription.Dispose();
Thread Safety
-
Channels are thread-safe by default
-
Subject is not thread-safe (use Synchronize() if needed)
Backpressure Handling
// Prevent memory explosion with Bounded Channel var channel = Channel.CreateBounded<Message>(new BoundedChannelOptions(1000) { FullMode = BoundedChannelFullMode.DropOldest // Drop old messages });
- References
-
Channels
-
System.Reactive