embedding-strategies

Guide to selecting and optimizing embedding models for vector search applications.

Install skill "embedding-strategies" with this command: npx skills add wshobson/agents/wshobson-agents-embedding-strategies

Embedding Strategies

When to Use This Skill

  • Choosing embedding models for RAG

  • Optimizing chunking strategies

  • Fine-tuning embeddings for domains

  • Comparing embedding model performance

  • Reducing embedding dimensions

  • Handling multilingual content

Core Concepts

  1. Embedding Model Comparison (2026)

| Model | Dimensions | Max Tokens | Best For |
|---|---|---|---|
| voyage-3-large | 1024 | 32000 | Claude apps (Anthropic recommended) |
| voyage-3 | 1024 | 32000 | Claude apps, cost-effective |
| voyage-code-3 | 1024 | 32000 | Code search |
| voyage-finance-2 | 1024 | 32000 | Financial documents |
| voyage-law-2 | 1024 | 16000 | Legal documents |
| text-embedding-3-large | 3072 | 8191 | OpenAI apps, high accuracy |
| text-embedding-3-small | 1536 | 8191 | OpenAI apps, cost-effective |
| bge-large-en-v1.5 | 1024 | 512 | Open source, local deployment |
| all-MiniLM-L6-v2 | 384 | 256 | Fast, lightweight |
| multilingual-e5-large | 1024 | 512 | Multi-language |
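The Dimensions column translates directly into index size. As a rough sketch, assuming vectors are stored as uncompressed float32 (4 bytes per value) and ignoring any index overhead, the raw footprint can be estimated as below. The function name and the one-million-vector corpus are illustrative assumptions, not part of the original guide.

```python
def raw_index_bytes(num_vectors: int, dimensions: int, bytes_per_value: int = 4) -> int:
    """Raw storage for dense vectors, excluding any index overhead."""
    return num_vectors * dimensions * bytes_per_value

# Dimensions taken from the comparison table above.
for model, dims in [
    ("text-embedding-3-large", 3072),
    ("voyage-3-large", 1024),
    ("all-MiniLM-L6-v2", 384),
]:
    gb = raw_index_bytes(1_000_000, dims) / 1e9
    print(f"{model}: {gb:.2f} GB per million vectors")
```

Real vector stores add graph or quantization overhead on top of this, so treat the numbers as a lower bound for comparison only.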

  2. Embedding Pipeline

Document → Chunking → Preprocessing → Embedding Model → Vector
              ↓                ↓               ↓
       [Overlap, Size] [Clean, Normalize] [API/Local]
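The stages in the diagram can be sketched end to end without any external service. This is a minimal illustration, assuming word-based chunking and a hashed bag-of-words stand-in for the embedding model; `chunk_words`, `preprocess`, `toy_embed`, and `run_pipeline` are hypothetical names, and `toy_embed` is not a real embedding.

```python
import hashlib
import math
import re
from typing import List

def chunk_words(text: str, size: int = 50, overlap: int = 10) -> List[str]:
    """Chunking stage: fixed-size word windows with overlap."""
    words = text.split()
    step = size - overlap
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + size]))
        if start + size >= len(words):
            break  # The current window already reaches the end
    return chunks

def preprocess(chunk: str) -> str:
    """Preprocessing stage: drop punctuation and lowercase."""
    return re.sub(r"[^\w\s]", "", chunk).lower()

def toy_embed(chunk: str, dims: int = 8) -> List[float]:
    """Stand-in for the embedding model: hashed bag-of-words, L2-normalized.

    NOT a real embedding; it only makes the pipeline runnable offline.
    """
    vec = [0.0] * dims
    for word in chunk.split():
        bucket = int(hashlib.md5(word.encode("utf8")).hexdigest(), 16) % dims
        vec[bucket] += 1.0
    norm = math.sqrt(sum(v * v for v in vec)) or 1.0
    return [v / norm for v in vec]

def run_pipeline(document: str) -> List[List[float]]:
    """Document -> Chunking -> Preprocessing -> Embedding Model -> Vector."""
    return [toy_embed(preprocess(c)) for c in chunk_words(document)]
```

Swapping `toy_embed` for one of the templates below turns this into a working pipeline; the stage order stays the same.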

Templates

Template 1: Voyage AI Embeddings (Recommended for Claude)

from langchain_voyageai import VoyageAIEmbeddings
from typing import List
import os

# Initialize Voyage AI embeddings (recommended by Anthropic for Claude)
embeddings = VoyageAIEmbeddings(
    model="voyage-3-large",
    voyage_api_key=os.environ.get("VOYAGE_API_KEY")
)

def get_embeddings(texts: List[str]) -> List[List[float]]:
    """Get embeddings from Voyage AI."""
    return embeddings.embed_documents(texts)

def get_query_embedding(query: str) -> List[float]:
    """Get single query embedding."""
    return embeddings.embed_query(query)

# Specialized models for domains
code_embeddings = VoyageAIEmbeddings(model="voyage-code-3")
finance_embeddings = VoyageAIEmbeddings(model="voyage-finance-2")
legal_embeddings = VoyageAIEmbeddings(model="voyage-law-2")

Template 2: OpenAI Embeddings

from openai import OpenAI
from typing import List, Optional
import numpy as np

client = OpenAI()

def get_embeddings(
    texts: List[str],
    model: str = "text-embedding-3-small",
    dimensions: Optional[int] = None
) -> List[List[float]]:
    """Get embeddings from OpenAI with optional dimension reduction."""
    # Handle batching for large lists
    batch_size = 100
    all_embeddings = []

    for i in range(0, len(texts), batch_size):
        batch = texts[i:i + batch_size]

        kwargs = {"input": batch, "model": model}
        if dimensions:
            # Matryoshka dimensionality reduction
            kwargs["dimensions"] = dimensions

        response = client.embeddings.create(**kwargs)
        embeddings = [item.embedding for item in response.data]
        all_embeddings.extend(embeddings)

    return all_embeddings

def get_embedding(text: str, **kwargs) -> List[float]:
    """Get single embedding."""
    return get_embeddings([text], **kwargs)[0]

# Dimension reduction with Matryoshka embeddings
def get_reduced_embedding(text: str, dimensions: int = 512) -> List[float]:
    """Get embedding with reduced dimensions (Matryoshka)."""
    return get_embedding(
        text,
        model="text-embedding-3-small",
        dimensions=dimensions
    )
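The `dimensions` parameter asks the API to return shortened vectors. When full-length vectors are already stored, the client-side counterpart is usually described as keeping the leading components and re-normalizing. A minimal sketch, assuming the model was trained with a Matryoshka-style objective (truncation is not meaningful for arbitrary models); `truncate_and_normalize` is a hypothetical helper name.

```python
import math
from typing import List

def truncate_and_normalize(embedding: List[float], dimensions: int) -> List[float]:
    """Keep the first `dimensions` values, then rescale to unit length."""
    if not 0 < dimensions <= len(embedding):
        raise ValueError("dimensions must be between 1 and len(embedding)")
    head = embedding[:dimensions]
    norm = math.sqrt(sum(v * v for v in head))
    if norm == 0.0:
        return head  # Zero vector: nothing to rescale
    return [v / norm for v in head]

full = [0.6, 0.0, 0.8, 0.0]   # toy unit-length vector
short = truncate_and_normalize(full, 2)
print(short)
```

Re-normalizing matters because a truncated vector is no longer unit length, which would skew dot-product scores against vectors truncated to a different size.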

Template 3: Local Embeddings with Sentence Transformers

from sentence_transformers import SentenceTransformer
from typing import List, Optional
import numpy as np

class LocalEmbedder:
    """Local embedding with sentence-transformers."""

    def __init__(
        self,
        model_name: str = "BAAI/bge-large-en-v1.5",
        device: str = "cuda"
    ):
        self.model = SentenceTransformer(model_name, device=device)
        self.model_name = model_name

    def embed(
        self,
        texts: List[str],
        normalize: bool = True,
        show_progress: bool = False
    ) -> np.ndarray:
        """Embed texts with optional normalization."""
        embeddings = self.model.encode(
            texts,
            normalize_embeddings=normalize,
            show_progress_bar=show_progress,
            convert_to_numpy=True
        )
        return embeddings

    def embed_query(self, query: str) -> np.ndarray:
        """Embed a query with appropriate prefix for retrieval models."""
        # BGE and similar models benefit from query prefix
        if "bge" in self.model_name.lower():
            query = f"Represent this sentence for searching relevant passages: {query}"
        return self.embed([query])[0]

    def embed_documents(self, documents: List[str]) -> np.ndarray:
        """Embed documents for indexing."""
        return self.embed(documents)

# E5 model with instructions
class E5Embedder:
    def __init__(self, model_name: str = "intfloat/multilingual-e5-large"):
        self.model = SentenceTransformer(model_name)

    def embed_query(self, query: str) -> np.ndarray:
        """E5 requires 'query:' prefix for queries."""
        return self.model.encode(f"query: {query}")

    def embed_document(self, document: str) -> np.ndarray:
        """E5 requires 'passage:' prefix for documents."""
        return self.model.encode(f"passage: {document}")

Template 4: Chunking Strategies

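from typing import List, Tuple
import re

def chunk_by_tokens(
    text: str,
    chunk_size: int = 512,
    chunk_overlap: int = 50,
    tokenizer=None
) -> List[str]:
    """Chunk text by token count."""
    import tiktoken
    if chunk_overlap >= chunk_size:
        # Otherwise the window never advances and the loop does not terminate
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    tokenizer = tokenizer or tiktoken.get_encoding("cl100k_base")

    tokens = tokenizer.encode(text)
    chunks = []

    start = 0
    while start < len(tokens):
        end = start + chunk_size
        chunk_tokens = tokens[start:end]
        chunk_text = tokenizer.decode(chunk_tokens)
        chunks.append(chunk_text)
        if end >= len(tokens):
            # Last window reached; avoid a redundant tail chunk that only
            # repeats the overlap region
            break
        start = end - chunk_overlap

    return chunks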
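The overlap arithmetic is easier to check on offsets alone: each window starts `chunk_size - chunk_overlap` tokens after the previous one. The sketch below computes only the `(start, end)` offsets, with no tokenizer involved; `token_windows` is a hypothetical helper, and stopping once a window reaches the final token is a choice of this sketch.

```python
from typing import List, Tuple

def token_windows(n_tokens: int, chunk_size: int, chunk_overlap: int) -> List[Tuple[int, int]]:
    """Return (start, end) token offsets for overlapping windows."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    windows = []
    start = 0
    while start < n_tokens:
        end = min(start + chunk_size, n_tokens)
        windows.append((start, end))
        if end == n_tokens:
            break  # This window already covers the final token
        start = end - chunk_overlap
    return windows

print(token_windows(1000, 512, 50))
# [(0, 512), (462, 974), (924, 1000)]
```

With the defaults of 512 and 50, consecutive windows share 50 tokens, so a sentence cut at one boundary still appears whole in the neighbouring chunk if it is shorter than the overlap.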

def chunk_by_sentences(
    text: str,
    max_chunk_size: int = 1000,
    min_chunk_size: int = 100
) -> List[str]:
    """Chunk text by sentences, respecting size limits.

    Sizes are measured in characters. Note that min_chunk_size is accepted
    but not enforced by this implementation.
    """
    # Requires NLTK's Punkt sentence tokenizer data to be downloaded beforehand
    import nltk
    sentences = nltk.sent_tokenize(text)

    chunks = []
    current_chunk = []
    current_size = 0

    for sentence in sentences:
        sentence_size = len(sentence)

        # Flush the current chunk before it would exceed the limit
        if current_size + sentence_size > max_chunk_size and current_chunk:
            chunks.append(" ".join(current_chunk))
            current_chunk = []
            current_size = 0

        current_chunk.append(sentence)
        current_size += sentence_size

    if current_chunk:
        chunks.append(" ".join(current_chunk))

    return chunks
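`chunk_by_sentences` accepts `min_chunk_size`, but its body never reads it. One possible way to honour it is a post-processing pass that folds undersized chunks into their predecessor. This is a sketch of that idea, not the original author's method; `merge_small_chunks` is a hypothetical helper, and a merged chunk may exceed `max_chunk_size`.

```python
from typing import List

def merge_small_chunks(chunks: List[str], min_chunk_size: int = 100) -> List[str]:
    """Fold any chunk shorter than min_chunk_size into its predecessor."""
    merged: List[str] = []
    for chunk in chunks:
        if merged and len(chunk) < min_chunk_size:
            # Too short to stand alone: append to the previous chunk
            merged[-1] = merged[-1] + " " + chunk
        else:
            merged.append(chunk)
    return merged
```

A short first chunk is left as-is because it has no predecessor to merge into.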

def chunk_by_semantic_sections(
    text: str,
    headers_pattern: str = r'^#{1,3}\s+.+$'
) -> List[Tuple[str, str]]:
    """Chunk markdown by headers, preserving hierarchy."""
    lines = text.split('\n')
    chunks = []
    current_header = ""
    current_content = []

    for line in lines:
        if re.match(headers_pattern, line, re.MULTILINE):
            if current_content:
                chunks.append((current_header, '\n'.join(current_content)))
            current_header = line
            current_content = []
        else:
            current_content.append(line)

    if current_content:
        chunks.append((current_header, '\n'.join(current_content)))

    return chunks
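`chunk_by_semantic_sections` pairs each chunk with its nearest header only, so a subsection loses the name of its parent section. If the full header path is wanted, a stack of open headers can be kept while scanning. The following is a sketch under that assumption; `chunk_with_breadcrumbs` and the `" > "` separator are illustrative choices, and header-like lines inside fenced code blocks are not handled.

```python
import re
from typing import List, Tuple

def chunk_with_breadcrumbs(text: str) -> List[Tuple[str, str]]:
    """Split markdown on #, ##, ### headers and label each chunk with its header path."""
    header_re = re.compile(r"^(#{1,3})\s+(.+)$")
    stack: List[Tuple[int, str]] = []   # (level, title) of the currently open headers
    chunks: List[Tuple[str, str]] = []
    content: List[str] = []

    def flush() -> None:
        body = "\n".join(content).strip()
        if body:
            path = " > ".join(title for _, title in stack)
            chunks.append((path, body))
        content.clear()

    for line in text.split("\n"):
        match = header_re.match(line)
        if match:
            flush()
            level = len(match.group(1))
            # Close headers at the same or deeper level before opening this one
            while stack and stack[-1][0] >= level:
                stack.pop()
            stack.append((level, match.group(2).strip()))
        else:
            content.append(line)
    flush()
    return chunks
```

Prepending the path to the chunk text before embedding is one way to give short subsections the context of their parent headings.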

def recursive_character_splitter(
    text: str,
    chunk_size: int = 1000,
    chunk_overlap: int = 200,
    separators: List[str] = None
) -> List[str]:
    """LangChain-style recursive splitter."""
    separators = separators or ["\n\n", "\n", ". ", " ", ""]

    def split_text(text: str, separators: List[str]) -> List[str]:
        if not text:
            return []

        separator = separators[0]
        remaining_separators = separators[1:]

        if separator == "":
            # Character-level split
            return [text[i:i+chunk_size] for i in range(0, len(text), chunk_size - chunk_overlap)]

        splits = text.split(separator)
        chunks = []
        current_chunk = []
        current_length = 0

        for split in splits:
            split_length = len(split) + len(separator)

            if current_length + split_length > chunk_size and current_chunk:
                chunk_text = separator.join(current_chunk)

                # Recursively split if still too large
                if len(chunk_text) > chunk_size and remaining_separators:
                    chunks.extend(split_text(chunk_text, remaining_separators))
                else:
                    chunks.append(chunk_text)

                # Start new chunk with overlap
                overlap_splits = []
                overlap_length = 0
                for s in reversed(current_chunk):
                    if overlap_length + len(s) <= chunk_overlap:
                        overlap_splits.insert(0, s)
                        overlap_length += len(s)
                    else:
                        break
                current_chunk = overlap_splits
                current_length = overlap_length

            current_chunk.append(split)
            current_length += split_length

        if current_chunk:
            chunks.append(separator.join(current_chunk))

        return chunks

    return split_text(text, separators)

Template 5: Domain-Specific Embedding Pipeline

import re
from typing import List, Optional
from dataclasses import dataclass

from langchain_voyageai import VoyageAIEmbeddings

# chunk_by_tokens is the function defined in Template 4

@dataclass
class EmbeddedDocument:
    id: str
    document_id: str
    chunk_index: int
    text: str
    embedding: List[float]
    metadata: dict

class DomainEmbeddingPipeline:
    """Pipeline for domain-specific embeddings."""

    def __init__(
        self,
        embedding_model: str = "voyage-3-large",
        chunk_size: int = 512,
        chunk_overlap: int = 50,
        preprocessing_fn=None
    ):
        self.embeddings = VoyageAIEmbeddings(model=embedding_model)
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.preprocess = preprocessing_fn or self._default_preprocess

    def _default_preprocess(self, text: str) -> str:
        """Default preprocessing."""
        # Remove excessive whitespace
        text = re.sub(r'\s+', ' ', text)
        # Remove special characters (customize for your domain)
        text = re.sub(r'[^\w\s.,!?-]', '', text)
        return text.strip()

    async def process_documents(
        self,
        documents: List[dict],
        id_field: str = "id",
        content_field: str = "content",
        metadata_fields: Optional[List[str]] = None
    ) -> List[EmbeddedDocument]:
        """Process documents for vector storage."""
        processed = []

        for doc in documents:
            content = doc[content_field]
            doc_id = doc[id_field]

            # Preprocess
            cleaned = self.preprocess(content)

            # Chunk
            chunks = chunk_by_tokens(
                cleaned,
                self.chunk_size,
                self.chunk_overlap
            )

            # Create embeddings
            embeddings = await self.embeddings.aembed_documents(chunks)

            # Create records
            for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
                metadata = {"document_id": doc_id, "chunk_index": i}

                # Add specified metadata fields
                if metadata_fields:
                    for field in metadata_fields:
                        if field in doc:
                            metadata[field] = doc[field]

                processed.append(EmbeddedDocument(
                    id=f"{doc_id}_chunk_{i}",
                    document_id=doc_id,
                    chunk_index=i,
                    text=chunk,
                    embedding=embedding,
                    metadata=metadata
                ))

        return processed

# Code-specific pipeline
class CodeEmbeddingPipeline:
    """Specialized pipeline for code embeddings."""

    def __init__(self):
        # Use Voyage's code-specific model
        self.embeddings = VoyageAIEmbeddings(model="voyage-code-3")

    def chunk_code(self, code: str, language: str) -> List[dict]:
        """Chunk code by functions/classes using tree-sitter."""
        try:
            import tree_sitter_languages
            parser = tree_sitter_languages.get_parser(language)
            source_bytes = code.encode("utf8")
            tree = parser.parse(source_bytes)

            chunks = []
            # Extract function and class definitions
            self._extract_nodes(tree.root_node, source_bytes, chunks)
            return chunks
        except ImportError:
            # Fallback to simple chunking
            return [{"text": code, "type": "module"}]

    def _extract_nodes(self, node, source_bytes: bytes, chunks: list):
        """Recursively extract function/class definitions."""
        if node.type in ['function_definition', 'class_definition', 'method_definition']:
            # start_byte/end_byte are byte offsets, so slice the encoded
            # source rather than the str (they differ for non-ASCII code)
            text = source_bytes[node.start_byte:node.end_byte].decode("utf8")
            chunks.append({
                "text": text,
                "type": node.type,
                "name": self._get_name(node),
                "start_line": node.start_point[0],
                "end_line": node.end_point[0]
            })
        for child in node.children:
            self._extract_nodes(child, source_bytes, chunks)

    def _get_name(self, node) -> str:
        """Extract name from function/class node."""
        for child in node.children:
            if child.type == 'identifier' or child.type == 'name':
                return child.text.decode('utf8')
        return "unknown"

    async def embed_with_context(
        self,
        chunk: str,
        context: str = ""
    ) -> List[float]:
        """Embed code with surrounding context."""
        if context:
            combined = f"Context: {context}\n\nCode:\n{chunk}"
        else:
            combined = chunk
        # Code chunks are indexed content, so embed them as documents;
        # reserve aembed_query for the search queries themselves
        return (await self.embeddings.aembed_documents([combined]))[0]

Template 6: Embedding Quality Evaluation

import numpy as np
from typing import List, Dict

def evaluate_retrieval_quality(
    queries: List[str],
    relevant_docs: List[List[str]],   # List of relevant doc IDs per query
    retrieved_docs: List[List[str]],  # List of retrieved doc IDs per query
    k: int = 10
) -> Dict[str, float]:
    """Evaluate embedding quality for retrieval."""

    def precision_at_k(relevant: set, retrieved: List[str], k: int) -> float:
        retrieved_k = retrieved[:k]
        relevant_retrieved = len(set(retrieved_k) & relevant)
        return relevant_retrieved / k if k > 0 else 0

    def recall_at_k(relevant: set, retrieved: List[str], k: int) -> float:
        retrieved_k = retrieved[:k]
        relevant_retrieved = len(set(retrieved_k) & relevant)
        return relevant_retrieved / len(relevant) if relevant else 0

    def mrr(relevant: set, retrieved: List[str]) -> float:
        for i, doc in enumerate(retrieved):
            if doc in relevant:
                return 1 / (i + 1)
        return 0

    def ndcg_at_k(relevant: set, retrieved: List[str], k: int) -> float:
        dcg = sum(
            1 / np.log2(i + 2) if doc in relevant else 0
            for i, doc in enumerate(retrieved[:k])
        )
        ideal_dcg = sum(1 / np.log2(i + 2) for i in range(min(len(relevant), k)))
        return dcg / ideal_dcg if ideal_dcg > 0 else 0

    metrics = {
        f"precision@{k}": [],
        f"recall@{k}": [],
        "mrr": [],
        f"ndcg@{k}": []
    }

    for relevant, retrieved in zip(relevant_docs, retrieved_docs):
        relevant_set = set(relevant)
        metrics[f"precision@{k}"].append(precision_at_k(relevant_set, retrieved, k))
        metrics[f"recall@{k}"].append(recall_at_k(relevant_set, retrieved, k))
        metrics["mrr"].append(mrr(relevant_set, retrieved))
        metrics[f"ndcg@{k}"].append(ndcg_at_k(relevant_set, retrieved, k))

    return {name: np.mean(values) for name, values in metrics.items()}

def compute_embedding_similarity(
    embeddings1: np.ndarray,
    embeddings2: np.ndarray,
    metric: str = "cosine"
) -> np.ndarray:
    """Compute similarity matrix between embedding sets."""
    if metric == "cosine":
        # Normalize and compute dot product
        norm1 = embeddings1 / np.linalg.norm(embeddings1, axis=1, keepdims=True)
        norm2 = embeddings2 / np.linalg.norm(embeddings2, axis=1, keepdims=True)
        return norm1 @ norm2.T
    elif metric == "euclidean":
        from scipy.spatial.distance import cdist
        return -cdist(embeddings1, embeddings2, metric='euclidean')
    elif metric == "dot":
        return embeddings1 @ embeddings2.T
    else:
        raise ValueError(f"Unknown metric: {metric}")

def compare_embedding_models(
    texts: List[str],
    models: Dict[str, callable],
    queries: List[str],
    relevant_indices: List[List[int]],
    k: int = 5
) -> Dict[str, Dict[str, float]]:
    """Compare multiple embedding models on retrieval quality."""
    results = {}

    for model_name, embed_fn in models.items():
        # Embed all texts
        doc_embeddings = np.array(embed_fn(texts))

        retrieved_per_query = []
        for query in queries:
            query_embedding = np.array(embed_fn([query])[0])
            # Compute similarities
            similarities = compute_embedding_similarity(
                query_embedding.reshape(1, -1),
                doc_embeddings,
                metric="cosine"
            )[0]
            # Get top-k indices
            top_k_indices = np.argsort(similarities)[::-1][:k]
            retrieved_per_query.append([str(i) for i in top_k_indices])

        # Convert relevant indices to string IDs
        relevant_docs = [[str(i) for i in indices] for indices in relevant_indices]

        results[model_name] = evaluate_retrieval_quality(
            queries, relevant_docs, retrieved_per_query, k
        )

    return results
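To make the four metrics concrete, here is a worked example for a single query, using only the standard library. The document IDs and ranking are toy data invented for illustration; the formulas mirror the binary-relevance definitions in `evaluate_retrieval_quality`.

```python
import math

relevant = {"a", "c"}          # ground-truth relevant doc IDs (toy data)
retrieved = ["b", "a", "c"]    # ranked results for one query (toy data)
k = 3

hits = [doc in relevant for doc in retrieved[:k]]

precision = sum(hits) / k                 # 2 of the top 3 are relevant
recall = sum(hits) / len(relevant)        # both relevant docs were found
# Reciprocal rank of the first relevant result (rank 2 here)
mrr = next((1 / (i + 1) for i, hit in enumerate(hits) if hit), 0.0)
# DCG discounts each hit by log2(rank + 1); the ideal ranking puts all hits first
dcg = sum(1 / math.log2(i + 2) for i, hit in enumerate(hits) if hit)
ideal_dcg = sum(1 / math.log2(i + 2) for i in range(min(len(relevant), k)))
ndcg = dcg / ideal_dcg

print(f"precision@{k}={precision:.3f} recall@{k}={recall:.3f} mrr={mrr:.3f} ndcg@{k}={ndcg:.3f}")
# precision@3=0.667 recall@3=1.000 mrr=0.500 ndcg@3=0.693
```

Recall is perfect here while nDCG is not, because nDCG penalizes the irrelevant document sitting at rank 1.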

Best Practices

Do's

  • Match model to use case: Code vs prose vs multilingual

  • Chunk thoughtfully: Preserve semantic boundaries

  • Normalize embeddings: For cosine similarity search

  • Batch requests: More efficient than one-by-one

  • Cache embeddings: Avoid recomputing for static content

  • Use Voyage AI for Claude apps: Recommended by Anthropic
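The caching and batching advice above can be combined in a small wrapper. This is a sketch assuming an in-memory dictionary is sufficient and that the wrapped function takes a list of texts and returns one vector per text; `CachedEmbedder` is a hypothetical class, not part of any library mentioned here.

```python
import hashlib
from typing import Callable, Dict, List

class CachedEmbedder:
    """Wrap an embedding function with an in-memory, content-addressed cache."""

    def __init__(self, embed_fn: Callable[[List[str]], List[List[float]]], model_name: str):
        self.embed_fn = embed_fn
        self.model_name = model_name
        self._cache: Dict[str, List[float]] = {}

    def _key(self, text: str) -> str:
        # Include the model name so vectors from different models never collide
        return hashlib.sha256(f"{self.model_name}\x00{text}".encode("utf8")).hexdigest()

    def embed(self, texts: List[str]) -> List[List[float]]:
        keys = [self._key(t) for t in texts]
        # Deduplicate cache misses while preserving order, then embed them in one batch
        missing: Dict[str, str] = {}
        for key, text in zip(keys, texts):
            if key not in self._cache and key not in missing:
                missing[key] = text
        if missing:
            vectors = self.embed_fn(list(missing.values()))
            self._cache.update(zip(missing.keys(), vectors))
        return [self._cache[key] for key in keys]
```

Any of the `get_embeddings` functions from the templates could be passed as `embed_fn`; for persistence across runs the dictionary would need to be replaced by a disk or database store.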

Don'ts

  • Don't ignore token limits: Truncation loses information

  • Don't mix embedding models: Incompatible vector spaces

  • Don't skip preprocessing: Garbage in, garbage out

  • Don't over-chunk: Lose important context

  • Don't forget metadata: Essential for filtering and debugging
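The warning about mixing models can be enforced in code by storing the model name next to each vector and refusing cross-model comparisons. A minimal sketch of that idea; `TaggedVector` and this `cosine_similarity` are hypothetical helpers written for illustration.

```python
import math
from dataclasses import dataclass
from typing import List

@dataclass(frozen=True)
class TaggedVector:
    """An embedding stored together with the model that produced it."""
    model: str
    values: List[float]

def cosine_similarity(a: TaggedVector, b: TaggedVector) -> float:
    """Cosine similarity that refuses to compare vectors from different models."""
    if a.model != b.model:
        raise ValueError(f"Cannot compare {a.model!r} with {b.model!r}: different vector spaces")
    if len(a.values) != len(b.values):
        raise ValueError("Vectors have different dimensions")
    dot = sum(x * y for x, y in zip(a.values, b.values))
    norm_a = math.sqrt(sum(x * x for x in a.values))
    norm_b = math.sqrt(sum(y * y for y in b.values))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0  # Similarity is undefined for a zero vector; report no match
    return dot / (norm_a * norm_b)
```

The same check matters when two models happen to share a dimension (for example the several 1024-dimensional models in the comparison table), since a dimension match alone would not catch the mistake.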
