Azure Event Hubs SDK for Java
Build real-time streaming applications using the Azure Event Hubs SDK for Java.
Installation
<dependency> <groupId>com.azure</groupId> <artifactId>azure-messaging-eventhubs</artifactId> <version>5.19.0</version> </dependency>
<!-- For checkpoint store (production) --> <dependency> <groupId>com.azure</groupId> <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId> <version>1.20.0</version> </dependency>
Client Creation
EventHubProducerClient
import com.azure.messaging.eventhubs.EventHubProducerClient; import com.azure.messaging.eventhubs.EventHubClientBuilder;
// With connection string EventHubProducerClient producer = new EventHubClientBuilder() .connectionString("<connection-string>", "<event-hub-name>") .buildProducerClient();
// Full connection string with EntityPath EventHubProducerClient producer = new EventHubClientBuilder() .connectionString("<connection-string-with-entity-path>") .buildProducerClient();
With DefaultAzureCredential
import com.azure.identity.DefaultAzureCredentialBuilder;
EventHubProducerClient producer = new EventHubClientBuilder() .fullyQualifiedNamespace("<namespace>.servicebus.windows.net") .eventHubName("<event-hub-name>") .credential(new DefaultAzureCredentialBuilder().build()) .buildProducerClient();
EventHubConsumerClient
import com.azure.messaging.eventhubs.EventHubConsumerClient;
EventHubConsumerClient consumer = new EventHubClientBuilder() .connectionString("<connection-string>", "<event-hub-name>") .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME) .buildConsumerClient();
Async Clients
import com.azure.messaging.eventhubs.EventHubProducerAsyncClient; import com.azure.messaging.eventhubs.EventHubConsumerAsyncClient;
EventHubProducerAsyncClient asyncProducer = new EventHubClientBuilder() .connectionString("<connection-string>", "<event-hub-name>") .buildAsyncProducerClient();
EventHubConsumerAsyncClient asyncConsumer = new EventHubClientBuilder() .connectionString("<connection-string>", "<event-hub-name>") .consumerGroup("$Default") .buildAsyncConsumerClient();
Core Patterns
Send Single Event
import com.azure.messaging.eventhubs.EventData;
EventData eventData = new EventData("Hello, Event Hubs!"); producer.send(Collections.singletonList(eventData));
Send Event Batch
import com.azure.messaging.eventhubs.EventDataBatch; import com.azure.messaging.eventhubs.models.CreateBatchOptions;
// Create batch EventDataBatch batch = producer.createBatch();
// Add events (returns false if batch is full) for (int i = 0; i < 100; i++) { EventData event = new EventData("Event " + i); if (!batch.tryAdd(event)) { // Batch is full, send and create new batch producer.send(batch); batch = producer.createBatch(); batch.tryAdd(event); } }
// Send remaining events if (batch.getCount() > 0) { producer.send(batch); }
Send to Specific Partition
CreateBatchOptions options = new CreateBatchOptions() .setPartitionId("0");
EventDataBatch batch = producer.createBatch(options); batch.tryAdd(new EventData("Partition 0 event")); producer.send(batch);
Send with Partition Key
CreateBatchOptions options = new CreateBatchOptions() .setPartitionKey("customer-123");
EventDataBatch batch = producer.createBatch(options); batch.tryAdd(new EventData("Customer event")); producer.send(batch);
Event with Properties
EventData event = new EventData("Order created"); event.getProperties().put("orderId", "ORD-123"); event.getProperties().put("customerId", "CUST-456"); event.getProperties().put("priority", 1);
producer.send(Collections.singletonList(event));
Receive Events (Simple)
import com.azure.messaging.eventhubs.models.EventPosition; import com.azure.messaging.eventhubs.models.PartitionEvent;
// Receive from specific partition Iterable<PartitionEvent> events = consumer.receiveFromPartition( "0", // partitionId 10, // maxEvents EventPosition.earliest(), // startingPosition Duration.ofSeconds(30) // timeout );
for (PartitionEvent partitionEvent : events) { EventData event = partitionEvent.getData(); System.out.println("Body: " + event.getBodyAsString()); System.out.println("Sequence: " + event.getSequenceNumber()); System.out.println("Offset: " + event.getOffset()); }
EventProcessorClient (Production)
import com.azure.messaging.eventhubs.EventProcessorClient; import com.azure.messaging.eventhubs.EventProcessorClientBuilder; import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore; import com.azure.storage.blob.BlobContainerAsyncClient; import com.azure.storage.blob.BlobContainerClientBuilder;
// Create checkpoint store BlobContainerAsyncClient blobClient = new BlobContainerClientBuilder() .connectionString("<storage-connection-string>") .containerName("checkpoints") .buildAsyncClient();
// Create processor EventProcessorClient processor = new EventProcessorClientBuilder() .connectionString("<eventhub-connection-string>", "<event-hub-name>") .consumerGroup("$Default") .checkpointStore(new BlobCheckpointStore(blobClient)) .processEvent(eventContext -> { EventData event = eventContext.getEventData(); System.out.println("Processing: " + event.getBodyAsString());
// Checkpoint after processing
eventContext.updateCheckpoint();
})
.processError(errorContext -> {
System.err.println("Error: " + errorContext.getThrowable().getMessage());
System.err.println("Partition: " + errorContext.getPartitionContext().getPartitionId());
})
.buildEventProcessorClient();
// Start processing processor.start();
// Keep running... Thread.sleep(Duration.ofMinutes(5).toMillis());
// Stop gracefully processor.stop();
Batch Processing
EventProcessorClient processor = new EventProcessorClientBuilder() .connectionString("<connection-string>", "<event-hub-name>") .consumerGroup("$Default") .checkpointStore(new BlobCheckpointStore(blobClient)) .processEventBatch(eventBatchContext -> { List<EventData> events = eventBatchContext.getEvents(); System.out.printf("Received %d events%n", events.size());
for (EventData event : events) {
// Process each event
System.out.println(event.getBodyAsString());
}
// Checkpoint after batch
eventBatchContext.updateCheckpoint();
}, 50) // maxBatchSize
.processError(errorContext -> {
System.err.println("Error: " + errorContext.getThrowable());
})
.buildEventProcessorClient();
Async Receiving
asyncConsumer.receiveFromPartition("0", EventPosition.latest()) .subscribe( partitionEvent -> { EventData event = partitionEvent.getData(); System.out.println("Received: " + event.getBodyAsString()); }, error -> System.err.println("Error: " + error), () -> System.out.println("Complete") );
Get Event Hub Properties
// Get hub info EventHubProperties hubProps = producer.getEventHubProperties(); System.out.println("Hub: " + hubProps.getName()); System.out.println("Partitions: " + hubProps.getPartitionIds());
// Get partition info PartitionProperties partitionProps = producer.getPartitionProperties("0"); System.out.println("Begin sequence: " + partitionProps.getBeginningSequenceNumber()); System.out.println("Last sequence: " + partitionProps.getLastEnqueuedSequenceNumber()); System.out.println("Last offset: " + partitionProps.getLastEnqueuedOffset());
Event Positions
// Start from beginning EventPosition.earliest()
// Start from end (new events only) EventPosition.latest()
// From specific offset EventPosition.fromOffset(12345L)
// From specific sequence number EventPosition.fromSequenceNumber(100L)
// From specific time EventPosition.fromEnqueuedTime(Instant.now().minus(Duration.ofHours(1)))
Error Handling
import com.azure.messaging.eventhubs.models.ErrorContext;
.processError(errorContext -> { Throwable error = errorContext.getThrowable(); String partitionId = errorContext.getPartitionContext().getPartitionId();
if (error instanceof AmqpException) {
AmqpException amqpError = (AmqpException) error;
if (amqpError.isTransient()) {
System.out.println("Transient error, will retry");
}
}
System.err.printf("Error on partition %s: %s%n", partitionId, error.getMessage());
})
Resource Cleanup
// Always close clients try { producer.send(batch); } finally { producer.close(); }
// Or use try-with-resources try (EventHubProducerClient producer = new EventHubClientBuilder() .connectionString(connectionString, eventHubName) .buildProducerClient()) { producer.send(events); }
Environment Variables
EVENT_HUBS_CONNECTION_STRING=Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=... EVENT_HUBS_NAME=<event-hub-name> STORAGE_CONNECTION_STRING=<for-checkpointing>
Best Practices
-
Use EventProcessorClient: For production, provides load balancing and checkpointing
-
Batch Events: Use EventDataBatch for efficient sending
-
Partition Keys: Use for ordering guarantees within a partition
-
Checkpointing: Checkpoint after processing to avoid reprocessing
-
Error Handling: Handle transient errors with retries
-
Close Clients: Always close producer/consumer when done
Trigger Phrases
-
"Event Hubs Java"
-
"event streaming Azure"
-
"real-time data ingestion"
-
"EventProcessorClient"
-
"event hub producer consumer"
-
"partition processing"