ai-engineer-expert

安装量: 65
排名: #11666

安装

npx skills add https://github.com/personamanagmentlayer/pcl --skill ai-engineer-expert

AI Engineer Expert

Expert guidance for implementing AI systems, LLM integration, prompt engineering, and deploying production AI applications.

Core Concepts AI Engineering LLM integration and orchestration Prompt engineering and optimization RAG (Retrieval-Augmented Generation) Vector databases and embeddings Fine-tuning and adaptation AI agent systems Production AI Model deployment strategies API design for AI services Rate limiting and cost control Error handling and fallbacks Monitoring and logging Security and safety LLM Patterns Chain-of-thought prompting Few-shot learning System/user message design Function calling and tools Streaming responses Context window management LLM Integration from openai import AsyncOpenAI from anthropic import Anthropic from typing import List, Dict, Optional import asyncio

class LLMClient: """Unified LLM client with fallback"""

def __init__(self, primary: str = "openai", fallback: str = "anthropic"):
    self.openai_client = AsyncOpenAI()
    self.anthropic_client = Anthropic()
    self.primary = primary
    self.fallback = fallback

async def chat_completion(self, messages: List[Dict],
                          model: str = "gpt-4-turbo",
                          temperature: float = 0.7,
                          max_tokens: int = 1000) -> str:
    """Chat completion with fallback"""
    try:
        if self.primary == "openai":
            response = await self.openai_client.chat.completions.create(
                model=model,
                messages=messages,
                temperature=temperature,
                max_tokens=max_tokens
            )
            return response.choices[0].message.content

    except Exception as e:
        print(f"Primary provider failed: {e}, trying fallback")

        if self.fallback == "anthropic":
            response = self.anthropic_client.messages.create(
                model="claude-3-5-sonnet-20241022",
                messages=messages,
                temperature=temperature,
                max_tokens=max_tokens
            )
            return response.content[0].text

async def chat_completion_streaming(self, messages: List[Dict],
                                   model: str = "gpt-4-turbo"):
    """Streaming chat completion"""
    stream = await self.openai_client.chat.completions.create(
        model=model,
        messages=messages,
        stream=True
    )

    async for chunk in stream:
        if chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content

async def function_calling(self, messages: List[Dict],
                          tools: List[Dict]) -> Dict:
    """Function calling with tools"""
    response = await self.openai_client.chat.completions.create(
        model="gpt-4-turbo",
        messages=messages,
        tools=tools,
        tool_choice="auto"
    )

    message = response.choices[0].message

    if message.tool_calls:
        return {
            "type": "function_call",
            "function": message.tool_calls[0].function.name,
            "arguments": message.tool_calls[0].function.arguments
        }
    else:
        return {
            "type": "message",
            "content": message.content
        }

RAG Implementation from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain_openai import OpenAIEmbeddings from langchain_community.vectorstores import Chroma from langchain.chains import RetrievalQA from langchain_openai import ChatOpenAI

class RAGSystem: """Retrieval-Augmented Generation system"""

def __init__(self, persist_directory: str = "./chroma_db"):
    self.embeddings = OpenAIEmbeddings()
    self.vectorstore = None
    self.persist_directory = persist_directory
    self.llm = ChatOpenAI(model="gpt-4-turbo", temperature=0)

def ingest_documents(self, documents: List[str]):
    """Ingest and index documents"""
    # Split documents into chunks
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=200
    )
    chunks = text_splitter.create_documents(documents)

    # Create vector store
    self.vectorstore = Chroma.from_documents(
        documents=chunks,
        embedding=self.embeddings,
        persist_directory=self.persist_directory
    )

def query(self, question: str, k: int = 4) -> Dict:
    """Query with RAG"""
    if not self.vectorstore:
        raise ValueError("No documents ingested")

    # Retrieve relevant documents
    retriever = self.vectorstore.as_retriever(
        search_kwargs={"k": k}
    )

    # Create QA chain
    qa_chain = RetrievalQA.from_chain_type(
        llm=self.llm,
        chain_type="stuff",
        retriever=retriever,
        return_source_documents=True
    )

    # Get answer
    result = qa_chain({"query": question})

    return {
        "answer": result["result"],
        "sources": [doc.page_content for doc in result["source_documents"]]
    }

def similarity_search(self, query: str, k: int = 4) -> List[Dict]:
    """Similarity search in vector database"""
    results = self.vectorstore.similarity_search_with_score(query, k=k)

    return [
        {
            "content": doc.page_content,
            "score": score,
            "metadata": doc.metadata
        }
        for doc, score in results
    ]

Prompt Engineering class PromptTemplate: """Advanced prompt templates"""

@staticmethod
def chain_of_thought(question: str) -> str:
    """Chain-of-thought prompting"""
    return f"""Let's solve this step by step:

Question: {question}

Please think through this problem carefully: 1. First, identify what we need to find 2. Then, break down the problem into smaller steps 3. Solve each step 4. Finally, combine the results

Your step-by-step solution:"""

@staticmethod
def few_shot(task: str, examples: List[Dict], query: str) -> str:
    """Few-shot learning prompt"""
    examples_text = "\n\n".join([
        f"Input: {ex['input']}\nOutput: {ex['output']}"
        for ex in examples
    ])

    return f"""Task: {task}

Here are some examples:

{examples_text}

Now, please solve this:

Input: {query} Output:"""

@staticmethod
def system_message(role: str, constraints: List[str],
                   format_instructions: str) -> str:
    """System message template"""
    constraints_text = "\n".join([f"- {c}" for c in constraints])

    return f"""You are a {role}.

Constraints:

Output Format:

Remember to follow these guidelines strictly."""

AI Agent System from typing import Callable import json

class Tool: """Tool that agents can use"""

def __init__(self, name: str, description: str, function: Callable):
    self.name = name
    self.description = description
    self.function = function

def to_openai_function(self) -> Dict:
    """Convert to OpenAI function format"""
    return {
        "type": "function",
        "function": {
            "name": self.name,
            "description": self.description,
            "parameters": self.get_parameters()
        }
    }

class AIAgent: """AI agent with tools"""

def __init__(self, llm_client: LLMClient, tools: List[Tool]):
    self.llm = llm_client
    self.tools = {tool.name: tool for tool in tools}
    self.conversation_history = []

async def run(self, user_input: str, max_iterations: int = 10) -> str:
    """Run agent with tool use"""
    self.conversation_history.append({
        "role": "user",
        "content": user_input
    })

    for i in range(max_iterations):
        # Get LLM response with function calling
        response = await self.llm.function_calling(
            messages=self.conversation_history,
            tools=[tool.to_openai_function() for tool in self.tools.values()]
        )

        if response["type"] == "message":
            # Agent is done
            return response["content"]

        # Execute tool
        tool_name = response["function"]
        arguments = json.loads(response["arguments"])

        tool_result = await self.execute_tool(tool_name, arguments)

        # Add tool result to conversation
        self.conversation_history.append({
            "role": "function",
            "name": tool_name,
            "content": str(tool_result)
        })

    return "Max iterations reached"

async def execute_tool(self, tool_name: str, arguments: Dict) -> any:
    """Execute a tool"""
    if tool_name not in self.tools:
        raise ValueError(f"Tool {tool_name} not found")

    tool = self.tools[tool_name]
    return await tool.function(**arguments)

Production Deployment from fastapi import FastAPI, HTTPException, Depends from fastapi.responses import StreamingResponse from pydantic import BaseModel from circuitbreaker import circuit import asyncio

app = FastAPI()

class ChatRequest(BaseModel): messages: List[Dict] model: str = "gpt-4-turbo" stream: bool = False

class RateLimiter: """Rate limiter for API"""

def __init__(self, max_requests: int, window_seconds: int):
    self.max_requests = max_requests
    self.window_seconds = window_seconds
    self.requests = {}

async def check_limit(self, user_id: str) -> bool:
    """Check if user is within rate limit"""
    import time
    now = time.time()

    if user_id not in self.requests:
        self.requests[user_id] = []

    # Remove old requests
    self.requests[user_id] = [
        req_time for req_time in self.requests[user_id]
        if now - req_time < self.window_seconds
    ]

    if len(self.requests[user_id]) >= self.max_requests:
        return False

    self.requests[user_id].append(now)
    return True

rate_limiter = RateLimiter(max_requests=100, window_seconds=60) llm_client = LLMClient()

@circuit(failure_threshold=5, recovery_timeout=60) async def call_llm(messages: List[Dict]) -> str: """LLM call with circuit breaker""" return await llm_client.chat_completion(messages)

@app.post("/chat") async def chat(request: ChatRequest, user_id: str = Depends(get_user_id)): """Chat endpoint with rate limiting""" # Check rate limit if not await rate_limiter.check_limit(user_id): raise HTTPException(status_code=429, detail="Rate limit exceeded")

try:
    if request.stream:
        async def generate():
            async for chunk in llm_client.chat_completion_streaming(request.messages):
                yield chunk

        return StreamingResponse(generate(), media_type="text/event-stream")
    else:
        response = await call_llm(request.messages)
        return {"response": response}

except Exception as e:
    raise HTTPException(status_code=500, detail=str(e))

Best Practices LLM Integration Implement fallback providers Use streaming for better UX Cache responses where appropriate Handle rate limits gracefully Monitor token usage and costs Version prompts and track changes Production Systems Implement circuit breakers Add comprehensive logging Monitor latency and errors Use rate limiting Implement retry logic with backoff Test edge cases thoroughly Security Validate and sanitize inputs Implement authentication/authorization Never expose API keys in logs Use environment variables for secrets Implement content filtering Monitor for prompt injection Anti-Patterns

❌ No error handling or fallbacks ❌ Exposing raw LLM outputs without validation ❌ No rate limiting or cost controls ❌ Storing API keys in code ❌ No monitoring or logging ❌ Ignoring token limits ❌ No testing of prompts

Resources OpenAI API: https://platform.openai.com/docs Anthropic Claude: https://docs.anthropic.com/ LangChain: https://python.langchain.com/ LlamaIndex: https://www.llamaindex.ai/ Weights & Biases Prompts: https://wandb.ai/site/prompts

返回排行榜