voice-ai-engine-development

安装量: 153
排名: #5643

安装

npx skills add https://github.com/sickn33/antigravity-awesome-skills --skill voice-ai-engine-development

Voice AI Engine Development Overview

This skill guides you through building production-ready voice AI engines with real-time conversation capabilities. Voice AI engines enable natural, bidirectional conversations between users and AI agents through streaming audio processing, speech-to-text transcription, LLM-powered responses, and text-to-speech synthesis.

The core architecture uses an async queue-based worker pipeline where each component runs independently and communicates via asyncio.Queue objects, enabling concurrent processing, interrupt handling, and real-time streaming at every stage.

When to Use This Skill

Use this skill when:

Building real-time voice conversation systems Implementing voice assistants or chatbots Creating voice-enabled customer service agents Developing voice AI applications with interrupt capabilities Integrating multiple transcription, LLM, or TTS providers Working with streaming audio processing pipelines The user mentions Vocode, voice engines, or conversational AI Core Architecture Principles The Worker Pipeline Pattern

Every voice AI engine follows this pipeline:

Audio In → Transcriber → Agent → Synthesizer → Audio Out (Worker 1) (Worker 2) (Worker 3)

Key Benefits:

Decoupling: Workers only know about their input/output queues Concurrency: All workers run simultaneously via asyncio Backpressure: Queues automatically handle rate differences Interruptibility: Everything can be stopped mid-stream Base Worker Pattern

Every worker follows this pattern:

class BaseWorker: def init(self, input_queue, output_queue): self.input_queue = input_queue # asyncio.Queue to consume from self.output_queue = output_queue # asyncio.Queue to produce to self.active = False

def start(self):
    """Start the worker's processing loop"""
    self.active = True
    asyncio.create_task(self._run_loop())

async def _run_loop(self):
    """Main processing loop - runs forever until terminated"""
    while self.active:
        item = await self.input_queue.get()  # Block until item arrives
        await self.process(item)              # Process the item

async def process(self, item):
    """Override this - does the actual work"""
    raise NotImplementedError

def terminate(self):
    """Stop the worker"""
    self.active = False

Component Implementation Guide 1. Transcriber (Audio → Text)

Purpose: Converts incoming audio chunks to text transcriptions

Interface Requirements:

class BaseTranscriber: def init(self, transcriber_config): self.input_queue = asyncio.Queue() # Audio chunks (bytes) self.output_queue = asyncio.Queue() # Transcriptions self.is_muted = False

def send_audio(self, chunk: bytes):
    """Client calls this to send audio"""
    if not self.is_muted:
        self.input_queue.put_nowait(chunk)
    else:
        # Send silence instead (prevents echo during bot speech)
        self.input_queue.put_nowait(self.create_silent_chunk(len(chunk)))

def mute(self):
    """Called when bot starts speaking (prevents echo)"""
    self.is_muted = True

def unmute(self):
    """Called when bot stops speaking"""
    self.is_muted = False

Output Format:

class Transcription: message: str # "Hello, how are you?" confidence: float # 0.95 is_final: bool # True = complete sentence, False = partial is_interrupt: bool # Set by TranscriptionsWorker

Supported Providers:

Deepgram - Fast, accurate, streaming AssemblyAI - High accuracy, good for accents Azure Speech - Enterprise-grade Google Cloud Speech - Multi-language support

Critical Implementation Details:

Use WebSocket for bidirectional streaming Run sender and receiver tasks concurrently with asyncio.gather() Mute transcriber when bot speaks to prevent echo/feedback loops Handle both final and partial transcriptions 2. Agent (Text → Response)

Purpose: Processes user input and generates conversational responses

Interface Requirements:

class BaseAgent: def init(self, agent_config): self.input_queue = asyncio.Queue() # TranscriptionAgentInput self.output_queue = asyncio.Queue() # AgentResponse self.transcript = None # Conversation history

async def generate_response(self, human_input, is_interrupt, conversation_id):
    """Override this - returns AsyncGenerator of responses"""
    raise NotImplementedError

Why Streaming Responses?

Lower latency: Start speaking as soon as first sentence is ready Better interrupts: Can stop mid-response Sentence-by-sentence: More natural conversation flow

Supported Providers:

OpenAI (GPT-4, GPT-3.5) - High quality, fast Google Gemini - Multimodal, cost-effective Anthropic Claude - Long context, nuanced responses

Critical Implementation Details:

Maintain conversation history in Transcript object Stream responses using AsyncGenerator IMPORTANT: Buffer entire LLM response before yielding to synthesizer (prevents audio jumping) Handle interrupts by canceling current generation task Update conversation history with partial messages on interrupt 3. Synthesizer (Text → Audio)

Purpose: Converts agent text responses to speech audio

Interface Requirements:

class BaseSynthesizer: async def create_speech(self, message: BaseMessage, chunk_size: int) -> SynthesisResult: """ Returns a SynthesisResult containing: - chunk_generator: AsyncGenerator that yields audio chunks - get_message_up_to: Function to get partial text (for interrupts) """ raise NotImplementedError

SynthesisResult Structure:

class SynthesisResult: chunk_generator: AsyncGenerator[ChunkResult, None] get_message_up_to: Callable[[float], str] # seconds → partial text

class ChunkResult:
    chunk: bytes          # Raw PCM audio
    is_last_chunk: bool

Supported Providers:

ElevenLabs - Most natural voices, streaming Azure TTS - Enterprise-grade, many languages Google Cloud TTS - Cost-effective, good quality Amazon Polly - AWS integration Play.ht - Voice cloning

Critical Implementation Details:

Stream audio chunks as they're generated Convert audio to LINEAR16 PCM format (16kHz sample rate) Implement get_message_up_to() for interrupt handling Handle audio format conversion (MP3 → PCM) 4. Output Device (Audio → Client)

Purpose: Sends synthesized audio back to the client

CRITICAL: Rate Limiting for Interrupts

async def send_speech_to_output(self, message, synthesis_result, stop_event, seconds_per_chunk): chunk_idx = 0 async for chunk_result in synthesis_result.chunk_generator: # Check for interrupt if stop_event.is_set(): logger.debug(f"Interrupted after {chunk_idx} chunks") message_sent = synthesis_result.get_message_up_to( chunk_idx * seconds_per_chunk ) return message_sent, True # cut_off = True

    start_time = time.time()

    # Send chunk to output device
    self.output_device.consume_nonblocking(chunk_result.chunk)

    # CRITICAL: Wait for chunk to play before sending next one
    # This is what makes interrupts work!
    speech_length = seconds_per_chunk
    processing_time = time.time() - start_time
    await asyncio.sleep(max(speech_length - processing_time, 0))

    chunk_idx += 1

return message, False  # cut_off = False

Why Rate Limiting? Without rate limiting, all audio chunks would be sent immediately, which would:

Buffer entire message on client side Make interrupts impossible (all audio already sent) Cause timing issues

By sending one chunk every N seconds:

Real-time playback is maintained Interrupts can stop mid-sentence Natural conversation flow is preserved The Interrupt System

The interrupt system is critical for natural conversations.

How Interrupts Work

Scenario: Bot is saying "I think the weather will be nice today and tomorrow and—" when user interrupts with "Stop".

Step 1: User starts speaking

TranscriptionsWorker detects new transcription while bot speaking

async def process(self, transcription): if not self.conversation.is_human_speaking: # Bot was speaking! # Broadcast interrupt to all in-flight events interrupted = self.conversation.broadcast_interrupt() transcription.is_interrupt = interrupted

Step 2: broadcast_interrupt() stops everything

def broadcast_interrupt(self): num_interrupts = 0 # Interrupt all queued events while True: try: interruptible_event = self.interruptible_events.get_nowait() if interruptible_event.interrupt(): # Sets interruption_event num_interrupts += 1 except queue.Empty: break

# Cancel current tasks
self.agent.cancel_current_task()              # Stop generating text
self.agent_responses_worker.cancel_current_task()  # Stop synthesizing
return num_interrupts > 0

Step 3: SynthesisResultsWorker detects interrupt

async def send_speech_to_output(self, synthesis_result, stop_event, ...): async for chunk_result in synthesis_result.chunk_generator: # Check stop_event (this is the interruption_event) if stop_event.is_set(): logger.debug("Interrupted! Stopping speech.") # Calculate what was actually spoken seconds_spoken = chunk_idx * seconds_per_chunk partial_message = synthesis_result.get_message_up_to(seconds_spoken) # e.g., "I think the weather will be nice today" return partial_message, True # cut_off = True

Step 4: Agent updates history

if cut_off: # Update conversation history with partial message self.agent.update_last_bot_message_on_cut_off(message_sent) # History now shows: # Bot: "I think the weather will be nice today" (incomplete)

InterruptibleEvent Pattern

Every event in the pipeline is wrapped in an InterruptibleEvent:

class InterruptibleEvent: def init(self, payload, is_interruptible=True): self.payload = payload self.is_interruptible = is_interruptible self.interruption_event = threading.Event() # Initially not set self.interrupted = False

def interrupt(self) -> bool:
    """Interrupt this event"""
    if not self.is_interruptible:
        return False
    if not self.interrupted:
        self.interruption_event.set()  # Signal to stop!
        self.interrupted = True
        return True
    return False

def is_interrupted(self) -> bool:
    return self.interruption_event.is_set()

Multi-Provider Factory Pattern

Support multiple providers with a factory pattern:

class VoiceHandler: """Multi-provider factory for voice components"""

def create_transcriber(self, agent_config: Dict):
    """Create transcriber based on transcriberProvider"""
    provider = agent_config.get("transcriberProvider", "deepgram")

    if provider == "deepgram":
        return self._create_deepgram_transcriber(agent_config)
    elif provider == "assemblyai":
        return self._create_assemblyai_transcriber(agent_config)
    elif provider == "azure":
        return self._create_azure_transcriber(agent_config)
    elif provider == "google":
        return self._create_google_transcriber(agent_config)
    else:
        raise ValueError(f"Unknown transcriber provider: {provider}")

def create_agent(self, agent_config: Dict):
    """Create LLM agent based on llmProvider"""
    provider = agent_config.get("llmProvider", "openai")

    if provider == "openai":
        return self._create_openai_agent(agent_config)
    elif provider == "gemini":
        return self._create_gemini_agent(agent_config)
    else:
        raise ValueError(f"Unknown LLM provider: {provider}")

def create_synthesizer(self, agent_config: Dict):
    """Create voice synthesizer based on voiceProvider"""
    provider = agent_config.get("voiceProvider", "elevenlabs")

    if provider == "elevenlabs":
        return self._create_elevenlabs_synthesizer(agent_config)
    elif provider == "azure":
        return self._create_azure_synthesizer(agent_config)
    elif provider == "google":
        return self._create_google_synthesizer(agent_config)
    elif provider == "polly":
        return self._create_polly_synthesizer(agent_config)
    elif provider == "playht":
        return self._create_playht_synthesizer(agent_config)
    else:
        raise ValueError(f"Unknown voice provider: {provider}")

WebSocket Integration

Voice AI engines typically use WebSocket for bidirectional audio streaming:

@app.websocket("/conversation") async def websocket_endpoint(websocket: WebSocket): await websocket.accept()

# Create voice components
voice_handler = VoiceHandler()
transcriber = voice_handler.create_transcriber(agent_config)
agent = voice_handler.create_agent(agent_config)
synthesizer = voice_handler.create_synthesizer(agent_config)

# Create output device
output_device = WebsocketOutputDevice(
    ws=websocket,
    sampling_rate=16000,
    audio_encoding=AudioEncoding.LINEAR16
)

# Create conversation orchestrator
conversation = StreamingConversation(
    output_device=output_device,
    transcriber=transcriber,
    agent=agent,
    synthesizer=synthesizer
)

# Start all workers
await conversation.start()

try:
    # Receive audio from client
    async for message in websocket.iter_bytes():
        conversation.receive_audio(message)
except WebSocketDisconnect:
    logger.info("Client disconnected")
finally:
    await conversation.terminate()

Common Pitfalls and Solutions 1. Audio Jumping/Cutting Off

Problem: Bot's audio jumps or cuts off mid-response.

Cause: Sending text to synthesizer in small chunks causes multiple TTS calls.

Solution: Buffer the entire LLM response before sending to synthesizer:

❌ Bad: Yields sentence-by-sentence

async for sentence in llm_stream: yield GeneratedResponse(message=BaseMessage(text=sentence))

✅ Good: Buffer entire response

full_response = "" async for chunk in llm_stream: full_response += chunk yield GeneratedResponse(message=BaseMessage(text=full_response))

  1. Echo/Feedback Loop

Problem: Bot hears itself speaking and responds to its own audio.

Cause: Transcriber not muted during bot speech.

Solution: Mute transcriber when bot starts speaking:

Before sending audio to output

self.transcriber.mute()

After audio playback complete

self.transcriber.unmute()

  1. Interrupts Not Working

Problem: User can't interrupt bot mid-sentence.

Cause: All audio chunks sent at once instead of rate-limited.

Solution: Rate-limit audio chunks to match real-time playback:

async for chunk in synthesis_result.chunk_generator: start_time = time.time()

# Send chunk
output_device.consume_nonblocking(chunk)

# Wait for chunk duration before sending next
processing_time = time.time() - start_time
await asyncio.sleep(max(seconds_per_chunk - processing_time, 0))
  1. Memory Leaks from Unclosed Streams

Problem: Memory usage grows over time.

Cause: WebSocket connections or API streams not properly closed.

Solution: Always use context managers and cleanup:

try: async with websockets.connect(url) as ws: # Use websocket pass finally: # Cleanup await conversation.terminate() await transcriber.terminate()

Production Considerations 1. Error Handling async def _run_loop(self): while self.active: try: item = await self.input_queue.get() await self.process(item) except Exception as e: logger.error(f"Worker error: {e}", exc_info=True) # Don't crash the worker, continue processing

  1. Graceful Shutdown async def terminate(self): """Gracefully shut down all workers""" self.active = False

    Stop all workers

    self.transcriber.terminate() self.agent.terminate() self.synthesizer.terminate()

    Wait for queues to drain

    await asyncio.sleep(0.5)

    Close connections

    if self.websocket: await self.websocket.close()

  2. Monitoring and Logging

Log key events

logger.info(f"🎤 [TRANSCRIBER] Received: '{transcription.message}'") logger.info(f"🤖 [AGENT] Generating response...") logger.info(f"🔊 [SYNTHESIZER] Synthesizing {len(text)} characters") logger.info(f"⚠️ [INTERRUPT] User interrupted bot")

Track metrics

metrics.increment("transcriptions.count") metrics.timing("agent.response_time", duration) metrics.gauge("active_conversations", count)

  1. Rate Limiting and Quotas

Implement rate limiting for API calls

from aiolimiter import AsyncLimiter

rate_limiter = AsyncLimiter(max_rate=10, time_period=1) # 10 calls/second

async def call_api(self, data): async with rate_limiter: return await self.client.post(data)

Key Design Patterns 1. Producer-Consumer with Queues

Producer

async def producer(queue): while True: item = await generate_item() queue.put_nowait(item)

Consumer

async def consumer(queue): while True: item = await queue.get() await process_item(item)

  1. Streaming Generators

Instead of returning complete results:

❌ Bad: Wait for entire response

async def generate_response(prompt): response = await openai.complete(prompt) # 5 seconds return response

✅ Good: Stream chunks as they arrive

async def generate_response(prompt): async for chunk in openai.complete(prompt, stream=True): yield chunk # Yield after 0.1s, 0.2s, etc.

  1. Conversation State Management

Maintain conversation history for context:

class Transcript: event_logs: List[Message] = []

def add_human_message(self, text):
    self.event_logs.append(Message(sender=Sender.HUMAN, text=text))

def add_bot_message(self, text):
    self.event_logs.append(Message(sender=Sender.BOT, text=text))

def to_openai_messages(self):
    return [
        {"role": "user" if msg.sender == Sender.HUMAN else "assistant",
         "content": msg.text}
        for msg in self.event_logs
    ]

Testing Strategies 1. Unit Test Workers in Isolation async def test_transcriber(): transcriber = DeepgramTranscriber(config)

# Mock audio input
audio_chunk = b'\x00\x01\x02...'
transcriber.send_audio(audio_chunk)

# Check output
transcription = await transcriber.output_queue.get()
assert transcription.message == "expected text"
  1. Integration Test Pipeline async def test_full_pipeline(): # Create all components conversation = create_test_conversation()

    Send test audio

    conversation.receive_audio(test_audio_chunk)

    Wait for response

    response = await wait_for_audio_output(timeout=5)

    assert response is not None

  2. Test Interrupts async def test_interrupt(): conversation = create_test_conversation()

    Start bot speaking

    await conversation.agent.generate_response("Tell me a long story")

    Interrupt mid-response

    await asyncio.sleep(1) # Let it speak for 1 second conversation.broadcast_interrupt()

    Verify partial message in transcript

    last_message = conversation.transcript.event_logs[-1] assert last_message.text != full_expected_message

Implementation Workflow

When implementing a voice AI engine:

Start with Base Workers: Implement the base worker pattern first Add Transcriber: Choose a provider and implement streaming transcription Add Agent: Implement LLM integration with streaming responses Add Synthesizer: Implement TTS with audio streaming Connect Pipeline: Wire all workers together with queues Add Interrupts: Implement the interrupt system Add WebSocket: Create WebSocket endpoint for client communication Test Components: Unit test each worker in isolation Test Integration: Test the full pipeline end-to-end Add Error Handling: Implement robust error handling and logging Optimize: Add rate limiting, monitoring, and performance optimizations Related Skills @websocket-patterns - For WebSocket implementation details @async-python - For asyncio and async patterns @streaming-apis - For streaming API integration @audio-processing - For audio format conversion and processing @systematic-debugging - For debugging complex async pipelines Resources

Libraries:

asyncio - Async programming websockets - WebSocket client/server FastAPI - WebSocket server framework pydub - Audio manipulation numpy - Audio data processing

API Providers:

Transcription: Deepgram, AssemblyAI, Azure Speech, Google Cloud Speech LLM: OpenAI, Google Gemini, Anthropic Claude TTS: ElevenLabs, Azure TTS, Google Cloud TTS, Amazon Polly, Play.ht Summary

Building a voice AI engine requires:

✅ Async worker pipeline for concurrent processing ✅ Queue-based communication between components ✅ Streaming at every stage (transcription, LLM, synthesis) ✅ Interrupt system for natural conversations ✅ Rate limiting for real-time audio playback ✅ Multi-provider support for flexibility ✅ Proper error handling and graceful shutdown

The key insight: Everything must stream and everything must be interruptible for natural, real-time conversations.

返回排行榜