Azure Event Hubs SDK for Python Big data streaming platform for high-throughput event ingestion. Installation pip install azure-eventhub azure-identity
For checkpointing with blob storage
pip install azure-eventhub-checkpointstoreblob-aio Environment Variables EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = < namespace
.servicebus.windows.net EVENT_HUB_NAME = my-eventhub STORAGE_ACCOUNT_URL = https:// < account
.blob.core.windows.net CHECKPOINT_CONTAINER = checkpoints Authentication from azure . identity import DefaultAzureCredential from azure . eventhub import EventHubProducerClient , EventHubConsumerClient credential = DefaultAzureCredential ( ) namespace = "
.servicebus.windows.net" eventhub_name = "my-eventhub"
Producer
producer
EventHubProducerClient ( fully_qualified_namespace = namespace , eventhub_name = eventhub_name , credential = credential )
Consumer
consumer
EventHubConsumerClient
(
fully_qualified_namespace
=
namespace
,
eventhub_name
=
eventhub_name
,
consumer_group
=
"$Default"
,
credential
=
credential
)
Client Types
Client
Purpose
EventHubProducerClient
Send events to Event Hub
EventHubConsumerClient
Receive events from Event Hub
BlobCheckpointStore
Track consumer progress
Send Events
from
azure
.
eventhub
import
EventHubProducerClient
,
EventData
from
azure
.
identity
import
DefaultAzureCredential
producer
=
EventHubProducerClient
(
fully_qualified_namespace
=
"
Create batch (handles size limits)
event_data_batch
producer . create_batch ( ) for i in range ( 10 ) : try : event_data_batch . add ( EventData ( f"Event { i } " ) ) except ValueError :
Batch is full, send and create new one
producer . send_batch ( event_data_batch ) event_data_batch = producer . create_batch ( ) event_data_batch . add ( EventData ( f"Event { i } " ) )
Send remaining
producer . send_batch ( event_data_batch ) Send to Specific Partition
By partition ID
event_data_batch
producer . create_batch ( partition_id = "0" )
By partition key (consistent hashing)
event_data_batch
producer
.
create_batch
(
partition_key
=
"user-123"
)
Receive Events
Simple Receive
from
azure
.
eventhub
import
EventHubConsumerClient
def
on_event
(
partition_context
,
event
)
:
print
(
f"Partition:
{
partition_context
.
partition_id
}
"
)
print
(
f"Data:
{
event
.
body_as_str
(
)
}
"
)
partition_context
.
update_checkpoint
(
event
)
consumer
=
EventHubConsumerClient
(
fully_qualified_namespace
=
"
Beginning of stream
)
With Blob Checkpoint Store (Production)
from
azure
.
eventhub
import
EventHubConsumerClient
from
azure
.
eventhub
.
extensions
.
checkpointstoreblob
import
BlobCheckpointStore
from
azure
.
identity
import
DefaultAzureCredential
checkpoint_store
=
BlobCheckpointStore
(
blob_account_url
=
"https://
Checkpoint after processing
partition_context
.
update_checkpoint
(
event
)
with
consumer
:
consumer
.
receive
(
on_event
=
on_event
)
Async Client
from
azure
.
eventhub
.
aio
import
EventHubProducerClient
,
EventHubConsumerClient
from
azure
.
identity
.
aio
import
DefaultAzureCredential
import
asyncio
async
def
send_events
(
)
:
credential
=
DefaultAzureCredential
(
)
async
with
EventHubProducerClient
(
fully_qualified_namespace
=
"
Set properties
event . properties = { "custom_property" : "value" } event . content_type = "application/json"
Read properties (on receive)
print ( event . body_as_str ( ) ) print ( event . sequence_number ) print ( event . offset ) print ( event . enqueued_time ) print ( event . partition_key ) Get Event Hub Info with producer : info = producer . get_eventhub_properties ( ) print ( f"Name: { info [ 'name' ] } " ) print ( f"Partitions: { info [ 'partition_ids' ] } " ) for partition_id in info [ 'partition_ids' ] : partition_info = producer . get_partition_properties ( partition_id ) print ( f"Partition { partition_id } : { partition_info [ 'last_enqueued_sequence_number' ] } " ) Best Practices Use batches for sending multiple events Use checkpoint store in production for reliable processing Use async client for high-throughput scenarios Use partition keys for ordered delivery within a partition Handle batch size limits — catch ValueError when batch is full Use context managers ( with / async with ) for proper cleanup Set appropriate consumer groups for different applications Reference Files File Contents references/checkpointing.md Checkpoint store patterns, blob checkpointing, checkpoint strategies references/partitions.md Partition management, load balancing, starting positions scripts/setup_consumer.py CLI for Event Hub info, consumer setup, and event sending/receiving When to Use This skill is applicable to execute the workflow or actions described in the overview.