BETA: Payments are disabled during the beta. You have $20 in free credits.

Concepts

Architecture, services, discovery, federated pipelines, attribution, and payments (coming soon)

Architecture Overview

Client: initializes connections, discovers services, orchestrates pipelines, manages auth, and tracks attribution.

AI Services: peers offering /chat and/or /search with their own parameters and pricing.

Service Discovery: an index and filters for finding services by capability, owner, health, and cost.

Federated Pipeline: composes search over multiple data sources and synthesizes answers with an LLM, preserving source attribution.
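
Putting these pieces together, a typical session discovers services, builds a pipeline, and reads attribution from the response. A minimal sketch using only the calls documented in the sections below (the service names are placeholders):

from syft_hub import Client

# Client: connect and discover
client = Client()
sources = client.list_services(filter={"type": "DATA", "status": "healthy"})
llm = client.load_service("aggregator@openmined.org/claude-sonnet-3.5")

# Federated Pipeline: search the sources, synthesize with the LLM
pipeline = client.pipeline(data_sources=sources, synthesizer=llm)
response = pipeline.run(messages=[
    {"role": "user", "content": "What is federated learning?"}
])

# Attribution: every answer carries its source contributions
for source in response.sources:
    print(f"{source.name}: {source.contribution}%")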

Service Discovery

Find and explore available AI services on the network:

from syft_hub import Client

client = Client()

# List all available services
services = client.list_services()
print(f"Found {len(services)} services\n")

# Display service details
for service in services:
    print(f"Name: {service.name}")
    print(f"Type: {service.type}")  # CHAT, SEARCH, or DATA
    print(f"Status: {service.status}")  # healthy, degraded, offline
    print(f"Owner: {service.owner}")
    print(f"Description: {service.description}")
    print("-" * 40)

In Jupyter notebooks, use the interactive widget:

# Display services in an interactive table
client.show_services()  # Shows filterable, sortable table

Filtering Services

Filter services by type, status, or other criteria:

# Filter by service type
chat_services = client.list_services(filter={"type": "CHAT"})
search_services = client.list_services(filter={"type": "SEARCH"})
data_sources = client.list_services(filter={"type": "DATA"})

# Filter by status
healthy_services = client.list_services(filter={"status": "healthy"})

# Filter by owner
openmined_services = client.list_services(filter={"owner": "*@openmined.org"})

# Combine filters
healthy_chat = client.list_services(filter={
    "type": "CHAT",
    "status": "healthy"
})

Loading and Testing Services

Load specific services and test their functionality:

# Load a specific service by name
service = client.load_service("aggregator@openmined.org/claude-sonnet-3.5")

# Check service information
print(service)  # Display service details
print(f"Service Type: {service.type}")
print(f"Pricing: ${service.price_per_token}")
print(f"Health Status: {service.health_status}")

# Test service availability
if service.is_available():
    print("Service is ready to use")
else:
    print("Service is currently unavailable")

# Get service metadata
metadata = service.get_metadata()
print(f"Model Version: {metadata['model_version']}")
print(f"Max Tokens: {metadata['max_tokens']}")
print(f"Supported Languages: {metadata['languages']}")

Chat Services

Interact with AI models through chat interfaces:

Basic Chat

# Load a chat service
chat_service = client.load_service("aggregator@openmined.org/claude-sonnet-3.5")

# Single message chat
response = client.chat_sync(
    service_name="aggregator@openmined.org/claude-sonnet-3.5",
    messages=[
        {"role": "user", "content": "What is federated learning?"}
    ]
)

print(response.content)
print(f"Tokens used: {response.token_count}")
print(f"Cost: ${response.cost}")

Multi-turn Conversation

# Multi-turn conversation
messages = [
    {"role": "system", "content": "You are a helpful AI assistant."},
    {"role": "user", "content": "What is differential privacy?"},
    {"role": "assistant", "content": "Differential privacy is..."},
    {"role": "user", "content": "How does it apply to federated learning?"}
]

response = client.chat_sync(
    service_name="aggregator@openmined.org/gpt-4",
    messages=messages,
    temperature=0.7,
    max_tokens=500
)

print(response.content)

Chat Schema

Understanding the chat message format:

# Message schema
message = {
    "role": "user",  # "system", "user", or "assistant"
    "content": "Your message here",
    "name": "optional_name",  # Optional: speaker name
    "metadata": {}  # Optional: additional data
}

# Chat response schema
response = {
    "content": "AI response text",
    "role": "assistant",
    "model": "claude-sonnet-3.5",
    "token_count": {
        "input": 150,
        "output": 200,
        "total": 350
    },
    "cost": 0.0035,
    "finish_reason": "stop",  # "stop", "length", "error"
    "metadata": {
        "model_version": "2024-01",
        "latency_ms": 1250
    }
}

Composing Pipelines

Combine multiple services into attribution-aware pipelines:

Basic Pipeline

# Load services
data_source1 = client.load_service("university@edu/papers")
data_source2 = client.load_service("company@tech/docs")
llm = client.load_service("aggregator@openmined.org/claude-sonnet-3.5")

# Create pipeline
pipeline = client.pipeline(
    data_sources=[data_source1, data_source2],
    synthesizer=llm
)

# Execute query
response = pipeline.run(messages=[
    {"role": "user", "content": "What are the latest advances in federated learning?"}
])

print(response.content)

Advanced Pipeline Configuration

# Configure pipeline with options
pipeline = client.pipeline(
    data_sources=[
        (data_source1, 0.6),  # 60% weight
        (data_source2, 0.4)   # 40% weight
    ],
    synthesizer=llm,
    search_options={
        "max_results_per_source": 5,
        "min_relevance": 0.7,
        "parallel_search": True
    },
    synthesis_options={
        "temperature": 0.7,
        "max_tokens": 1000,
        "include_citations": True
    }
)

# Run with custom parameters
response = pipeline.run(
    messages=[{"role": "user", "content": "Your query"}],
    max_cost=5.0,  # Stop if cost exceeds $5
    timeout=30  # 30 second timeout
)

Pipeline Response Schema

# Pipeline response includes attribution
pipeline_response = {
    "content": "Synthesized answer from multiple sources...",
    "sources": [
        {
            "name": "university@edu/papers",
            "contribution": 65,  # Percentage contribution
            "documents_used": 3,
            "relevance_score": 0.89,
            "cost": 0.15
        },
        {
            "name": "company@tech/docs",
            "contribution": 35,
            "documents_used": 2,
            "relevance_score": 0.76,
            "cost": 0.10
        }
    ],
    "total_cost": 0.45,  # Total cost in USD
    "cost_breakdown": {
        "search_cost": 0.25,
        "synthesis_cost": 0.20
    },
    "model": "claude-sonnet-3.5",
    "tokens_used": {
        "search": 500,
        "synthesis_input": 800,
        "synthesis_output": 400,
        "total": 1700
    },
    "latency_ms": 2500,
    "citations": [
        "[1] Smith et al. (2024). Advances in FL. University Papers.",
        "[2] Tech Corp. (2024). FL Implementation Guide. Company Docs."
    ]
}

Attribution Tracking

Every query is routed to its data sources, and attribution is captured automatically from source contributions and synthesis context.

Accessing Attribution Information

# After running a pipeline
messages = [{"role": "user", "content": "Your query"}]
response = pipeline.run(messages=messages)

# Display attribution report
print("=== Attribution Report ===")
print(f"Query: {messages[0]['content']}\n")

# Source contributions
print("Data Source Contributions:")
for source in response.sources:
    print(f"  • {source.name}:")
    print(f"    - Contribution: {source.contribution}%")
    print(f"    - Documents: {source.documents_used}")
    print(f"    - Relevance: {source.relevance_score:.2f}")
    print(f"    - Cost: ${source.cost:.4f}")

# Cost breakdown
print(f"\nCost Analysis:")
print(f"  Search: ${response.cost_breakdown['search_cost']}")
print(f"  Synthesis: ${response.cost_breakdown['synthesis_cost']}")
print(f"  Total: ${response.total_cost}")

# Model attribution
print(f"\nModel Information:")
print(f"  Model: {response.model}")
print(f"  Tokens: {response.tokens_used['total']}")

Exporting Attribution Data

import json
import csv

# Export to JSON
attribution_data = response.to_dict()
with open("attribution_report.json", "w") as f:
    json.dump(attribution_data, f, indent=2)

# Export to CSV
with open("attribution_report.csv", "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["Source", "Contribution %", "Documents", "Cost"])
    for source in response.sources:
        writer.writerow([
            source.name,
            source.contribution,
            source.documents_used,
            source.cost
        ])

# Generate citation list
citations = response.generate_citations(format="APA")
for citation in citations:
    print(citation)

Payments (Beta)

Payments integration exists but is disabled during the beta; in the meantime, you have $20 in free credits to explore services. Accounting hooks into the attribution system so that usage can be settled after the beta.
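
Because each response reports its own cost, you can track credit usage yourself during the beta. A minimal sketch, assuming the pipeline object from Composing Pipelines and using only the documented total_cost field (the $20 balance is the beta allowance):

FREE_CREDITS = 20.00  # beta allowance in USD
spent = 0.0

for query in ["What is federated learning?", "What is differential privacy?"]:
    response = pipeline.run(messages=[{"role": "user", "content": query}])
    spent += response.total_cost
    print(f"Spent so far: ${spent:.2f} of ${FREE_CREDITS:.2f}")

print(f"Remaining credits: ${FREE_CREDITS - spent:.2f}")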

Service Health Monitoring

Monitor service availability and performance:

# Check individual service health
service = client.load_service("aggregator@openmined.org/claude-sonnet-3.5")
health = service.get_health_status()

print(f"Status: {health['status']}")  # healthy, degraded, offline
print(f"Uptime: {health['uptime_percentage']}%")
print(f"Average Latency: {health['avg_latency_ms']}ms")
print(f"Error Rate: {health['error_rate']}%")

# Monitor all services
health_report = client.get_services_health()
for service_name, status in health_report.items():
    if status["status"] != "healthy":
        print(f"⚠️ {service_name}: {status['status']}")

# Use only healthy services in pipeline
healthy_sources = [
    s for s in client.list_services(filter={"type": "DATA"})
    if s.health_status == "healthy"
]

pipeline = client.pipeline(
    data_sources=healthy_sources,
    synthesizer=llm
)

Best Practices

Service Selection

  • Always check service health before using
  • Consider cost vs. quality trade-offs
  • Use multiple data sources for better coverage
  • Cache service metadata to reduce API calls (see the sketch below)
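
For the last point, a minimal caching sketch: a plain dictionary keyed by service name, so repeated metadata lookups avoid extra network calls (get_metadata is documented above; the cache itself is illustrative):

# In-memory cache keyed by service name
_metadata_cache = {}

def cached_metadata(client, service_name):
    if service_name not in _metadata_cache:
        service = client.load_service(service_name)
        _metadata_cache[service_name] = service.get_metadata()
    return _metadata_cache[service_name]

# First call hits the network; later calls are served from the cache
meta = cached_metadata(client, "aggregator@openmined.org/claude-sonnet-3.5")
print(meta["max_tokens"])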

Pipeline Optimization

  • Set appropriate relevance thresholds
  • Use parallel search when possible
  • Implement cost limits to prevent overruns
  • Monitor and log attribution data (the sketch below combines these points)
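
A sketch combining these points, assuming healthy_sources and llm as built in Service Health Monitoring and using only options shown earlier (min_relevance, parallel_search, max_cost, to_dict):

import json

# Relevance threshold and parallel search, per the options above
pipeline = client.pipeline(
    data_sources=healthy_sources,
    synthesizer=llm,
    search_options={"min_relevance": 0.7, "parallel_search": True}
)

# Cost limit guards against overruns
response = pipeline.run(
    messages=[{"role": "user", "content": "Your query"}],
    max_cost=5.0
)

# Append attribution data to a JSONL log for later auditing
with open("attribution_log.jsonl", "a") as f:
    f.write(json.dumps(response.to_dict()) + "\n")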

Error Handling

from syft_hub import ServiceUnavailableError, ServiceNotFoundError

try:
    service = client.load_service("service@org/name")
    pipeline = client.pipeline(data_sources=[service], synthesizer=llm)
    response = pipeline.run(messages=[...])
except ServiceNotFoundError:
    print("Service does not exist")
except ServiceUnavailableError:
    print("Service is temporarily unavailable")
    # Try alternative service or retry later
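
One hedged way to act on ServiceUnavailableError is a bounded retry with a fallback chat service; the backoff schedule and the fallback service name below are illustrative, not prescribed by the API:

import time
from syft_hub import ServiceUnavailableError

def run_with_fallback(client, pipeline, messages, retries=2):
    # Retry the pipeline, backing off between attempts
    for attempt in range(retries):
        try:
            return pipeline.run(messages=messages)
        except ServiceUnavailableError:
            time.sleep(2 ** attempt)  # back off: 1s, then 2s
    # Last resort: answer with a plain chat service instead
    return client.chat_sync(
        service_name="aggregator@openmined.org/gpt-4",
        messages=messages
    )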