# Best Practices

This guide covers best practices for using Satori effectively, including authentication, file management, querying, and performance optimization.
## Authentication & Security

### Token Management

✅ DO:

- Store tokens securely (environment variables, secret management)
- Use different tokens for different environments (dev, staging, prod)
- Set expiration dates for temporary tokens
- Rotate tokens periodically
- Use descriptive token names

❌ DON'T:

- Commit tokens to version control
- Share tokens between team members
- Use the same token for multiple applications
- Use admin tokens for regular API operations
Example:

```python
import os

from requests import Session

# Store the token in an environment variable, never in source code
JWT_TOKEN = os.getenv("SATORI_JWT_TOKEN")

session = Session()
session.headers.update({
    "Authorization": f"Bearer {JWT_TOKEN}"
})
```
## File Management

### File Upload Best Practices

✅ DO:

- Use webhooks for async processing notifications
- Add meaningful metadata to files
- Monitor file processing status
- Handle file size limits (512MB max)
- Use appropriate file types for your use case

❌ DON'T:

- Upload files without checking status
- Upload duplicate files unnecessarily (deduplication is by hash)
- Upload files larger than 512MB without splitting
Example with Webhook:

```python
import requests
# FastAPI is assumed here for the webhook receiver; any web framework works
from fastapi import FastAPI, Request

app = FastAPI()

def upload_file_with_webhook(file_path, enclave_id, webhook_url):
    with open(file_path, "rb") as f:
        response = requests.post(
            f"{BASE_URL}/enclaves/{enclave_id}/files/",
            headers={"Authorization": f"Bearer {JWT_TOKEN}"},
            files={"file": f},
            data={
                "metadata": '{"source": "api", "uploaded_by": "user123"}',
                "webhook_url": webhook_url
            }
        )
    return response.json()

# Your webhook endpoint
@app.post("/webhook/file-processed")
async def handle_webhook(request: Request):
    payload = await request.json()
    if payload["status"] == "ready":
        # File is ready - start querying
        process_ready_file(payload["file_id"])
```
### File Status Monitoring

✅ DO:

- Poll file status at reasonable intervals (5-10 seconds)
- Implement timeout handling
- Handle the failed status appropriately
- Use webhooks when possible to avoid polling
Example:

```python
import time

import requests

def wait_for_file_ready(file_id, max_wait=300, poll_interval=5):
    """Wait for file to be ready, with timeout."""
    start_time = time.time()
    while time.time() - start_time < max_wait:
        response = requests.get(
            f"{BASE_URL}/files/{file_id}",
            headers={"Authorization": f"Bearer {JWT_TOKEN}"}
        )
        file = response.json()
        if file["status"] == "ready":
            return True
        elif file["status"] == "failed":
            raise Exception(f"File processing failed: {file_id}")
        time.sleep(poll_interval)
    raise TimeoutError(f"File not ready within {max_wait} seconds")
```
### Metadata Best Practices

✅ DO:

- Include searchable fields (author, date, category)
- Keep metadata under 10KB
- Use consistent field names across files
- Include timestamps and source information
Example:

```python
import json

import requests

metadata = {
    "author": "John Doe",
    "date": "2025-01-15",
    "category": "research",
    "department": "engineering",
    "project": "project-alpha",
    "version": "1.0"
}

# Upload with metadata (the file must be opened first)
with open(file_path, "rb") as f:
    requests.post(
        f"{BASE_URL}/files/",
        files={"file": f},
        data={"metadata": json.dumps(metadata)}
    )
```
## Querying & Search

### Query Optimization

✅ DO:

- Use specific, focused queries
- Break complex questions into multiple queries
- Use enclave-specific queries when possible
- Review source citations for accuracy

❌ DON'T:

- Ask overly broad questions
- Expect answers about content not in your documents
- Ignore source citations
Example:
# ✅ Good: Specific query
query = "What are the payment terms in the contract?"
# ❌ Bad: Too broad
query = "Tell me everything about this document"
# ✅ Good: Multiple focused queries
queries = [
"What is the contract duration?",
"What are the payment terms?",
"What are the termination conditions?"
]
### Handling Streaming Responses

✅ DO:

- Use streaming for a better user experience
- Handle Server-Sent Events properly
- Display partial results as they arrive
- Handle connection errors gracefully
Example:

```python
import requests

def stream_query(query, enclave_id):
    response = requests.post(
        f"{BASE_URL}/enclaves/{enclave_id}/chat/",
        headers={"Authorization": f"Bearer {JWT_TOKEN}"},
        json={"query": query},
        stream=True
    )
    for line in response.iter_lines():
        if line:
            text = line.decode('utf-8')
            if text.startswith('data: '):
                content = text[6:]
                if content == '[DONE]':
                    break
                yield content
```
### Using Source Citations

✅ DO:

- Display source citations to users
- Use `node_ids` to fetch full references
- Verify information against the original documents
- Build citation systems for compliance
Example:
# Get query response with sources
response = requests.post(
f"{BASE_URL}/enclaves/{enclave_id}/query",
params={"query": query}
)
result = response.json()
# Fetch full references for citations
node_ids = [s["node_id"] for s in result["sources"]]
refs_response = requests.get(
f"{BASE_URL}/enclaves/{enclave_id}/references",
params={"node_ids": ",".join(node_ids)}
)
references = refs_response.json()
# Display with citations
print(f"Answer: {result['answer']}")
print("\nSources:")
for ref in references["references"]:
print(f"- {ref['file_name']}, page {ref['metadata'].get('page', 'N/A')}")
## Enclave Organization

### When to Create New Enclaves

✅ DO:

- Create separate enclaves for different clients
- Use enclaves for different document domains (legal, medical, technical)
- Separate development and production data
- Use enclaves for different projects

❌ DON'T:

- Create too many enclaves (consolidate when possible)
- Mix unrelated document types in the same enclave
- Use enclaves for temporary storage
### Enclave Naming Conventions

✅ DO:

- Use descriptive names: "Client ABC - Legal Documents"
- Include project identifiers
- Use consistent naming patterns
- Add descriptions for clarity
Example:
enclaves = [
{
"name": "Acme Corp - Legal Contracts",
"description": "Contract and agreement documents for Acme Corporation"
},
{
"name": "Research Project Alpha",
"description": "Clinical trial documentation and research papers"
}
]
## Performance Optimization

### Batch Operations

✅ DO:

- Upload multiple files in parallel when possible
- Use async/await for concurrent operations
- Batch status checks when monitoring multiple files
Example:

```python
import asyncio

import aiohttp

async def upload_files_parallel(file_paths, enclave_id):
    async with aiohttp.ClientSession() as session:
        tasks = []
        for file_path in file_paths:
            # upload_file_async is your own async upload helper (not shown)
            task = upload_file_async(session, file_path, enclave_id)
            tasks.append(task)
        results = await asyncio.gather(*tasks)
        return results
```
### Caching Strategies

✅ DO:

- Cache enclave information
- Cache file status when appropriate
- Use ETags or similar for conditional requests
- Cache query results for repeated questions
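Since enclave information changes rarely, a small in-process TTL cache keeps repeated lookups off the network. A minimal sketch, assuming your own fetch function; the `TTLCache` class and `get_enclaves` wrapper below are illustrative, not part of any Satori SDK:

```python
import time

class TTLCache:
    """Tiny in-process cache with per-entry expiry."""

    def __init__(self, ttl_seconds=60):
        self.ttl = ttl_seconds
        self._store = {}  # key -> (expires_at, value)

    def get(self, key):
        entry = self._store.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if time.time() >= expires_at:
            del self._store[key]  # drop the stale entry
            return None
        return value

    def set(self, key, value):
        self._store[key] = (time.time() + self.ttl, value)

enclave_cache = TTLCache(ttl_seconds=300)

def get_enclaves(fetch):
    """Return the cached enclave list, refetching only after expiry.

    `fetch` stands in for your actual API call (e.g. a GET to /enclaves/).
    """
    cached = enclave_cache.get("enclaves")
    if cached is not None:
        return cached
    enclaves = fetch()
    enclave_cache.set("enclaves", enclaves)
    return enclaves
```

The same pattern works for file status, with a much shorter TTL since status changes while processing is in flight.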
### Rate Limiting

✅ DO:

- Implement exponential backoff for retries
- Respect rate limits (if implemented)
- Batch requests when possible
- Use webhooks instead of polling
Example:

```python
import random
import time

import requests

def retry_with_backoff(func, max_retries=3):
    for attempt in range(max_retries):
        try:
            return func()
        except requests.HTTPError as e:
            if e.response.status_code == 429:  # Rate limited
                wait_time = (2 ** attempt) + random.uniform(0, 1)
                time.sleep(wait_time)
            else:
                raise
    raise Exception("Max retries exceeded")
## Error Handling

### Common Error Scenarios

✅ DO:

- Handle 401 (Unauthorized) by checking token validity
- Handle 403 (Forbidden) by verifying enclave access
- Handle 404 (Not Found) gracefully
- Handle 413 (File Too Large) by splitting files
- Handle 415 (Unsupported Media Type) by checking file types
Example:

```python
# AuthenticationError, NotFoundError, FileTooLargeError and
# UnsupportedFileTypeError are custom exceptions defined by your application
def handle_api_error(response):
    if response.status_code == 401:
        raise AuthenticationError("Invalid or expired token")
    elif response.status_code == 403:
        raise PermissionError("Access denied to this resource")
    elif response.status_code == 404:
        raise NotFoundError("Resource not found")
    elif response.status_code == 413:
        raise FileTooLargeError("File exceeds 512MB limit")
    elif response.status_code == 415:
        raise UnsupportedFileTypeError("File type not supported")
    else:
        response.raise_for_status()
```
## Testing & Development

### Development Workflow

✅ DO:

- Use separate enclaves for testing
- Clean up test data regularly
- Use test-specific token names
- Document your integration patterns
### Integration Testing

✅ DO:

- Test the full workflow: create → upload → query
- Test error scenarios
- Test with various file types
- Test concurrent operations
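The full create → upload → query workflow is easier to test if it is driven through a thin client object, so a stub can stand in for the live API in unit tests. A sketch under that assumption; the client methods shown belong to a hypothetical wrapper of your own, not to Satori endpoints:

```python
def run_full_workflow(client, file_path, query):
    """Exercise create -> upload -> wait -> query, then clean up."""
    enclave = client.create_enclave(name="integration-test")
    try:
        file_info = client.upload_file(enclave["id"], file_path)
        client.wait_until_ready(file_info["id"])
        return client.query(enclave["id"], query)
    finally:
        # Always delete the test enclave, even if a step fails
        client.delete_enclave(enclave["id"])
```

In the test suite, a stub client records the call order and returns canned responses; against staging, the same function runs with a real client, which keeps the two in sync.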
## Monitoring & Observability

### Health Checks

✅ DO:

- Monitor the health check endpoint regularly
- Alert on unhealthy status
- Check individual component status
Example:

```python
import logging

import requests

logger = logging.getLogger(__name__)

def check_health():
    response = requests.get(f"{BASE_URL}/api/status/health")
    health = response.json()
    if health["overall_status"] != "healthy":
        # Alert or handle degraded state
        for check, status in health["checks"].items():
            if not status:
                logger.warning(f"Component {check} is unhealthy")
```
### Logging

✅ DO:

- Log API requests and responses (sanitize tokens)
- Log file processing status changes
- Log query patterns for analysis
- Use structured logging
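For the first point, tokens should be masked before a request ever reaches the logs; the sketch below combines that with structured (JSON) log lines. The field names and the `sanitize_headers` helper are illustrative choices, not a Satori convention:

```python
import json
import logging
import re

logger = logging.getLogger("satori.client")

def sanitize_headers(headers):
    """Return a copy of the headers with any bearer token masked."""
    clean = dict(headers)
    if "Authorization" in clean:
        clean["Authorization"] = re.sub(
            r"Bearer \S+", "Bearer ***", clean["Authorization"]
        )
    return clean

def log_request(method, url, headers, status_code):
    """Emit one structured (JSON) log line per API call."""
    logger.info(json.dumps({
        "event": "api_request",
        "method": method,
        "url": url,
        "headers": sanitize_headers(headers),
        "status": status_code,
    }))
```

Because each line is a single JSON object, log aggregators can filter by `event` or `status` without regex parsing.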
## Next Steps

- File Management Guide - Deep dive into file operations
- Querying Documents Guide - Advanced query techniques
- Chat & Agents Guide - Using AI agents
- Text Processing Guide - Text processing tools