azure-eventhub-ts

安装量: 50
排名: #14724

安装

npx skills add https://github.com/sickn33/antigravity-awesome-skills --skill azure-eventhub-ts

Azure Event Hubs SDK for TypeScript High-throughput event streaming and real-time data ingestion. Installation npm install @azure/event-hubs @azure/identity For checkpointing with consumer groups: npm install @azure/eventhubs-checkpointstore-blob @azure/storage-blob Environment Variables EVENTHUB_NAMESPACE = < namespace

.servicebus.windows.net EVENTHUB_NAME = my-eventhub STORAGE_ACCOUNT_NAME = < storage-account

STORAGE_CONTAINER_NAME

checkpoints Authentication import { EventHubProducerClient , EventHubConsumerClient } from "@azure/event-hubs" ; import { DefaultAzureCredential } from "@azure/identity" ; const fullyQualifiedNamespace = process . env . EVENTHUB_NAMESPACE ! ; const eventHubName = process . env . EVENTHUB_NAME ! ; const credential = new DefaultAzureCredential ( ) ; // Producer const producer = new EventHubProducerClient ( fullyQualifiedNamespace , eventHubName , credential ) ; // Consumer const consumer = new EventHubConsumerClient ( "$Default" , // Consumer group fullyQualifiedNamespace , eventHubName , credential ) ; Core Workflow Send Events const producer = new EventHubProducerClient ( namespace , eventHubName , credential ) ; // Create batch and add events const batch = await producer . createBatch ( ) ; batch . tryAdd ( { body : { temperature : 72.5 , deviceId : "sensor-1" } } ) ; batch . tryAdd ( { body : { temperature : 68.2 , deviceId : "sensor-2" } } ) ; await producer . sendBatch ( batch ) ; await producer . close ( ) ; Send to Specific Partition // By partition ID const batch = await producer . createBatch ( { partitionId : "0" } ) ; // By partition key (consistent hashing) const batch = await producer . createBatch ( { partitionKey : "device-123" } ) ; Receive Events (Simple) const consumer = new EventHubConsumerClient ( "$Default" , namespace , eventHubName , credential ) ; const subscription = consumer . subscribe ( { processEvents : async ( events , context ) => { for ( const event of events ) { console . log ( Partition: ${ context . partitionId } , Body: ${ JSON . stringify ( event . body ) } ) ; } } , processError : async ( err , context ) => { console . error ( Error on partition ${ context . partitionId } : ${ err . message } ) ; } , } ) ; // Stop after some time setTimeout ( async ( ) => { await subscription . close ( ) ; await consumer . close ( ) ; } , 60000 ) ; Receive with Checkpointing (Production) import { EventHubConsumerClient } from "@azure/event-hubs" ; import { ContainerClient } from "@azure/storage-blob" ; import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob" ; const containerClient = new ContainerClient ( https:// ${ storageAccount } .blob.core.windows.net/ ${ containerName } , credential ) ; const checkpointStore = new BlobCheckpointStore ( containerClient ) ; const consumer = new EventHubConsumerClient ( "$Default" , namespace , eventHubName , credential , checkpointStore ) ; const subscription = consumer . subscribe ( { processEvents : async ( events , context ) => { for ( const event of events ) { console . log ( Processing: ${ JSON . stringify ( event . body ) } ) ; } // Checkpoint after processing batch if ( events . length

0 ) { await context . updateCheckpoint ( events [ events . length - 1 ] ) ; } } , processError : async ( err , context ) => { console . error ( Error: ${ err . message } ) ; } , } ) ; Receive from Specific Position const subscription = consumer . subscribe ( { processEvents : async ( events , context ) => { / ... / } , processError : async ( err , context ) => { / ... / } , } , { startPosition : { // Start from beginning "0" : { offset : "@earliest" } , // Start from end (new events only) "1" : { offset : "@latest" } , // Start from specific offset "2" : { offset : "12345" } , // Start from specific time "3" : { enqueuedOn : new Date ( "2024-01-01" ) } , } , } ) ; Event Hub Properties // Get hub info const hubProperties = await producer . getEventHubProperties ( ) ; console . log ( Partitions: ${ hubProperties . partitionIds } ) ; // Get partition info const partitionProperties = await producer . getPartitionProperties ( "0" ) ; console . log ( Last sequence: ${ partitionProperties . lastEnqueuedSequenceNumber } ) ; Batch Processing Options const subscription = consumer . subscribe ( { processEvents : async ( events , context ) => { / ... / } , processError : async ( err , context ) => { / ... / } , } , { maxBatchSize : 100 , // Max events per batch maxWaitTimeInSeconds : 30 , // Max wait for batch } ) ; Key Types import { EventHubProducerClient , EventHubConsumerClient , EventData , ReceivedEventData , PartitionContext , Subscription , SubscriptionEventHandlers , CreateBatchOptions , EventPosition , } from "@azure/event-hubs" ; import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob" ; Event Properties // Send with properties const batch = await producer . createBatch ( ) ; batch . tryAdd ( { body : { data : "payload" } , properties : { eventType : "telemetry" , deviceId : "sensor-1" , } , contentType : "application/json" , correlationId : "request-123" , } ) ; // Access in receiver consumer . subscribe ( { processEvents : async ( events , context ) => { for ( const event of events ) { console . log ( Type: ${ event . properties ?. eventType } ) ; console . log ( Sequence: ${ event . sequenceNumber } ) ; console . log ( Enqueued: ${ event . enqueuedTimeUtc } ) ; console . log ( Offset: ${ event . offset } ) ; } } , } ) ; Error Handling consumer . subscribe ( { processEvents : async ( events , context ) => { try { for ( const event of events ) { await processEvent ( event ) ; } await context . updateCheckpoint ( events [ events . length - 1 ] ) ; } catch ( error ) { // Don't checkpoint on error - events will be reprocessed console . error ( "Processing failed:" , error ) ; } } , processError : async ( err , context ) => { if ( err . name === "MessagingError" ) { // Transient error - SDK will retry console . warn ( "Transient error:" , err . message ) ; } else { // Fatal error console . error ( "Fatal error:" , err ) ; } } , } ) ; Best Practices Use checkpointing - Always checkpoint in production for exactly-once processing Batch sends - Use createBatch() for efficient sending Partition keys - Use partition keys to ensure ordering for related events Consumer groups - Use separate consumer groups for different processing pipelines Handle errors gracefully - Don't checkpoint on processing failures Close clients - Always close producer/consumer when done Monitor lag - Track lastEnqueuedSequenceNumber vs processed sequence When to Use This skill is applicable to execute the workflow or actions described in the overview.

返回排行榜