Azure Event Hubs SDK for TypeScript
High-throughput event streaming and real-time data ingestion.
Installation
npm install @azure/event-hubs @azure/identity
For checkpointing with consumer groups:
npm install @azure/eventhubs-checkpointstore-blob @azure/storage-blob
Environment Variables
EVENTHUB_NAMESPACE=<namespace>.servicebus.windows.net EVENTHUB_NAME=my-eventhub STORAGE_ACCOUNT_NAME=<storage-account> STORAGE_CONTAINER_NAME=checkpoints
Authentication
import { EventHubProducerClient, EventHubConsumerClient } from "@azure/event-hubs"; import { DefaultAzureCredential } from "@azure/identity";
const fullyQualifiedNamespace = process.env.EVENTHUB_NAMESPACE!; const eventHubName = process.env.EVENTHUB_NAME!; const credential = new DefaultAzureCredential();
// Producer const producer = new EventHubProducerClient(fullyQualifiedNamespace, eventHubName, credential);
// Consumer const consumer = new EventHubConsumerClient( "$Default", // Consumer group fullyQualifiedNamespace, eventHubName, credential );
Core Workflow
Send Events
const producer = new EventHubProducerClient(namespace, eventHubName, credential);
// Create batch and add events const batch = await producer.createBatch(); batch.tryAdd({ body: { temperature: 72.5, deviceId: "sensor-1" } }); batch.tryAdd({ body: { temperature: 68.2, deviceId: "sensor-2" } });
await producer.sendBatch(batch); await producer.close();
Send to Specific Partition
// By partition ID const batch = await producer.createBatch({ partitionId: "0" });
// By partition key (consistent hashing) const batch = await producer.createBatch({ partitionKey: "device-123" });
Receive Events (Simple)
const consumer = new EventHubConsumerClient("$Default", namespace, eventHubName, credential);
const subscription = consumer.subscribe({
processEvents: async (events, context) => {
for (const event of events) {
console.log(Partition: ${context.partitionId}, Body: ${JSON.stringify(event.body)});
}
},
processError: async (err, context) => {
console.error(Error on partition ${context.partitionId}: ${err.message});
},
});
// Stop after some time setTimeout(async () => { await subscription.close(); await consumer.close(); }, 60000);
Receive with Checkpointing (Production)
import { EventHubConsumerClient } from "@azure/event-hubs"; import { ContainerClient } from "@azure/storage-blob"; import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";
const containerClient = new ContainerClient(
https://${storageAccount}.blob.core.windows.net/${containerName},
credential
);
const checkpointStore = new BlobCheckpointStore(containerClient);
const consumer = new EventHubConsumerClient( "$Default", namespace, eventHubName, credential, checkpointStore );
const subscription = consumer.subscribe({
processEvents: async (events, context) => {
for (const event of events) {
console.log(Processing: ${JSON.stringify(event.body)});
}
// Checkpoint after processing batch
if (events.length > 0) {
await context.updateCheckpoint(events[events.length - 1]);
}
},
processError: async (err, context) => {
console.error(Error: ${err.message});
},
});
Receive from Specific Position
const subscription = consumer.subscribe({ processEvents: async (events, context) => { /* ... / }, processError: async (err, context) => { / ... */ }, }, { startPosition: { // Start from beginning "0": { offset: "@earliest" }, // Start from end (new events only) "1": { offset: "@latest" }, // Start from specific offset "2": { offset: "12345" }, // Start from specific time "3": { enqueuedOn: new Date("2024-01-01") }, }, });
Event Hub Properties
// Get hub info
const hubProperties = await producer.getEventHubProperties();
console.log(Partitions: ${hubProperties.partitionIds});
// Get partition info
const partitionProperties = await producer.getPartitionProperties("0");
console.log(Last sequence: ${partitionProperties.lastEnqueuedSequenceNumber});
Batch Processing Options
const subscription = consumer.subscribe( { processEvents: async (events, context) => { /* ... / }, processError: async (err, context) => { / ... */ }, }, { maxBatchSize: 100, // Max events per batch maxWaitTimeInSeconds: 30, // Max wait for batch } );
Key Types
import { EventHubProducerClient, EventHubConsumerClient, EventData, ReceivedEventData, PartitionContext, Subscription, SubscriptionEventHandlers, CreateBatchOptions, EventPosition, } from "@azure/event-hubs";
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";
Event Properties
// Send with properties const batch = await producer.createBatch(); batch.tryAdd({ body: { data: "payload" }, properties: { eventType: "telemetry", deviceId: "sensor-1", }, contentType: "application/json", correlationId: "request-123", });
// Access in receiver
consumer.subscribe({
processEvents: async (events, context) => {
for (const event of events) {
console.log(Type: ${event.properties?.eventType});
console.log(Sequence: ${event.sequenceNumber});
console.log(Enqueued: ${event.enqueuedTimeUtc});
console.log(Offset: ${event.offset});
}
},
});
Error Handling
consumer.subscribe({ processEvents: async (events, context) => { try { for (const event of events) { await processEvent(event); } await context.updateCheckpoint(events[events.length - 1]); } catch (error) { // Don't checkpoint on error - events will be reprocessed console.error("Processing failed:", error); } }, processError: async (err, context) => { if (err.name === "MessagingError") { // Transient error - SDK will retry console.warn("Transient error:", err.message); } else { // Fatal error console.error("Fatal error:", err); } }, });
Best Practices
-
Use checkpointing - Always checkpoint in production for exactly-once processing
-
Batch sends - Use createBatch() for efficient sending
-
Partition keys - Use partition keys to ensure ordering for related events
-
Consumer groups - Use separate consumer groups for different processing pipelines
-
Handle errors gracefully - Don't checkpoint on processing failures
-
Close clients - Always close producer/consumer when done
-
Monitor lag - Track lastEnqueuedSequenceNumber vs processed sequence