Skip to content

💰 Cost & Latency Engineering

🎓 Page Overview

Trang này cung cấp strategies và techniques để optimize cost và latency cho LLM applications, từ caching đến model selection và context optimization.

Level: Ops Solves: Giảm chi phí và latency cho LLM applications mà không ảnh hưởng chất lượng

🎯 Cost & Latency Overview

Cost Components

Latency Components

Key Metrics

| Metric | Formula | Target |
| --- | --- | --- |
| Cost per Query | total_tokens × price_per_token | Minimize |
| Time to First Token | request_to_first_token | < 500ms |
| Total Latency | end_to_end_time | < 3s |
| Throughput | queries_per_second | Maximize |

💾 Caching Strategies

Cache Hierarchy

Exact Match Cache

python
class ExactMatchCache:
    """Redis-backed cache serving byte-identical query/context pairs."""

    def __init__(self, redis_client, ttl: int = 3600):
        # Entries expire after `ttl` seconds (default: one hour).
        self.redis = redis_client
        self.ttl = ttl

    def get_key(self, query: str, context_hash: str) -> str:
        """Derive a deterministic Redis key for this query/context pair."""
        digest = hashlib.md5(f"{query}:{context_hash}".encode()).hexdigest()
        return f"llm:exact:{digest}"

    async def get(self, query: str, context_hash: str) -> Optional[str]:
        """Return the cached response, or None on a miss."""
        return await self.redis.get(self.get_key(query, context_hash))

    async def set(self, query: str, context_hash: str, response: str):
        """Store `response` under the derived key with the configured TTL."""
        await self.redis.setex(self.get_key(query, context_hash), self.ttl, response)

Semantic Cache

python
class SemanticCache:
    """Cache that matches queries by embedding similarity rather than exact text."""

    def __init__(
        self,
        vector_db,
        embedding_model,
        similarity_threshold: float = 0.95
    ):
        # Only matches scoring at or above `similarity_threshold` are served.
        self.vector_db = vector_db
        self.embed = embedding_model
        self.threshold = similarity_threshold

    async def get(self, query: str) -> Optional[CacheHit]:
        """Return the nearest cached entry if it is similar enough, else None."""
        query_vec = await self.embed(query)

        matches = await self.vector_db.search(
            embedding=query_vec,
            top_k=1,
            filter={"type": "cache"}
        )

        if not matches:
            return None

        best = matches[0]
        if best.score < self.threshold:
            # Nearest neighbour exists but is not close enough to reuse.
            return None

        return CacheHit(
            response=best.metadata["response"],
            similarity=best.score,
            original_query=best.metadata["query"]
        )

    async def set(self, query: str, response: str):
        """Embed `query` and persist it with its response for future lookups."""
        query_vec = await self.embed(query)

        await self.vector_db.upsert(
            id=str(uuid.uuid4()),
            embedding=query_vec,
            metadata={
                "type": "cache",
                "query": query,
                "response": response,
                "timestamp": datetime.utcnow().isoformat()
            }
        )

Cache Configuration

| Cache Type | Hit Rate | Latency Savings | Storage Cost |
| --- | --- | --- | --- |
| Exact Match | 5-15% | 95% | Low |
| Semantic | 20-40% | 90% | Medium |
| Embedding | 60-80% | 10% | Medium |
| Prompt Template | 90%+ | 5% | Low |

📦 Batching Strategies

Request Batching

Batching Parameters

ParameterDescriptionRecommended
Batch SizeMax requests per batch4-8
Window TimeMax wait before processing50-100ms
Max LatencyTotal acceptable delay200ms

Implementation

python
class RequestBatcher:
    """Batch multiple LLM requests into single API calls for efficiency.

    A batch is flushed when either `max_batch_size` requests are pending
    or `max_wait_ms` has elapsed since the batch started filling.
    """

    def __init__(
        self,
        llm_client,
        max_batch_size: int = 8,
        max_wait_ms: int = 100
    ):
        self.llm = llm_client
        self.max_batch = max_batch_size
        self.max_wait = max_wait_ms / 1000  # seconds, for asyncio.sleep
        self.pending: List[PendingRequest] = []
        self.lock = asyncio.Lock()

    async def submit(self, request: LLMRequest) -> LLMResponse:
        """Queue `request` and await its batched response."""
        # create_future() ties the future to the running loop explicitly.
        future = asyncio.get_running_loop().create_future()
        pending = PendingRequest(request=request, future=future)

        async with self.lock:
            self.pending.append(pending)
            queue_size = len(self.pending)

        if queue_size >= self.max_batch:
            # Batch is full: flush immediately.
            await self._process_batch()
        elif queue_size == 1:
            # First request of a new batch: start the flush timer.
            # BUG FIX: previously a timer task was spawned for *every*
            # non-full submit, and `_delayed_process` was never defined,
            # so partially-filled batches could never be flushed.
            asyncio.create_task(self._delayed_process())

        return await future

    async def _delayed_process(self):
        """Flush whatever is pending once the max-wait window elapses."""
        await asyncio.sleep(self.max_wait)
        await self._process_batch()

    async def _process_batch(self):
        """Drain up to `max_batch` pending requests and resolve their futures."""
        async with self.lock:
            batch = self.pending[:self.max_batch]
            self.pending = self.pending[self.max_batch:]

        if not batch:
            # Another flush (size trigger vs. timer) already drained the queue.
            return

        # One API call serves the whole batch.
        responses = await self.llm.batch_generate(
            [p.request for p in batch]
        )

        for pending, response in zip(batch, responses):
            # Guard against double-resolution if flush paths ever race.
            if not pending.future.done():
                pending.future.set_result(response)

📏 Context Management

Token Budget Allocation

Context Compression Techniques

TechniqueCompressionQualityLatency
Summarization70-90%High+200ms
Extraction50-70%Medium+100ms
TruncationVariableLow0ms
Selective Inclusion60-80%High+50ms

Dynamic Context Selection

python
class ContextOptimizer:
    """Select and trim retrieved documents to fit a fixed token budget."""

    def __init__(self, token_budget: int, tokenizer):
        # `tokenizer` must provide encode(str) -> tokens; decode(tokens) -> str
        # is needed for truncation — TODO confirm against the tokenizer used.
        self.budget = token_budget
        self.tokenizer = tokenizer

    def truncate_to_fit(self, text: str, max_tokens: int) -> str:
        """Return the longest prefix of `text` that fits in `max_tokens` tokens.

        Returns the empty string when `max_tokens` is not positive.
        (BUG FIX: this method was called by `optimize` but never defined.)
        """
        if max_tokens <= 0:
            return ""
        token_ids = self.tokenizer.encode(text)
        if len(token_ids) <= max_tokens:
            return text
        return self.tokenizer.decode(token_ids[:max_tokens])

    def optimize(
        self,
        system_prompt: str,
        retrieved_docs: List[Document],
        user_query: str,
        response_budget: int = 1000
    ) -> OptimizedContext:
        """Pack the highest-scoring documents into the remaining token budget.

        Fixed costs (system prompt, query, reserved response tokens) are
        subtracted first; documents are then added best-score-first. The
        first document that does not fully fit is truncated and selection
        stops there.
        """
        system_tokens = len(self.tokenizer.encode(system_prompt))
        query_tokens = len(self.tokenizer.encode(user_query))

        # Guard against a budget already exhausted by the fixed costs.
        available = max(0, self.budget - system_tokens - query_tokens - response_budget)

        selected_docs = []
        used_tokens = 0

        for doc in sorted(retrieved_docs, key=lambda d: d.score, reverse=True):
            doc_tokens = len(self.tokenizer.encode(doc.content))

            if used_tokens + doc_tokens <= available:
                selected_docs.append(doc)
                used_tokens += doc_tokens
            else:
                # Doesn't fully fit: truncate what remains and stop.
                # (The original `elif doc_tokens > available - used_tokens`
                # guard was tautological here and has been removed.)
                truncated = self.truncate_to_fit(
                    doc.content,
                    available - used_tokens
                )
                if truncated:
                    selected_docs.append(Document(
                        content=truncated,
                        score=doc.score,
                        truncated=True
                    ))
                break

        return OptimizedContext(
            system_prompt=system_prompt,
            context=selected_docs,
            query=user_query,
            tokens_used=system_tokens + used_tokens + query_tokens,
            tokens_remaining=response_budget
        )

🎯 Model Selection

Model Routing Strategy

Model Cost Comparison

| Model | Input $/1M | Output $/1M | Speed | Use Case |
| --- | --- | --- | --- | --- |
| GPT-4 Turbo | $10 | $30 | Slow | Complex reasoning |
| GPT-3.5 Turbo | $0.5 | $1.5 | Fast | Simple tasks |
| Claude 3 Opus | $15 | $75 | Slow | Long context |
| Claude 3 Haiku | $0.25 | $1.25 | Fast | High volume |
| Gemini Pro | $0.5 | $1.5 | Fast | General purpose |

Query Router Implementation

python
class ModelRouter:
    """Route queries to cost-appropriate models."""

    ROUTING_RULES = {
        "simple_qa": {
            "patterns": ["what is", "define", "explain briefly"],
            "model": "gpt-3.5-turbo",
            "max_tokens": 500
        },
        "complex_reasoning": {
            "patterns": ["analyze", "compare and contrast", "derive"],
            "model": "gpt-4-turbo",
            "max_tokens": 2000
        },
        "code_generation": {
            "patterns": ["write code", "implement", "debug"],
            "model": "gpt-4-turbo",
            "max_tokens": 4000
        },
        "long_document": {
            "context_threshold": 50000,
            "model": "claude-3-sonnet",
            "max_tokens": 4000
        }
    }

    def route(self, query: str, context_tokens: int) -> ModelConfig:
        """Pick a model configuration for `query` given its context size."""
        # Oversized contexts always take the long-context model,
        # regardless of query wording.
        long_doc = self.ROUTING_RULES["long_document"]
        if context_tokens > long_doc["context_threshold"]:
            return ModelConfig(**long_doc)

        # Otherwise the first rule whose trigger phrase appears wins
        # (dict insertion order, so cheaper rules are tried first).
        lowered = query.lower()
        for rule in self.ROUTING_RULES.values():
            phrases = rule.get("patterns")
            if phrases and any(phrase in lowered for phrase in phrases):
                return ModelConfig(**rule)

        # Nothing matched: default to the cost-effective model.
        return ModelConfig(**self.ROUTING_RULES["simple_qa"])

📊 Cost Tracking

Cost Attribution

python
class CostTracker:
    """Track and attribute LLM costs."""

    # USD per 1K tokens. E.g. gpt-4-turbo input: $0.01/1K = $10/1M,
    # matching the published pricing in the comparison table.
    PRICING = {
        "gpt-4-turbo": {"input": 0.01, "output": 0.03},
        "gpt-3.5-turbo": {"input": 0.0005, "output": 0.0015},
        "text-embedding-3-small": {"input": 0.00002},
    }

    def calculate_cost(
        self,
        model: str,
        input_tokens: int,
        output_tokens: int = 0
    ) -> CostBreakdown:
        """Compute the USD cost of a single call.

        Unknown models are priced at zero rather than raising, so cost
        tracking never breaks the request path.

        BUG FIX: PRICING is per 1K tokens, but token counts were divided
        by 1,000,000, under-reporting all costs by a factor of 1000.
        """
        pricing = self.PRICING.get(model, {"input": 0, "output": 0})

        input_cost = (input_tokens / 1_000) * pricing["input"]
        # Embedding models have no "output" entry; treat it as free.
        output_cost = (output_tokens / 1_000) * pricing.get("output", 0)

        return CostBreakdown(
            model=model,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            input_cost=input_cost,
            output_cost=output_cost,
            total_cost=input_cost + output_cost
        )

    def attribute_cost(
        self,
        cost: CostBreakdown,
        tenant_id: str,
        feature: str
    ):
        """Attribute cost to tenant and feature for billing/analytics.

        NOTE(review): relies on `self.metrics` being attached elsewhere —
        no __init__ is visible in this class; confirm how it is injected.
        """
        self.metrics.increment(
            "llm.cost.usd",
            cost.total_cost,
            tags={
                "tenant": tenant_id,
                "feature": feature,
                "model": cost.model
            }
        )

📋 Optimization Checklist

Quick Wins

  • [ ] Implement exact match caching
  • [ ] Use smaller model for simple queries
  • [ ] Reduce system prompt length
  • [ ] Set appropriate max_tokens limits

Medium Effort

  • [ ] Add semantic caching layer
  • [ ] Implement request batching
  • [ ] Build query router
  • [ ] Optimize context selection

Advanced

  • [ ] Fine-tune smaller model for specific tasks
  • [ ] Implement speculative decoding
  • [ ] Self-host open-source models for high-volume
  • [ ] Build comprehensive cost attribution

🔗 Cross-References

📚 Further Reading

  • "LLM Cost Optimization Guide" - OpenAI
  • "Semantic Caching for LLMs" - GPTCache Documentation
  • "Production LLM Economics" - a16z AI Blog