ai-engineer-expert

Expert guidance for implementing AI systems, LLM integration, prompt engineering, and deploying production AI applications.

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "ai-engineer-expert" with this command: npx skills add personamanagmentlayer/pcl/personamanagmentlayer-pcl-ai-engineer-expert

AI Engineer Expert

Expert guidance for implementing AI systems, LLM integration, prompt engineering, and deploying production AI applications.

Core Concepts

AI Engineering

  • LLM integration and orchestration

  • Prompt engineering and optimization

  • RAG (Retrieval-Augmented Generation)

  • Vector databases and embeddings

  • Fine-tuning and adaptation

  • AI agent systems

Production AI

  • Model deployment strategies

  • API design for AI services

  • Rate limiting and cost control

  • Error handling and fallbacks

  • Monitoring and logging

  • Security and safety

LLM Patterns

  • Chain-of-thought prompting

  • Few-shot learning

  • System/user message design

  • Function calling and tools

  • Streaming responses

  • Context window management

LLM Integration

from openai import AsyncOpenAI from anthropic import Anthropic from typing import List, Dict, Optional import asyncio

class LLMClient: """Unified LLM client with fallback"""

def __init__(self, primary: str = "openai", fallback: str = "anthropic"):
    self.openai_client = AsyncOpenAI()
    self.anthropic_client = Anthropic()
    self.primary = primary
    self.fallback = fallback

async def chat_completion(self, messages: List[Dict],
                          model: str = "gpt-4-turbo",
                          temperature: float = 0.7,
                          max_tokens: int = 1000) -> str:
    """Chat completion with fallback"""
    try:
        if self.primary == "openai":
            response = await self.openai_client.chat.completions.create(
                model=model,
                messages=messages,
                temperature=temperature,
                max_tokens=max_tokens
            )
            return response.choices[0].message.content

    except Exception as e:
        print(f"Primary provider failed: {e}, trying fallback")

        if self.fallback == "anthropic":
            response = self.anthropic_client.messages.create(
                model="claude-3-5-sonnet-20241022",
                messages=messages,
                temperature=temperature,
                max_tokens=max_tokens
            )
            return response.content[0].text

async def chat_completion_streaming(self, messages: List[Dict],
                                   model: str = "gpt-4-turbo"):
    """Streaming chat completion"""
    stream = await self.openai_client.chat.completions.create(
        model=model,
        messages=messages,
        stream=True
    )

    async for chunk in stream:
        if chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content

async def function_calling(self, messages: List[Dict],
                          tools: List[Dict]) -> Dict:
    """Function calling with tools"""
    response = await self.openai_client.chat.completions.create(
        model="gpt-4-turbo",
        messages=messages,
        tools=tools,
        tool_choice="auto"
    )

    message = response.choices[0].message

    if message.tool_calls:
        return {
            "type": "function_call",
            "function": message.tool_calls[0].function.name,
            "arguments": message.tool_calls[0].function.arguments
        }
    else:
        return {
            "type": "message",
            "content": message.content
        }

RAG Implementation

from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain_openai import OpenAIEmbeddings from langchain_community.vectorstores import Chroma from langchain.chains import RetrievalQA from langchain_openai import ChatOpenAI

class RAGSystem: """Retrieval-Augmented Generation system"""

def __init__(self, persist_directory: str = "./chroma_db"):
    self.embeddings = OpenAIEmbeddings()
    self.vectorstore = None
    self.persist_directory = persist_directory
    self.llm = ChatOpenAI(model="gpt-4-turbo", temperature=0)

def ingest_documents(self, documents: List[str]):
    """Ingest and index documents"""
    # Split documents into chunks
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=200
    )
    chunks = text_splitter.create_documents(documents)

    # Create vector store
    self.vectorstore = Chroma.from_documents(
        documents=chunks,
        embedding=self.embeddings,
        persist_directory=self.persist_directory
    )

def query(self, question: str, k: int = 4) -> Dict:
    """Query with RAG"""
    if not self.vectorstore:
        raise ValueError("No documents ingested")

    # Retrieve relevant documents
    retriever = self.vectorstore.as_retriever(
        search_kwargs={"k": k}
    )

    # Create QA chain
    qa_chain = RetrievalQA.from_chain_type(
        llm=self.llm,
        chain_type="stuff",
        retriever=retriever,
        return_source_documents=True
    )

    # Get answer
    result = qa_chain({"query": question})

    return {
        "answer": result["result"],
        "sources": [doc.page_content for doc in result["source_documents"]]
    }

def similarity_search(self, query: str, k: int = 4) -> List[Dict]:
    """Similarity search in vector database"""
    results = self.vectorstore.similarity_search_with_score(query, k=k)

    return [
        {
            "content": doc.page_content,
            "score": score,
            "metadata": doc.metadata
        }
        for doc, score in results
    ]

Prompt Engineering

class PromptTemplate: """Advanced prompt templates"""

@staticmethod
def chain_of_thought(question: str) -> str:
    """Chain-of-thought prompting"""
    return f"""Let's solve this step by step:

Question: {question}

Please think through this problem carefully:

  1. First, identify what we need to find
  2. Then, break down the problem into smaller steps
  3. Solve each step
  4. Finally, combine the results

Your step-by-step solution:"""

@staticmethod
def few_shot(task: str, examples: List[Dict], query: str) -> str:
    """Few-shot learning prompt"""
    examples_text = "\n\n".join([
        f"Input: {ex['input']}\nOutput: {ex['output']}"
        for ex in examples
    ])

    return f"""Task: {task}

Here are some examples:

{examples_text}

Now, please solve this:

Input: {query} Output:"""

@staticmethod
def system_message(role: str, constraints: List[str],
                   format_instructions: str) -> str:
    """System message template"""
    constraints_text = "\n".join([f"- {c}" for c in constraints])

    return f"""You are a {role}.

Constraints: {constraints_text}

Output Format: {format_instructions}

Remember to follow these guidelines strictly."""

AI Agent System

from typing import Callable import json

class Tool: """Tool that agents can use"""

def __init__(self, name: str, description: str, function: Callable):
    self.name = name
    self.description = description
    self.function = function

def to_openai_function(self) -> Dict:
    """Convert to OpenAI function format"""
    return {
        "type": "function",
        "function": {
            "name": self.name,
            "description": self.description,
            "parameters": self.get_parameters()
        }
    }

class AIAgent: """AI agent with tools"""

def __init__(self, llm_client: LLMClient, tools: List[Tool]):
    self.llm = llm_client
    self.tools = {tool.name: tool for tool in tools}
    self.conversation_history = []

async def run(self, user_input: str, max_iterations: int = 10) -> str:
    """Run agent with tool use"""
    self.conversation_history.append({
        "role": "user",
        "content": user_input
    })

    for i in range(max_iterations):
        # Get LLM response with function calling
        response = await self.llm.function_calling(
            messages=self.conversation_history,
            tools=[tool.to_openai_function() for tool in self.tools.values()]
        )

        if response["type"] == "message":
            # Agent is done
            return response["content"]

        # Execute tool
        tool_name = response["function"]
        arguments = json.loads(response["arguments"])

        tool_result = await self.execute_tool(tool_name, arguments)

        # Add tool result to conversation
        self.conversation_history.append({
            "role": "function",
            "name": tool_name,
            "content": str(tool_result)
        })

    return "Max iterations reached"

async def execute_tool(self, tool_name: str, arguments: Dict) -> any:
    """Execute a tool"""
    if tool_name not in self.tools:
        raise ValueError(f"Tool {tool_name} not found")

    tool = self.tools[tool_name]
    return await tool.function(**arguments)

Production Deployment

from fastapi import FastAPI, HTTPException, Depends from fastapi.responses import StreamingResponse from pydantic import BaseModel from circuitbreaker import circuit import asyncio

app = FastAPI()

class ChatRequest(BaseModel): messages: List[Dict] model: str = "gpt-4-turbo" stream: bool = False

class RateLimiter: """Rate limiter for API"""

def __init__(self, max_requests: int, window_seconds: int):
    self.max_requests = max_requests
    self.window_seconds = window_seconds
    self.requests = {}

async def check_limit(self, user_id: str) -> bool:
    """Check if user is within rate limit"""
    import time
    now = time.time()

    if user_id not in self.requests:
        self.requests[user_id] = []

    # Remove old requests
    self.requests[user_id] = [
        req_time for req_time in self.requests[user_id]
        if now - req_time < self.window_seconds
    ]

    if len(self.requests[user_id]) >= self.max_requests:
        return False

    self.requests[user_id].append(now)
    return True

rate_limiter = RateLimiter(max_requests=100, window_seconds=60) llm_client = LLMClient()

@circuit(failure_threshold=5, recovery_timeout=60) async def call_llm(messages: List[Dict]) -> str: """LLM call with circuit breaker""" return await llm_client.chat_completion(messages)

@app.post("/chat") async def chat(request: ChatRequest, user_id: str = Depends(get_user_id)): """Chat endpoint with rate limiting""" # Check rate limit if not await rate_limiter.check_limit(user_id): raise HTTPException(status_code=429, detail="Rate limit exceeded")

try:
    if request.stream:
        async def generate():
            async for chunk in llm_client.chat_completion_streaming(request.messages):
                yield chunk

        return StreamingResponse(generate(), media_type="text/event-stream")
    else:
        response = await call_llm(request.messages)
        return {"response": response}

except Exception as e:
    raise HTTPException(status_code=500, detail=str(e))

Best Practices

LLM Integration

  • Implement fallback providers

  • Use streaming for better UX

  • Cache responses where appropriate

  • Handle rate limits gracefully

  • Monitor token usage and costs

  • Version prompts and track changes

Production Systems

  • Implement circuit breakers

  • Add comprehensive logging

  • Monitor latency and errors

  • Use rate limiting

  • Implement retry logic with backoff

  • Test edge cases thoroughly

Security

  • Validate and sanitize inputs

  • Implement authentication/authorization

  • Never expose API keys in logs

  • Use environment variables for secrets

  • Implement content filtering

  • Monitor for prompt injection

Anti-Patterns

❌ No error handling or fallbacks ❌ Exposing raw LLM outputs without validation ❌ No rate limiting or cost controls ❌ Storing API keys in code ❌ No monitoring or logging ❌ Ignoring token limits ❌ No testing of prompts

Resources

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

General

finance-expert

No summary provided by upstream source.

Repository SourceNeeds Review
General

trading-expert

No summary provided by upstream source.

Repository SourceNeeds Review
General

dart-expert

No summary provided by upstream source.

Repository SourceNeeds Review
General

postgresql-expert

No summary provided by upstream source.

Repository SourceNeeds Review