azure-eventhub-java

安装量: 54
排名: #13706

安装

npx skills add https://github.com/sickn33/antigravity-awesome-skills --skill azure-eventhub-java

Azure Event Hubs SDK for Java Build real-time streaming applications using the Azure Event Hubs SDK for Java. Installation < dependency

< groupId

com.azure </ groupId

< artifactId

azure-messaging-eventhubs </ artifactId

< version

5.19.0 </ version

</ dependency

<
dependency
>
<
groupId
>
com.azure
</
groupId
>
<
artifactId
>
azure-messaging-eventhubs-checkpointstore-blob
</
artifactId
>
<
version
>
1.20.0
</
version
>
</
dependency
>
Client Creation
EventHubProducerClient
import
com
.
azure
.
messaging
.
eventhubs
.
EventHubProducerClient
;
import
com
.
azure
.
messaging
.
eventhubs
.
EventHubClientBuilder
;
// With connection string
EventHubProducerClient
producer
=
new
EventHubClientBuilder
(
)
.
connectionString
(
""
,
""
)
.
buildProducerClient
(
)
;
// Full connection string with EntityPath
EventHubProducerClient
producer
=
new
EventHubClientBuilder
(
)
.
connectionString
(
""
)
.
buildProducerClient
(
)
;
With DefaultAzureCredential
import
com
.
azure
.
identity
.
DefaultAzureCredentialBuilder
;
EventHubProducerClient
producer
=
new
EventHubClientBuilder
(
)
.
fullyQualifiedNamespace
(
".servicebus.windows.net"
)
.
eventHubName
(
""
)
.
credential
(
new
DefaultAzureCredentialBuilder
(
)
.
build
(
)
)
.
buildProducerClient
(
)
;
EventHubConsumerClient
import
com
.
azure
.
messaging
.
eventhubs
.
EventHubConsumerClient
;
EventHubConsumerClient
consumer
=
new
EventHubClientBuilder
(
)
.
connectionString
(
""
,
""
)
.
consumerGroup
(
EventHubClientBuilder
.
DEFAULT_CONSUMER_GROUP_NAME
)
.
buildConsumerClient
(
)
;
Async Clients
import
com
.
azure
.
messaging
.
eventhubs
.
EventHubProducerAsyncClient
;
import
com
.
azure
.
messaging
.
eventhubs
.
EventHubConsumerAsyncClient
;
EventHubProducerAsyncClient
asyncProducer
=
new
EventHubClientBuilder
(
)
.
connectionString
(
""
,
""
)
.
buildAsyncProducerClient
(
)
;
EventHubConsumerAsyncClient
asyncConsumer
=
new
EventHubClientBuilder
(
)
.
connectionString
(
""
,
""
)
.
consumerGroup
(
"$Default"
)
.
buildAsyncConsumerClient
(
)
;
Core Patterns
Send Single Event
import
com
.
azure
.
messaging
.
eventhubs
.
EventData
;
EventData
eventData
=
new
EventData
(
"Hello, Event Hubs!"
)
;
producer
.
send
(
Collections
.
singletonList
(
eventData
)
)
;
Send Event Batch
import
com
.
azure
.
messaging
.
eventhubs
.
EventDataBatch
;
import
com
.
azure
.
messaging
.
eventhubs
.
models
.
CreateBatchOptions
;
// Create batch
EventDataBatch
batch
=
producer
.
createBatch
(
)
;
// Add events (returns false if batch is full)
for
(
int
i
=
0
;
i
<
100
;
i
++
)
{
EventData
event
=
new
EventData
(
"Event "
+
i
)
;
if
(
!
batch
.
tryAdd
(
event
)
)
{
// Batch is full, send and create new batch
producer
.
send
(
batch
)
;
batch
=
producer
.
createBatch
(
)
;
batch
.
tryAdd
(
event
)
;
}
}
// Send remaining events
if
(
batch
.
getCount
(
)
>
0
)
{
producer
.
send
(
batch
)
;
}
Send to Specific Partition
CreateBatchOptions
options
=
new
CreateBatchOptions
(
)
.
setPartitionId
(
"0"
)
;
EventDataBatch
batch
=
producer
.
createBatch
(
options
)
;
batch
.
tryAdd
(
new
EventData
(
"Partition 0 event"
)
)
;
producer
.
send
(
batch
)
;
Send with Partition Key
CreateBatchOptions
options
=
new
CreateBatchOptions
(
)
.
setPartitionKey
(
"customer-123"
)
;
EventDataBatch
batch
=
producer
.
createBatch
(
options
)
;
batch
.
tryAdd
(
new
EventData
(
"Customer event"
)
)
;
producer
.
send
(
batch
)
;
Event with Properties
EventData
event
=
new
EventData
(
"Order created"
)
;
event
.
getProperties
(
)
.
put
(
"orderId"
,
"ORD-123"
)
;
event
.
getProperties
(
)
.
put
(
"customerId"
,
"CUST-456"
)
;
event
.
getProperties
(
)
.
put
(
"priority"
,
1
)
;
producer
.
send
(
Collections
.
singletonList
(
event
)
)
;
Receive Events (Simple)
import
com
.
azure
.
messaging
.
eventhubs
.
models
.
EventPosition
;
import
com
.
azure
.
messaging
.
eventhubs
.
models
.
PartitionEvent
;
// Receive from specific partition
Iterable
<
PartitionEvent
>
events
=
consumer
.
receiveFromPartition
(
"0"
,
// partitionId
10
,
// maxEvents
EventPosition
.
earliest
(
)
,
// startingPosition
Duration
.
ofSeconds
(
30
)
// timeout
)
;
for
(
PartitionEvent
partitionEvent
:
events
)
{
EventData
event
=
partitionEvent
.
getData
(
)
;
System
.
out
.
println
(
"Body: "
+
event
.
getBodyAsString
(
)
)
;
System
.
out
.
println
(
"Sequence: "
+
event
.
getSequenceNumber
(
)
)
;
System
.
out
.
println
(
"Offset: "
+
event
.
getOffset
(
)
)
;
}
EventProcessorClient (Production)
import
com
.
azure
.
messaging
.
eventhubs
.
EventProcessorClient
;
import
com
.
azure
.
messaging
.
eventhubs
.
EventProcessorClientBuilder
;
import
com
.
azure
.
messaging
.
eventhubs
.
checkpointstore
.
blob
.
BlobCheckpointStore
;
import
com
.
azure
.
storage
.
blob
.
BlobContainerAsyncClient
;
import
com
.
azure
.
storage
.
blob
.
BlobContainerClientBuilder
;
// Create checkpoint store
BlobContainerAsyncClient
blobClient
=
new
BlobContainerClientBuilder
(
)
.
connectionString
(
""
)
.
containerName
(
"checkpoints"
)
.
buildAsyncClient
(
)
;
// Create processor
EventProcessorClient
processor
=
new
EventProcessorClientBuilder
(
)
.
connectionString
(
""
,
""
)
.
consumerGroup
(
"$Default"
)
.
checkpointStore
(
new
BlobCheckpointStore
(
blobClient
)
)
.
processEvent
(
eventContext
->
{
EventData
event
=
eventContext
.
getEventData
(
)
;
System
.
out
.
println
(
"Processing: "
+
event
.
getBodyAsString
(
)
)
;
// Checkpoint after processing
eventContext
.
updateCheckpoint
(
)
;
}
)
.
processError
(
errorContext
->
{
System
.
err
.
println
(
"Error: "
+
errorContext
.
getThrowable
(
)
.
getMessage
(
)
)
;
System
.
err
.
println
(
"Partition: "
+
errorContext
.
getPartitionContext
(
)
.
getPartitionId
(
)
)
;
}
)
.
buildEventProcessorClient
(
)
;
// Start processing
processor
.
start
(
)
;
// Keep running...
Thread
.
sleep
(
Duration
.
ofMinutes
(
5
)
.
toMillis
(
)
)
;
// Stop gracefully
processor
.
stop
(
)
;
Batch Processing
EventProcessorClient
processor
=
new
EventProcessorClientBuilder
(
)
.
connectionString
(
""
,
""
)
.
consumerGroup
(
"$Default"
)
.
checkpointStore
(
new
BlobCheckpointStore
(
blobClient
)
)
.
processEventBatch
(
eventBatchContext
->
{
List
<
EventData
>
events
=
eventBatchContext
.
getEvents
(
)
;
System
.
out
.
printf
(
"Received %d events%n"
,
events
.
size
(
)
)
;
for
(
EventData
event
:
events
)
{
// Process each event
System
.
out
.
println
(
event
.
getBodyAsString
(
)
)
;
}
// Checkpoint after batch
eventBatchContext
.
updateCheckpoint
(
)
;
}
,
50
)
// maxBatchSize
.
processError
(
errorContext
->
{
System
.
err
.
println
(
"Error: "
+
errorContext
.
getThrowable
(
)
)
;
}
)
.
buildEventProcessorClient
(
)
;
Async Receiving
asyncConsumer
.
receiveFromPartition
(
"0"
,
EventPosition
.
latest
(
)
)
.
subscribe
(
partitionEvent
->
{
EventData
event
=
partitionEvent
.
getData
(
)
;
System
.
out
.
println
(
"Received: "
+
event
.
getBodyAsString
(
)
)
;
}
,
error
->
System
.
err
.
println
(
"Error: "
+
error
)
,
(
)
->
System
.
out
.
println
(
"Complete"
)
)
;
Get Event Hub Properties
// Get hub info
EventHubProperties
hubProps
=
producer
.
getEventHubProperties
(
)
;
System
.
out
.
println
(
"Hub: "
+
hubProps
.
getName
(
)
)
;
System
.
out
.
println
(
"Partitions: "
+
hubProps
.
getPartitionIds
(
)
)
;
// Get partition info
PartitionProperties
partitionProps
=
producer
.
getPartitionProperties
(
"0"
)
;
System
.
out
.
println
(
"Begin sequence: "
+
partitionProps
.
getBeginningSequenceNumber
(
)
)
;
System
.
out
.
println
(
"Last sequence: "
+
partitionProps
.
getLastEnqueuedSequenceNumber
(
)
)
;
System
.
out
.
println
(
"Last offset: "
+
partitionProps
.
getLastEnqueuedOffset
(
)
)
;
Event Positions
// Start from beginning
EventPosition
.
earliest
(
)
// Start from end (new events only)
EventPosition
.
latest
(
)
// From specific offset
EventPosition
.
fromOffset
(
12345L
)
// From specific sequence number
EventPosition
.
fromSequenceNumber
(
100L
)
// From specific time
EventPosition
.
fromEnqueuedTime
(
Instant
.
now
(
)
.
minus
(
Duration
.
ofHours
(
1
)
)
)
Error Handling
import
com
.
azure
.
messaging
.
eventhubs
.
models
.
ErrorContext
;
.
processError
(
errorContext
->
{
Throwable
error
=
errorContext
.
getThrowable
(
)
;
String
partitionId
=
errorContext
.
getPartitionContext
(
)
.
getPartitionId
(
)
;
if
(
error
instanceof
AmqpException
)
{
AmqpException
amqpError
=
(
AmqpException
)
error
;
if
(
amqpError
.
isTransient
(
)
)
{
System
.
out
.
println
(
"Transient error, will retry"
)
;
}
}
System
.
err
.
printf
(
"Error on partition %s: %s%n"
,
partitionId
,
error
.
getMessage
(
)
)
;
}
)
Resource Cleanup
// Always close clients
try
{
producer
.
send
(
batch
)
;
}
finally
{
producer
.
close
(
)
;
}
// Or use try-with-resources
try
(
EventHubProducerClient
producer
=
new
EventHubClientBuilder
(
)
.
connectionString
(
connectionString
,
eventHubName
)
.
buildProducerClient
(
)
)
{
producer
.
send
(
events
)
;
}
Environment Variables
EVENT_HUBS_CONNECTION_STRING
=
Endpoint
=
sb://
<
namespace
>
.servicebus.windows.net/
;
SharedAccessKeyName
=
..
.
EVENT_HUBS_NAME
=
<
event-hub-name
>
STORAGE_CONNECTION_STRING
=
<
for-checkpointing
>
Best Practices
Use EventProcessorClient
For production, provides load balancing and checkpointing
Batch Events
Use
EventDataBatch
for efficient sending
Partition Keys
Use for ordering guarantees within a partition
Checkpointing
Checkpoint after processing to avoid reprocessing
Error Handling
Handle transient errors with retries
Close Clients
Always close producer/consumer when done Trigger Phrases "Event Hubs Java" "event streaming Azure" "real-time data ingestion" "EventProcessorClient" "event hub producer consumer" "partition processing" When to Use This skill is applicable to execute the workflow or actions described in the overview.
返回排行榜