Concepts
Architecture, services, discovery, federated pipelines, attribution, and payments (beta)
Architecture Overview
Client: initializes connections, discovers services, orchestrates pipelines, manages auth, and tracks attribution.
AI Services: peers offering /chat and/or /search with their own parameters and pricing.
Service Discovery: index and filters to find services by capability, owner, health, and cost.
Federated Pipeline: composes search over multiple data sources and synthesizes answers with an LLM, preserving source attribution.
Service Discovery
Find and explore available AI services on the network:
from syft_hub import Client
client = Client()
# List all available services
services = client.list_services()
print(f"Found {len(services)} services\n")
# Display service details
for service in services:
print(f"Name: {service.name}")
print(f"Type: {service.type}") # CHAT, SEARCH, or DATA
print(f"Status: {service.status}") # healthy, degraded, offline
print(f"Owner: {service.owner}")
print(f"Description: {service.description}")
print("-" * 40)
In Jupyter notebooks, use the interactive widget:
# Display services in an interactive table
client.show_services() # Shows filterable, sortable table
Filtering Services
Filter services by type, status, or other criteria:
# Filter by service type
chat_services = client.list_services(filter={"type": "CHAT"})
search_services = client.list_services(filter={"type": "SEARCH"})
data_sources = client.list_services(filter={"type": "DATA"})
# Filter by status
healthy_services = client.list_services(filter={"status": "healthy"})
# Filter by owner
openmined_services = client.list_services(filter={"owner": "*@openmined.org"})
# Combine filters
healthy_chat = client.list_services(filter={
"type": "CHAT",
"status": "healthy"
})
Loading and Testing Services
Load specific services and test their functionality:
# Load a specific service by name
service = client.load_service("aggregator@openmined.org/claude-sonnet-3.5")
# Check service information
print(service) # Display service details
print(f"Service Type: {service.type}")
print(f"Pricing: ${service.price_per_token}")
print(f"Health Status: {service.health_status}")
# Test service availability
if service.is_available():
print("Service is ready to use")
else:
print("Service is currently unavailable")
# Get service metadata
metadata = service.get_metadata()
print(f"Model Version: {metadata['model_version']}")
print(f"Max Tokens: {metadata['max_tokens']}")
print(f"Supported Languages: {metadata['languages']}")
Chat Services
Interact with AI models through chat interfaces:
Basic Chat
# Load a chat service
chat_service = client.load_service("aggregator@openmined.org/claude-sonnet-3.5")
# Single message chat
response = client.chat_sync(
service_name="aggregator@openmined.org/claude-sonnet-3.5",
messages=[
{"role": "user", "content": "What is federated learning?"}
]
)
print(response.content)
print(f"Tokens used: {response.token_count}")
print(f"Cost: ${response.cost}")
Multi-turn Conversation
# Multi-turn conversation
messages = [
{"role": "system", "content": "You are a helpful AI assistant."},
{"role": "user", "content": "What is differential privacy?"},
{"role": "assistant", "content": "Differential privacy is..."},
{"role": "user", "content": "How does it apply to federated learning?"}
]
response = client.chat_sync(
service_name="aggregator@openmined.org/gpt-4",
messages=messages,
temperature=0.7,
max_tokens=500
)
print(response.content)
Chat Schema
Understanding the chat message format:
# Message schema
message = {
"role": "user", # "system", "user", or "assistant"
"content": "Your message here",
"name": "optional_name", # Optional: speaker name
"metadata": {} # Optional: additional data
}
# Chat response schema
response = {
"content": "AI response text",
"role": "assistant",
"model": "claude-sonnet-3.5",
"token_count": {
"input": 150,
"output": 200,
"total": 350
},
"cost": 0.0035,
"finish_reason": "stop", # "stop", "length", "error"
"metadata": {
"model_version": "2024-01",
"latency_ms": 1250
}
}
Search Services
Query data sources with semantic search capabilities:
Basic Search
# Load a search service (data source)
data_source = client.load_service("irina@openmined.org/openmined-blog")
# Perform a search
results = client.search_sync(
service_name="irina@openmined.org/openmined-blog",
query="privacy-preserving machine learning"
)
# Process results
for result in results.documents:
print(f"Title: {result.title}")
print(f"Score: {result.relevance_score}")
print(f"Content: {result.content[:200]}...")
print(f"Source: {result.source_url}")
print("-" * 40)
Advanced Search Options
# Search with filters and options
results = client.search_sync(
service_name="researcher@org/papers",
query="differential privacy",
filters={
"date_range": {"start": "2023-01-01", "end": "2024-12-31"},
"document_type": "research_paper",
"min_relevance": 0.7
},
max_results=10,
include_metadata=True
)
print(f"Found {len(results.documents)} documents")
print(f"Search took {results.latency_ms}ms")
Search Schema
Understanding search requests and responses:
# Search request schema
search_request = {
"query": "search terms",
"filters": {
"date_range": {"start": "2023-01-01", "end": "2024-12-31"},
"document_type": "blog_post",
"tags": ["privacy", "AI"],
"min_relevance": 0.5
},
"max_results": 20,
"offset": 0,
"include_metadata": True,
"highlight": True
}
# Search response schema
search_response = {
"documents": [
{
"id": "doc_123",
"title": "Document Title",
"content": "Full document content...",
"relevance_score": 0.95,
"source_url": "https://example.com/doc",
"metadata": {
"author": "John Doe",
"date": "2024-01-15",
"tags": ["privacy", "federated"],
"word_count": 1500
},
"highlights": [
"...privacy-preserving <mark>machine learning</mark>..."
]
}
],
"total_results": 42,
"latency_ms": 150,
"service": "researcher@org/papers"
}
Composing Pipelines
Combine multiple services into attribution-aware pipelines:
Basic Pipeline
# Load services
data_source1 = client.load_service("university@edu/papers")
data_source2 = client.load_service("company@tech/docs")
llm = client.load_service("aggregator@openmined.org/claude-sonnet-3.5")
# Create pipeline
pipeline = client.pipeline(
data_sources=[data_source1, data_source2],
synthesizer=llm
)
# Execute query
response = pipeline.run(messages=[
{"role": "user", "content": "What are the latest advances in federated learning?"}
])
print(response.content)
Advanced Pipeline Configuration
# Configure pipeline with options
pipeline = client.pipeline(
data_sources=[
(data_source1, 0.6), # 60% weight
(data_source2, 0.4) # 40% weight
],
synthesizer=llm,
search_options={
"max_results_per_source": 5,
"min_relevance": 0.7,
"parallel_search": True
},
synthesis_options={
"temperature": 0.7,
"max_tokens": 1000,
"include_citations": True
}
)
# Run with custom parameters
response = pipeline.run(
messages=[{"role": "user", "content": "Your query"}],
max_cost=5.0, # Stop if cost exceeds $5
timeout=30 # 30 second timeout
)
Pipeline Response Schema
# Pipeline response includes attribution
pipeline_response = {
"content": "Synthesized answer from multiple sources...",
"sources": [
{
"name": "university@edu/papers",
"contribution": 65, # Percentage contribution
"documents_used": 3,
"relevance_score": 0.89,
"cost": 0.15
},
{
"name": "company@tech/docs",
"contribution": 35,
"documents_used": 2,
"relevance_score": 0.76,
"cost": 0.10
}
],
"total_cost": 0.45, # Total cost in USD
"cost_breakdown": {
"search_cost": 0.25,
"synthesis_cost": 0.20
},
"model": "claude-sonnet-3.5",
"tokens_used": {
"search": 500,
"synthesis_input": 800,
"synthesis_output": 400,
"total": 1700
},
"latency_ms": 2500,
"citations": [
"[1] Smith et al. (2024). Advances in FL. University Papers.",
"[2] Tech Corp. (2024). FL Implementation Guide. Company Docs."
]
}
Attribution Tracking
Every pipeline query is traced back to the data sources that answered it: attribution is captured automatically from each source's search contributions and from the synthesis step, with no extra code required.
Accessing Attribution Information
# After running a pipeline
response = pipeline.run(messages=[...])
# Display attribution report
print("=== Attribution Report ===")
print(f"Query: {messages[0]['content']}\n")
# Source contributions
print("Data Source Contributions:")
for source in response.sources:
print(f" • {source.name}:")
print(f" - Contribution: {source.contribution}%")
print(f" - Documents: {source.documents_used}")
print(f" - Relevance: {source.relevance_score:.2f}")
print(f" - Cost: ${source.cost:.4f}")
# Cost breakdown
print(f"\nCost Analysis:")
print(f" Search: ${response.cost_breakdown['search_cost']}")
print(f" Synthesis: ${response.cost_breakdown['synthesis_cost']}")
print(f" Total: ${response.total_cost}")
# Model attribution
print(f"\nModel Information:")
print(f" Model: {response.model}")
print(f" Tokens: {response.tokens_used['total']}")
Exporting Attribution Data
import json
import csv
# Export to JSON
attribution_data = response.to_dict()
with open("attribution_report.json", "w") as f:
json.dump(attribution_data, f, indent=2)
# Export to CSV
with open("attribution_report.csv", "w", newline="") as f:
writer = csv.writer(f)
writer.writerow(["Source", "Contribution %", "Documents", "Cost"])
for source in response.sources:
writer.writerow([
source.name,
source.contribution,
source.documents_used,
source.cost
])
# Generate citation list
citations = response.generate_citations(format="APA")
for citation in citations:
print(citation)
Payments (Beta)
Payments integration exists but is disabled during the beta. Enjoy $20 in free credits to explore services. During the beta, accounting data is still recorded through attribution tracking, so usage can be settled with service owners once payments launch.
Service Health Monitoring
Monitor service availability and performance:
# Check individual service health
service = client.load_service("aggregator@openmined.org/claude-sonnet-3.5")
health = service.get_health_status()
print(f"Status: {health['status']}") # healthy, degraded, offline
print(f"Uptime: {health['uptime_percentage']}%")
print(f"Average Latency: {health['avg_latency_ms']}ms")
print(f"Error Rate: {health['error_rate']}%")
# Monitor all services
health_report = client.get_services_health()
for service_name, status in health_report.items():
if status["status"] != "healthy":
print(f"⚠️ {service_name}: {status['status']}")
# Use only healthy services in pipeline
healthy_sources = [
s for s in client.list_services(filter={"type": "DATA"})
if s.health_status == "healthy"
]
pipeline = client.pipeline(
data_sources=healthy_sources,
synthesizer=llm
)
Best Practices
Service Selection
- Always check service health before using
- Consider cost vs. quality trade-offs
- Use multiple data sources for better coverage
- Cache service metadata to reduce API calls
Pipeline Optimization
- Set appropriate relevance thresholds
- Use parallel search when possible
- Implement cost limits to prevent overruns
- Monitor and log attribution data
Error Handling
from syft_hub import ServiceUnavailableError, ServiceNotFoundError
try:
service = client.load_service("service@org/name")
response = pipeline.run(messages=[...])
except ServiceNotFoundError:
print("Service does not exist")
except ServiceUnavailableError:
print("Service is temporarily unavailable")
# Try alternative service or retry later