azure-storage-queue-py

安装量: 35
排名: #19804

安装

npx skills add https://github.com/sickn33/antigravity-awesome-skills --skill azure-storage-queue-py

Azure Queue Storage SDK for Python Simple, cost-effective message queuing for asynchronous communication. Installation pip install azure-storage-queue azure-identity Environment Variables AZURE_STORAGE_ACCOUNT_URL = https:// < account

.queue.core.windows.net Authentication from azure . identity import DefaultAzureCredential from azure . storage . queue import QueueServiceClient , QueueClient credential = DefaultAzureCredential ( ) account_url = "https://.queue.core.windows.net"

Service client

service_client

QueueServiceClient ( account_url = account_url , credential = credential )

Queue client

queue_client

QueueClient ( account_url = account_url , queue_name = "myqueue" , credential = credential ) Queue Operations

Create queue

service_client . create_queue ( "myqueue" )

Get queue client

queue_client

service_client . get_queue_client ( "myqueue" )

Delete queue

service_client . delete_queue ( "myqueue" )

List queues

for queue in service_client . list_queues ( ) : print ( queue . name ) Send Messages

Send message (string)

queue_client . send_message ( "Hello, Queue!" )

Send with options

queue_client . send_message ( content = "Delayed message" , visibility_timeout = 60 ,

Hidden for 60 seconds

time_to_live

3600

Expires in 1 hour

)

Send JSON

import json data = { "task" : "process" , "id" : 123 } queue_client . send_message ( json . dumps ( data ) ) Receive Messages

Receive messages (makes them invisible temporarily)

messages

queue_client . receive_messages ( messages_per_page = 10 , visibility_timeout = 30

30 seconds to process

) for message in messages : print ( f"ID: { message . id } " ) print ( f"Content: { message . content } " ) print ( f"Dequeue count: { message . dequeue_count } " )

Process message...

Delete after processing

queue_client . delete_message ( message ) Peek Messages

Peek without hiding (doesn't affect visibility)

messages

queue_client . peek_messages ( max_messages = 5 ) for message in messages : print ( message . content ) Update Message

Extend visibility or update content

messages

queue_client . receive_messages ( ) for message in messages :

Extend timeout (need more time)

queue_client . update_message ( message , visibility_timeout = 60 )

Update content and timeout

queue_client . update_message ( message , content = "Updated content" , visibility_timeout = 60 ) Delete Message

Delete after successful processing

messages

queue_client . receive_messages ( ) for message in messages : try :

Process...

queue_client . delete_message ( message ) except Exception :

Message becomes visible again after timeout

pass Clear Queue

Delete all messages

queue_client . clear_messages ( ) Queue Properties

Get queue properties

properties

queue_client . get_queue_properties ( ) print ( f"Approximate message count: { properties . approximate_message_count } " )

Set/get metadata

queue_client . set_queue_metadata ( metadata = { "environment" : "production" } ) properties = queue_client . get_queue_properties ( ) print ( properties . metadata ) Async Client from azure . storage . queue . aio import QueueServiceClient , QueueClient from azure . identity . aio import DefaultAzureCredential async def queue_operations ( ) : credential = DefaultAzureCredential ( ) async with QueueClient ( account_url = "https://.queue.core.windows.net" , queue_name = "myqueue" , credential = credential ) as client :

Send

await client . send_message ( "Async message" )

Receive

async for message in client . receive_messages ( ) : print ( message . content ) await client . delete_message ( message ) import asyncio asyncio . run ( queue_operations ( ) ) Base64 Encoding from azure . storage . queue import QueueClient , BinaryBase64EncodePolicy , BinaryBase64DecodePolicy

For binary data

queue_client

QueueClient ( account_url = account_url , queue_name = "myqueue" , credential = credential , message_encode_policy = BinaryBase64EncodePolicy ( ) , message_decode_policy = BinaryBase64DecodePolicy ( ) )

Send bytes

queue_client . send_message ( b"Binary content" ) Best Practices Delete messages after processing to prevent reprocessing Set appropriate visibility timeout based on processing time Handle dequeue_count for poison message detection Use async client for high-throughput scenarios Use peek_messages for monitoring without affecting queue Set time_to_live to prevent stale messages Consider Service Bus for advanced features (sessions, topics) When to Use This skill is applicable to execute the workflow or actions described in the overview.

返回排行榜