# Azure Event Hubs SDK for Java

Build real-time streaming applications using the Azure Event Hubs SDK for Java.

## Installation

```xml
<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-eventhubs</artifactId>
    <version>5.19.0</version>
</dependency>

<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
    <version>1.20.0</version>
</dependency>
```
## Client Creation

### EventHubProducerClient

```java
import com.azure.messaging.eventhubs.EventHubProducerClient;
import com.azure.messaging.eventhubs.EventHubClientBuilder;

// With connection string
EventHubProducerClient producer = new EventHubClientBuilder()
    .connectionString("<connection-string>", "<event-hub-name>")
    .buildProducerClient();

// Full connection string with EntityPath
EventHubProducerClient producer = new EventHubClientBuilder()
    .connectionString("<connection-string-with-entity-path>")
    .buildProducerClient();
```

### With DefaultAzureCredential

```java
import com.azure.identity.DefaultAzureCredentialBuilder;

EventHubProducerClient producer = new EventHubClientBuilder()
    .fullyQualifiedNamespace("<namespace>.servicebus.windows.net")
    .eventHubName("<event-hub-name>")
    .credential(new DefaultAzureCredentialBuilder().build())
    .buildProducerClient();
```
### EventHubConsumerClient

```java
import com.azure.messaging.eventhubs.EventHubConsumerClient;

EventHubConsumerClient consumer = new EventHubClientBuilder()
    .connectionString("<connection-string>", "<event-hub-name>")
    .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
    .buildConsumerClient();
```
### Async Clients

```java
import com.azure.messaging.eventhubs.EventHubProducerAsyncClient;
import com.azure.messaging.eventhubs.EventHubConsumerAsyncClient;

EventHubProducerAsyncClient asyncProducer = new EventHubClientBuilder()
    .connectionString("<connection-string>", "<event-hub-name>")
    .buildAsyncProducerClient();

EventHubConsumerAsyncClient asyncConsumer = new EventHubClientBuilder()
    .connectionString("<connection-string>", "<event-hub-name>")
    .consumerGroup("$Default")
    .buildAsyncConsumerClient();
```
## Core Patterns

### Send Single Event

```java
import com.azure.messaging.eventhubs.EventData;

EventData eventData = new EventData("Hello, Event Hubs!");
producer.send(Collections.singletonList(eventData));
```
### Send Event Batch

```java
import com.azure.messaging.eventhubs.EventDataBatch;
import com.azure.messaging.eventhubs.models.CreateBatchOptions;

// Create batch
EventDataBatch batch = producer.createBatch();

// Add events (returns false if batch is full)
for (int i = 0; i < 100; i++) {
    EventData event = new EventData("Event " + i);
    if (!batch.tryAdd(event)) {
        // Batch is full, send and create new batch
        producer.send(batch);
        batch = producer.createBatch();
        batch.tryAdd(event);
    }
}

// Send remaining events
if (batch.getCount() > 0) {
    producer.send(batch);
}
```
### Send to Specific Partition

```java
CreateBatchOptions options = new CreateBatchOptions()
    .setPartitionId("0");

EventDataBatch batch = producer.createBatch(options);
batch.tryAdd(new EventData("Partition 0 event"));
producer.send(batch);
```
### Send with Partition Key

```java
CreateBatchOptions options = new CreateBatchOptions()
    .setPartitionKey("customer-123");

EventDataBatch batch = producer.createBatch(options);
batch.tryAdd(new EventData("Customer event"));
producer.send(batch);
```
### Event with Properties

```java
EventData event = new EventData("Order created");
event.getProperties().put("orderId", "ORD-123");
event.getProperties().put("customerId", "CUST-456");
event.getProperties().put("priority", 1);

producer.send(Collections.singletonList(event));
```
### Receive Events (Simple)

```java
import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.messaging.eventhubs.models.PartitionEvent;

// Receive from specific partition
Iterable<PartitionEvent> events = consumer.receiveFromPartition(
    "0",                        // partitionId
    10,                         // maxEvents
    EventPosition.earliest(),   // startingPosition
    Duration.ofSeconds(30)      // timeout
);

for (PartitionEvent partitionEvent : events) {
    EventData event = partitionEvent.getData();
    System.out.println("Body: " + event.getBodyAsString());
    System.out.println("Sequence: " + event.getSequenceNumber());
    System.out.println("Offset: " + event.getOffset());
}
```
### EventProcessorClient (Production)

```java
import com.azure.messaging.eventhubs.EventProcessorClient;
import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.BlobContainerClientBuilder;

// Create checkpoint store
BlobContainerAsyncClient blobClient = new BlobContainerClientBuilder()
    .connectionString("<storage-connection-string>")
    .containerName("checkpoints")
    .buildAsyncClient();

// Create processor
EventProcessorClient processor = new EventProcessorClientBuilder()
    .connectionString("<connection-string>", "<event-hub-name>")
    .consumerGroup("$Default")
    .checkpointStore(new BlobCheckpointStore(blobClient))
    .processEvent(eventContext -> {
        EventData event = eventContext.getEventData();
        System.out.println("Processing: " + event.getBodyAsString());

        // Checkpoint after processing
        eventContext.updateCheckpoint();
    })
    .processError(errorContext -> {
        System.err.println("Error: " + errorContext.getThrowable().getMessage());
        System.err.println("Partition: " + errorContext.getPartitionContext().getPartitionId());
    })
    .buildEventProcessorClient();

// Start processing
processor.start();

// Keep running...
Thread.sleep(Duration.ofMinutes(5).toMillis());

// Stop gracefully
processor.stop();
```
### Batch Processing

```java
EventProcessorClient processor = new EventProcessorClientBuilder()
    .connectionString("<connection-string>", "<event-hub-name>")
    .consumerGroup("$Default")
    .checkpointStore(new BlobCheckpointStore(blobClient))
    .processEventBatch(eventBatchContext -> {
        List<EventData> events = eventBatchContext.getEvents();
        System.out.printf("Received %d events%n", events.size());

        for (EventData event : events) {
            // Process each event
            System.out.println(event.getBodyAsString());
        }

        // Checkpoint after batch
        eventBatchContext.updateCheckpoint();
    }, 50)  // maxBatchSize
    .processError(errorContext -> {
        System.err.println("Error: " + errorContext.getThrowable());
    })
    .buildEventProcessorClient();
```
### Async Receiving

```java
asyncConsumer.receiveFromPartition("0", EventPosition.latest())
    .subscribe(
        partitionEvent -> {
            EventData event = partitionEvent.getData();
            System.out.println("Received: " + event.getBodyAsString());
        },
        error -> System.err.println("Error: " + error),
        () -> System.out.println("Complete")
    );
```
### Get Event Hub Properties

```java
// Get hub info
EventHubProperties hubProps = producer.getEventHubProperties();
System.out.println("Hub: " + hubProps.getName());
System.out.println("Partitions: " + hubProps.getPartitionIds());

// Get partition info
PartitionProperties partitionProps = producer.getPartitionProperties("0");
System.out.println("Begin sequence: " + partitionProps.getBeginningSequenceNumber());
System.out.println("Last sequence: " + partitionProps.getLastEnqueuedSequenceNumber());
System.out.println("Last offset: " + partitionProps.getLastEnqueuedOffset());
```
## Event Positions

```java
// Start from beginning
EventPosition.earliest()

// Start from end (new events only)
EventPosition.latest()

// From specific offset
EventPosition.fromOffset(12345L)

// From specific sequence number
EventPosition.fromSequenceNumber(100L)

// From specific time
EventPosition.fromEnqueuedTime(Instant.now().minus(Duration.ofHours(1)))
```
## Error Handling

```java
import com.azure.messaging.eventhubs.models.ErrorContext;

.processError(errorContext -> {
    Throwable error = errorContext.getThrowable();
    String partitionId = errorContext.getPartitionContext().getPartitionId();

    if (error instanceof AmqpException) {
        AmqpException amqpError = (AmqpException) error;
        if (amqpError.isTransient()) {
            System.out.println("Transient error, will retry");
        }
    }

    System.err.printf("Error on partition %s: %s%n", partitionId, error.getMessage());
})
```
## Resource Cleanup

```java
// Always close clients
try {
    producer.send(batch);
} finally {
    producer.close();
}

// Or use try-with-resources
try (EventHubProducerClient producer = new EventHubClientBuilder()
        .connectionString(connectionString, eventHubName)
        .buildProducerClient()) {
    producer.send(events);
}
```
## Environment Variables

```bash
EVENT_HUBS_CONNECTION_STRING=Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=...
EVENT_HUBS_NAME=<event-hub-name>
STORAGE_CONNECTION_STRING=<for-checkpointing>
```
## Best Practices

1. **Use EventProcessorClient**: For production; it provides load balancing and checkpointing.
2. **Batch Events**: Use `EventDataBatch` for efficient sending.
3. **Partition Keys**: Use for ordering guarantees within a partition.
4. **Checkpointing**: Checkpoint after processing to avoid reprocessing.
5. **Error Handling**: Handle transient errors with retries.
6. **Close Clients**: Always close the producer/consumer when done.

## Trigger Phrases

- "Event Hubs Java"
- "event streaming Azure"
- "real-time data ingestion"
- "EventProcessorClient"
- "event hub producer consumer"
- "partition processing"

## When to Use

Use this skill when you need to carry out the workflows or actions described in the overview.