Best Practices

This guide covers best practices for using Satori effectively, including authentication, file management, querying, and performance optimization.

Authentication & Security

Token Management

✅ DO:
- Store tokens securely (environment variables, secret management)
- Use different tokens for different environments (dev, staging, prod)
- Set expiration dates for temporary tokens
- Rotate tokens periodically
- Use descriptive token names

❌ DON'T:
- Commit tokens to version control
- Share tokens between team members
- Use the same token for multiple applications
- Use admin tokens for regular API operations

Example:

import os
from requests import Session

# Store token in environment variable
JWT_TOKEN = os.getenv("SATORI_JWT_TOKEN")
if not JWT_TOKEN:
    raise RuntimeError("SATORI_JWT_TOKEN environment variable is not set")

session = Session()
session.headers.update({
    "Authorization": f"Bearer {JWT_TOKEN}"
})

File Management

File Upload Best Practices

✅ DO:
- Use webhooks for async processing notifications
- Add meaningful metadata to files
- Monitor file processing status
- Handle file size limits (512MB max)
- Use appropriate file types for your use case

❌ DON'T:
- Upload files without checking status
- Upload duplicate files unnecessarily (deduplication by hash)
- Upload files larger than 512MB without splitting

Example with Webhook:

import requests

def upload_file_with_webhook(file_path, enclave_id, webhook_url):
    with open(file_path, "rb") as f:
        response = requests.post(
            f"{BASE_URL}/enclaves/{enclave_id}/files/",
            headers={"Authorization": f"Bearer {JWT_TOKEN}"},
            files={"file": f},
            data={
                "metadata": '{"source": "api", "uploaded_by": "user123"}',
                "webhook_url": webhook_url
            }
        )
    return response.json()

# Your webhook endpoint (FastAPI shown here)
from fastapi import FastAPI, Request

app = FastAPI()

@app.post("/webhook/file-processed")
async def handle_webhook(request: Request):
    payload = await request.json()
    if payload["status"] == "ready":
        # File is ready - start querying (process_ready_file is your handler)
        process_ready_file(payload["file_id"])

File Status Monitoring

✅ DO:
- Poll file status with reasonable intervals (5-10 seconds)
- Implement timeout handling
- Handle failed status appropriately
- Use webhooks when possible to avoid polling

Example:

import time

import requests

def wait_for_file_ready(file_id, max_wait=300, poll_interval=5):
    """Wait for file to be ready, with timeout."""
    start_time = time.time()

    while time.time() - start_time < max_wait:
        response = requests.get(
            f"{BASE_URL}/files/{file_id}",
            headers={"Authorization": f"Bearer {JWT_TOKEN}"}
        )
        file = response.json()

        if file["status"] == "ready":
            return True
        elif file["status"] == "failed":
            raise Exception(f"File processing failed: {file_id}")

        time.sleep(poll_interval)

    raise TimeoutError(f"File not ready within {max_wait} seconds")

Metadata Best Practices

✅ DO:
- Include searchable fields (author, date, category)
- Keep metadata under 10KB
- Use consistent field names across files
- Include timestamps and source information

Example:

metadata = {
    "author": "John Doe",
    "date": "2025-01-15",
    "category": "research",
    "department": "engineering",
    "project": "project-alpha",
    "version": "1.0"
}

# Upload with metadata (file_path is the document to upload)
import json

with open(file_path, "rb") as f:
    requests.post(
        f"{BASE_URL}/files/",
        headers={"Authorization": f"Bearer {JWT_TOKEN}"},
        files={"file": f},
        data={"metadata": json.dumps(metadata)}
    )

Query Optimization

✅ DO:
- Use specific, focused queries
- Break complex questions into multiple queries
- Use enclave-specific queries when possible
- Review source citations for accuracy

❌ DON'T:
- Ask overly broad questions
- Expect answers about content not in your documents
- Ignore source citations

Example:

# ✅ Good: Specific query
query = "What are the payment terms in the contract?"

# ❌ Bad: Too broad
query = "Tell me everything about this document"

# ✅ Good: Multiple focused queries
queries = [
    "What is the contract duration?",
    "What are the payment terms?",
    "What are the termination conditions?"
]

Handling Streaming Responses

✅ DO:
- Use streaming for better user experience
- Handle Server-Sent Events properly
- Display partial results as they arrive
- Handle connection errors gracefully

Example:

import requests

def stream_query(query, enclave_id):
    response = requests.post(
        f"{BASE_URL}/enclaves/{enclave_id}/chat/",
        headers={"Authorization": f"Bearer {JWT_TOKEN}"},
        json={"query": query},
        stream=True
    )

    for line in response.iter_lines():
        if line:
            text = line.decode('utf-8')
            if text.startswith('data: '):
                content = text[6:]
                if content == '[DONE]':
                    break
                yield content

Using Source Citations

✅ DO:
- Display source citations to users
- Use node_ids to fetch full references
- Verify information against original documents
- Build citation systems for compliance

Example:

# Get query response with sources
response = requests.post(
    f"{BASE_URL}/enclaves/{enclave_id}/query",
    headers={"Authorization": f"Bearer {JWT_TOKEN}"},
    params={"query": query}
)
result = response.json()

# Fetch full references for citations
node_ids = [s["node_id"] for s in result["sources"]]
refs_response = requests.get(
    f"{BASE_URL}/enclaves/{enclave_id}/references",
    headers={"Authorization": f"Bearer {JWT_TOKEN}"},
    params={"node_ids": ",".join(node_ids)}
)
references = refs_response.json()

# Display with citations
print(f"Answer: {result['answer']}")
print("\nSources:")
for ref in references["references"]:
    print(f"- {ref['file_name']}, page {ref['metadata'].get('page', 'N/A')}")

Enclave Organization

When to Create New Enclaves

✅ DO:
- Create separate enclaves for different clients
- Use enclaves for different document domains (legal, medical, technical)
- Separate development and production data
- Use enclaves for different projects

❌ DON'T:
- Create too many enclaves (consolidate when possible)
- Mix unrelated document types in the same enclave
- Use enclaves for temporary storage

Enclave Naming Conventions

✅ DO:
- Use descriptive names: "Client ABC - Legal Documents"
- Include project identifiers
- Use consistent naming patterns
- Add descriptions for clarity

Example:

enclaves = [
    {
        "name": "Acme Corp - Legal Contracts",
        "description": "Contract and agreement documents for Acme Corporation"
    },
    {
        "name": "Research Project Alpha",
        "description": "Clinical trial documentation and research papers"
    }
]

Performance Optimization

Batch Operations

✅ DO:
- Upload multiple files in parallel when possible
- Use async/await for concurrent operations
- Batch status checks when monitoring multiple files

Example:

import asyncio
import aiohttp

async def upload_files_parallel(file_paths, enclave_id):
    async with aiohttp.ClientSession() as session:
        tasks = []
        for file_path in file_paths:
            # upload_file_async: a per-file upload coroutine you define
            task = upload_file_async(session, file_path, enclave_id)
            tasks.append(task)
        results = await asyncio.gather(*tasks)
    return results
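The example above assumes an `upload_file_async` helper. Whatever HTTP client it wraps, it is worth bounding how many uploads run at once rather than launching every file simultaneously. A minimal sketch of limited-concurrency gathering (the `gather_limited` name and the limit of 5 are illustrative):

```python
import asyncio

async def gather_limited(coros, limit=5):
    """Run coroutines concurrently, with at most `limit` in flight at once."""
    sem = asyncio.Semaphore(limit)

    async def bounded(coro):
        async with sem:
            return await coro

    # gather preserves the input order of results
    return await asyncio.gather(*(bounded(c) for c in coros))
```

Pass it your upload coroutines: `await gather_limited([upload_file_async(session, p, enclave_id) for p in file_paths])`.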

Caching Strategies

✅ DO:
- Cache enclave information
- Cache file status when appropriate
- Use ETags or similar for conditional requests
- Cache query results for repeated questions
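A minimal sketch of an in-memory cache with per-entry expiry, suitable for enclave info or file-status lookups (`TTLCache` is a hypothetical helper, not part of the Satori client; pick a TTL that matches how quickly the cached data can go stale):

```python
import time

class TTLCache:
    """Tiny in-memory cache with per-entry expiry."""

    def __init__(self, ttl_seconds=60):
        self.ttl = ttl_seconds
        self._store = {}

    def get(self, key):
        entry = self._store.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if time.monotonic() >= expires_at:
            # Entry expired: drop it and report a miss
            del self._store[key]
            return None
        return value

    def set(self, key, value):
        self._store[key] = (value, time.monotonic() + self.ttl)
```

On a miss, fetch from the API and `set` the result; repeated lookups within the TTL never hit the network.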

Rate Limiting

✅ DO:
- Implement exponential backoff for retries
- Respect rate limits (if implemented)
- Batch requests when possible
- Use webhooks instead of polling

Example:

import random
import time

import requests

def retry_with_backoff(func, max_retries=3):
    for attempt in range(max_retries):
        try:
            return func()
        except requests.HTTPError as e:
            if e.response.status_code == 429:  # Rate limited
                wait_time = (2 ** attempt) + random.uniform(0, 1)
                time.sleep(wait_time)
            else:
                raise
    raise Exception("Max retries exceeded")

Error Handling

Common Error Scenarios

✅ DO:
- Handle 401 (Unauthorized) by checking token validity
- Handle 403 (Forbidden) by verifying enclave access
- Handle 404 (Not Found) gracefully
- Handle 413 (File Too Large) by splitting files
- Handle 415 (Unsupported Media Type) by checking file types

Example:

# AuthenticationError, NotFoundError, etc. are application-defined exceptions
def handle_api_error(response):
    if response.status_code == 401:
        raise AuthenticationError("Invalid or expired token")
    elif response.status_code == 403:
        raise PermissionError("Access denied to this resource")
    elif response.status_code == 404:
        raise NotFoundError("Resource not found")
    elif response.status_code == 413:
        raise FileTooLargeError("File exceeds 512MB limit")
    elif response.status_code == 415:
        raise UnsupportedFileTypeError("File type not supported")
    else:
        response.raise_for_status()

Testing & Development

Development Workflow

✅ DO:
- Use separate enclaves for testing
- Clean up test data regularly
- Use test-specific token names
- Document your integration patterns

Integration Testing

✅ DO:
- Test the full workflow: create → upload → query
- Test error scenarios
- Test with various file types
- Test concurrent operations
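One way to exercise the create → upload → query flow without hitting the live API is to stub the client. A hedged sketch using `unittest.mock` (the endpoint paths and response shapes here are assumptions for illustration; adapt them to the real responses):

```python
from unittest.mock import MagicMock

def test_create_upload_query_workflow():
    """Walk the full workflow against a stubbed API client."""
    api = MagicMock()
    # Each .json() call pops the next canned response
    api.post.return_value.json.side_effect = [
        {"id": "enc-1"},                        # create enclave
        {"file_id": "f-1", "status": "ready"},  # upload file
        {"answer": "Net 30", "sources": []},    # query
    ]

    enclave = api.post("/enclaves/", json={"name": "test"}).json()
    upload = api.post(f"/enclaves/{enclave['id']}/files/").json()
    assert upload["status"] == "ready"

    result = api.post(f"/enclaves/{enclave['id']}/query",
                      params={"query": "What are the payment terms?"}).json()
    assert result["answer"] == "Net 30"
```

The same test body can later run against the real API in a dedicated test enclave by swapping the stub for a real client.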

Monitoring & Observability

Health Checks

✅ DO:
- Monitor health check endpoint regularly
- Alert on unhealthy status
- Check individual component status

Example:

import logging

import requests

logger = logging.getLogger(__name__)

def check_health():
    response = requests.get(f"{BASE_URL}/api/status/health")
    health = response.json()

    if health["overall_status"] != "healthy":
        # Alert or handle degraded state
        for check, status in health["checks"].items():
            if not status:
                logger.warning(f"Component {check} is unhealthy")

Logging

✅ DO:
- Log API requests and responses (sanitize tokens)
- Log file processing status changes
- Log query patterns for analysis
- Use structured logging
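A sketch of sanitizing bearer tokens before anything reaches the logs (the regex, logger name, and `log_request` helper are illustrative):

```python
import logging
import re

# Matches "Bearer <token>" so the token never appears in log output
TOKEN_RE = re.compile(r"Bearer\s+\S+")

def sanitize(text):
    """Redact bearer tokens from a string before logging it."""
    return TOKEN_RE.sub("Bearer [REDACTED]", text)

logger = logging.getLogger("satori.client")

def log_request(method, url, headers):
    logger.info("request %s %s headers=%s", method, url,
                sanitize(str(headers)))
```

Run every request/response string through `sanitize` in one place (e.g. a session hook) rather than at each call site, so a forgotten call can't leak a token.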

Next Steps