azure-servicebus-py

Installs: 42
Rank: #17147

Install

npx skills add https://github.com/sickn33/antigravity-awesome-skills --skill azure-servicebus-py

Azure Service Bus SDK for Python

Enterprise messaging for reliable cloud communication with queues and pub/sub topics.

Installation

```bash
pip install azure-servicebus azure-identity
```

Environment Variables

```bash
SERVICEBUS_FULLY_QUALIFIED_NAMESPACE=<namespace>.servicebus.windows.net
SERVICEBUS_QUEUE_NAME=myqueue
SERVICEBUS_TOPIC_NAME=mytopic
SERVICEBUS_SUBSCRIPTION_NAME=mysubscription
```

Authentication

```python
from azure.identity import DefaultAzureCredential
from azure.servicebus import ServiceBusClient

credential = DefaultAzureCredential()
namespace = "<namespace>.servicebus.windows.net"

client = ServiceBusClient(
    fully_qualified_namespace=namespace,
    credential=credential,
)
```

Client Types

| Client | Purpose | Get From |
|--------|---------|----------|
| ServiceBusClient | Connection management | Direct instantiation |
| ServiceBusSender | Send messages | client.get_queue_sender() / get_topic_sender() |
| ServiceBusReceiver | Receive messages | client.get_queue_receiver() / get_subscription_receiver() |

Send Messages (Async)

```python
import asyncio

from azure.identity.aio import DefaultAzureCredential
from azure.servicebus import ServiceBusMessage
from azure.servicebus.aio import ServiceBusClient


async def send_messages():
    credential = DefaultAzureCredential()
    async with ServiceBusClient(
        fully_qualified_namespace="<namespace>.servicebus.windows.net",
        credential=credential,
    ) as client:
        sender = client.get_queue_sender(queue_name="myqueue")
        async with sender:
            # Single message
            message = ServiceBusMessage("Hello, Service Bus!")
            await sender.send_messages(message)

            # Batch of messages
            messages = [ServiceBusMessage(f"Message {i}") for i in range(10)]
            await sender.send_messages(messages)

            # Message batch (for size control)
            batch = await sender.create_message_batch()
            for i in range(100):
                try:
                    batch.add_message(ServiceBusMessage(f"Batch message {i}"))
                except ValueError:
                    # Batch full: send it, then start a new batch with this message
                    await sender.send_messages(batch)
                    batch = await sender.create_message_batch()
                    batch.add_message(ServiceBusMessage(f"Batch message {i}"))
            # Send the final, partially filled batch
            await sender.send_messages(batch)


asyncio.run(send_messages())
```

Receive Messages (Async)

```python
async def receive_messages():
    credential = DefaultAzureCredential()
    async with ServiceBusClient(
        fully_qualified_namespace="<namespace>.servicebus.windows.net",
        credential=credential,
    ) as client:
        receiver = client.get_queue_receiver(queue_name="myqueue")
        async with receiver:
            # Receive batch
            messages = await receiver.receive_messages(
                max_message_count=10,
                max_wait_time=5,  # seconds
            )
            for msg in messages:
                print(f"Received: {str(msg)}")
                await receiver.complete_message(msg)  # Remove from queue


asyncio.run(receive_messages())
```

Receive Modes

| Mode | Behavior | Use Case |
|------|----------|----------|
| PEEK_LOCK (default) | Message locked, must complete/abandon | Reliable processing |
| RECEIVE_AND_DELETE | Removed immediately on receive | At-most-once delivery |

```python
from azure.servicebus import ServiceBusReceiveMode

receiver = client.get_queue_receiver(
    queue_name="myqueue",
    receive_mode=ServiceBusReceiveMode.RECEIVE_AND_DELETE,
)
```

Message Settlement

```python
# ProcessingError and PermanentError stand in for application-defined exceptions.
async with receiver:
    messages = await receiver.receive_messages(max_message_count=1)
    for msg in messages:
        try:
            # Process message...
            await receiver.complete_message(msg)  # Success - remove from queue
        except ProcessingError:
            await receiver.abandon_message(msg)  # Release lock so it can be retried
        except PermanentError:
            await receiver.dead_letter_message(
                msg,
                reason="ProcessingFailed",
                error_description="Could not process",
            )
```

| Action | Effect |
|--------|--------|
| complete_message() | Remove from queue (success) |
| abandon_message() | Release lock, retry immediately |
| dead_letter_message() | Move to dead-letter queue |
| defer_message() | Set aside, receive by sequence number |

Topics and Subscriptions
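As a mental model for this section: a topic delivers a copy of each published message to every subscription, and each subscription is then read like an independent queue. The sketch below simulates that fan-out in memory. `InMemoryTopic` and its methods are hypothetical names invented for illustration and are not part of `azure-servicebus`; the model ignores subscription filters, locks, and settlement.

```python
from collections import deque
from dataclasses import dataclass, field


@dataclass
class InMemoryTopic:
    """Toy stand-in for a topic (hypothetical; not an SDK class)."""

    subscriptions: dict = field(default_factory=dict)

    def add_subscription(self, name: str) -> None:
        # Each subscription gets its own independent backlog.
        self.subscriptions.setdefault(name, deque())

    def send(self, body: str) -> None:
        # Fan-out: every existing subscription receives its own copy.
        for backlog in self.subscriptions.values():
            backlog.append(body)

    def receive(self, subscription_name: str, max_message_count: int = 1) -> list:
        # Reading from one subscription does not affect the others.
        backlog = self.subscriptions[subscription_name]
        count = min(max_message_count, len(backlog))
        return [backlog.popleft() for _ in range(count)]


topic = InMemoryTopic()
topic.add_subscription("billing")
topic.add_subscription("audit")
topic.send("order created")

billing_messages = topic.receive("billing")
audit_messages = topic.receive("audit")
```

Both `billing_messages` and `audit_messages` hold their own copy of the message, which is why a sender targets the topic while each receiver names a topic and a subscription.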

```python
# Send to topic
sender = client.get_topic_sender(topic_name="mytopic")
async with sender:
    await sender.send_messages(ServiceBusMessage("Topic message"))

# Receive from subscription
receiver = client.get_subscription_receiver(
    topic_name="mytopic",
    subscription_name="mysubscription",
)
async with receiver:
    messages = await receiver.receive_messages(max_message_count=10)
```

Sessions (FIFO)
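To make the ordering guarantee concrete: messages that share a `session_id` are handed to a single receiver in the order they were sent, even when sends for different sessions are interleaved. The following is a minimal in-memory sketch of that grouping. `InMemorySessionQueue` is a hypothetical class, not an SDK type, and its "next available" rule (oldest session first) is a simplifying assumption rather than the service's actual selection behavior.

```python
from collections import OrderedDict, deque


class InMemorySessionQueue:
    """Toy model of a session-enabled queue (hypothetical; not an SDK class)."""

    def __init__(self):
        # session_id -> messages in send order
        self._sessions = OrderedDict()

    def send(self, session_id: str, body: str) -> None:
        self._sessions.setdefault(session_id, deque()).append(body)

    def receive_session(self, session_id: str) -> list:
        # Drain one session; its messages come back in send order.
        return list(self._sessions.pop(session_id, deque()))

    def next_available_session(self):
        # Assumption for this sketch: pick the oldest session that still has messages.
        return next(iter(self._sessions), None)


queue = InMemorySessionQueue()
queue.send("order-123", "created")
queue.send("order-456", "created")
queue.send("order-123", "paid")
queue.send("order-123", "shipped")

order_123_events = queue.receive_session("order-123")
remaining_session = queue.next_available_session()
```

Here `order_123_events` preserves the send order for that session regardless of the interleaved `order-456` message, which is the property the `session_id` argument relies on.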

```python
# Send with session
message = ServiceBusMessage("Session message")
message.session_id = "order-123"
await sender.send_messages(message)

# Receive from specific session
receiver = client.get_queue_receiver(
    queue_name="session-queue",
    session_id="order-123",
)

# Receive from next available session
from azure.servicebus import NEXT_AVAILABLE_SESSION

receiver = client.get_queue_receiver(
    queue_name="session-queue",
    session_id=NEXT_AVAILABLE_SESSION,
)
```

Scheduled Messages

```python
from datetime import datetime, timedelta, timezone

message = ServiceBusMessage("Scheduled message")
scheduled_time = datetime.now(timezone.utc) + timedelta(minutes=10)

# Schedule message (returns a list of sequence numbers, one per message)
sequence_numbers = await sender.schedule_messages(message, scheduled_time)

# Cancel scheduled message
await sender.cancel_scheduled_messages(sequence_numbers)
```

Dead-Letter Queue

```python
from azure.servicebus import ServiceBusSubQueue

# Receive from dead-letter queue
dlq_receiver = client.get_queue_receiver(
    queue_name="myqueue",
    sub_queue=ServiceBusSubQueue.DEAD_LETTER,
)
async with dlq_receiver:
    messages = await dlq_receiver.receive_messages(max_message_count=10)
    for msg in messages:
        print(f"Dead-lettered: {msg.dead_letter_reason}")
        await dlq_receiver.complete_message(msg)
```

Sync Client (for simple scripts)

```python
from azure.identity import DefaultAzureCredential
from azure.servicebus import ServiceBusClient, ServiceBusMessage

with ServiceBusClient(
    fully_qualified_namespace="<namespace>.servicebus.windows.net",
    credential=DefaultAzureCredential(),
) as client:
    with client.get_queue_sender("myqueue") as sender:
        sender.send_messages(ServiceBusMessage("Sync message"))

    # max_wait_time ends the iteration after 5 seconds without a new message;
    # without it the loop blocks indefinitely.
    with client.get_queue_receiver("myqueue", max_wait_time=5) as receiver:
        for msg in receiver:
            print(str(msg))
            receiver.complete_message(msg)
```

Best Practices

- Use the async client for production workloads
- Use context managers (async with) for proper cleanup
- Complete messages after successful processing
- Use the dead-letter queue for poison messages
- Use sessions for ordered, FIFO processing
- Use message batches for high-throughput scenarios
- Set max_wait_time to avoid infinite blocking

Reference Files

| File | Contents |
|------|----------|
| references/patterns.md | Competing consumers, sessions, retry patterns, request-response, transactions |
| references/dead-letter.md | DLQ handling, poison messages, reprocessing strategies |
| scripts/setup_servicebus.py | CLI for queue/topic/subscription management and DLQ monitoring |

When to Use

Use this skill when carrying out the workflows or actions described in the overview above.
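Two of the practices above (complete only after successful processing, dead-letter poison messages) can be folded into one settlement helper. The sketch below is one possible shape, not the skill's own implementation: `process_and_settle`, `ProcessingError`, `PermanentError`, and `FakeReceiver` are hypothetical names introduced here. The helper only assumes that the receiver exposes the async `complete_message`, `abandon_message`, and `dead_letter_message` methods, so a recording fake is used to exercise it without a live namespace.

```python
import asyncio


class ProcessingError(Exception):
    """Hypothetical transient failure: retrying may succeed."""


class PermanentError(Exception):
    """Hypothetical poison-message failure: retrying will not help."""


async def process_and_settle(receiver, msg, handler):
    """Run handler(msg), then settle msg on receiver based on the outcome."""
    try:
        await handler(msg)
    except ProcessingError:
        # Release the lock so the message can be delivered again.
        await receiver.abandon_message(msg)
        return "abandoned"
    except PermanentError as exc:
        # Move the message to the dead-letter queue with a reason.
        await receiver.dead_letter_message(
            msg, reason="ProcessingFailed", error_description=str(exc)
        )
        return "dead-lettered"
    # Only reached when the handler raised nothing.
    await receiver.complete_message(msg)
    return "completed"


class FakeReceiver:
    """Test double that records settlement calls (not an SDK class)."""

    def __init__(self):
        self.calls = []

    async def complete_message(self, msg):
        self.calls.append(("complete", msg))

    async def abandon_message(self, msg):
        self.calls.append(("abandon", msg))

    async def dead_letter_message(self, msg, reason=None, error_description=None):
        self.calls.append(("dead_letter", msg, reason, error_description))


async def handle_ok(msg):
    return None


async def handle_transient(msg):
    raise ProcessingError("downstream timeout")


async def handle_poison(msg):
    raise PermanentError("bad payload")


fake = FakeReceiver()
outcomes = [
    asyncio.run(process_and_settle(fake, "m1", handle_ok)),
    asyncio.run(process_and_settle(fake, "m2", handle_transient)),
    asyncio.run(process_and_settle(fake, "m3", handle_poison)),
]
```

With a real receiver, the same call would sit inside the `async with receiver:` loop, for example `await process_and_settle(receiver, msg, handler)` for each received message. Any exception other than the two hypothetical ones propagates without settling, so the message stays locked until its lock expires.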
