syft-hub SDK is in beta. Payments are disabled during beta; enjoy $20 in free credits.

Pipeline API

Build and execute federated pipelines across the network services

syft_hub.Pipeline

Orchestrate multi-stage federated workflows combining multiple services.

Constructor

client.pipeline(data_sources: List[Service], synthesizer: Service, context_format: str = "simple") -> Pipeline

Create a Federated Retrieval-Augmented Generation (RAG) pipeline that combines multiple distributed data sources with a synthesizer model to provide enhanced, context-aware AI responses. Pipelines are always created through the client, never instantiated directly.

Parameters

data_sources List[Service] required
List of data services for search/retrieval
synthesizer Service required
Chat service for response generation
context_format str optional
Format for search context
"simple"

Example

# Load services
openmined_data = client.load_service("irina@openmined.org/openmined-about")
claude_llm = client.load_service("aggregator@openmined.org/claude-sonnet-3.5")

# Create pipeline
pipeline = client.pipeline(
    data_sources=[openmined_data],
    synthesizer=claude_llm
)

Pipeline Properties

data_sources: List[ServiceSpec]

The collection of data services configured to provide source material for this pipeline, each representing a searchable knowledge base or data repository. These services are queried in parallel during pipeline execution to gather relevant context for the synthesizer model.

synthesizer: ServiceSpec

The AI model service designated to synthesize final responses based on retrieved context from data sources and user queries At the moment, the inputs are concatenated into the context.

Estimate Cost


estimate_cost(message_count: int = 1) -> float

Calculate an estimated total cost for executing this pipeline based on configured data sources and synthesizer costs..

Example

# Estimate pipeline costs
cost = pipeline.estimate_cost(message_count=1)
print(f"Estimated cost: ${cost:.4f}")

Execution

Run


run(messages: List[Dict[str, str]], continue_without_results: bool = False) -> PipelineResult

Execute the complete RAG pipeline synchronously, performing parallel searches across all configured data sources followed by synthesis of results into a comprehensive response. This method blocks until the entire workflow completes, making it ideal for interactive applications where you need the complete result before proceeding.

Parameters

messages List[Dict] required
List of message dictionaries with "role" and "content" keys
continue_without_results bool optional
Whether to continue synthesis if no search results found
False

Returns

PipelineResult - Result object containing response, search results, and cost

Example

# Execute pipeline with messages
query = """Explain the concept of attribution-based control in AI systems. 
Please cover:
1. What is attribution-based control and why is it important?
2. How can attribution mechanisms help with AI governance?"""

response = pipeline.run(messages=[
    {"role": "user", "content": query}
])

# Access results
print(response.response.content)
print(f"Sources: {len(response.search_results)} documents")
print(f"Cost: ${response.cost:.4f}")

Run Async


run_async(messages: List[Dict[str, str]], continue_without_results: bool = False) -> Awaitable[PipelineResult]

Execute the complete RAG pipeline asynchronously, enabling your application to handle other tasks while the multi-stage workflow processes in the background. This method is essential for scalable applications that need to manage multiple concurrent pipeline executions or maintain responsive user interfaces during long-running RAG operations.

Example

import asyncio

async def main():
    messages = [
        {"role": "user", "content": "What is attribution-based control?"}
    ]
    result = await pipeline.run_async(messages=messages)
    return result

# Run async pipeline
result = asyncio.run(main())

Pipeline Management

Validate


validate(skip_health_check: bool = False) -> bool

Validate that the pipeline configuration is correct and all required services are available and accessible before execution.

Example

# Validate pipeline before execution
try:
    pipeline.validate()
    print("Pipeline is valid")
except ValidationError as e:
    print(f"Pipeline validation failed: {e}")

Repr


__repr__() -> str

Display comprehensive pipeline configuration and status information in an interactive widget for Jupyter notebooks or formatted text for terminal environments.

Example

# Display pipeline details
pipeline  # Shows data sources, synthesizer, cost estimate

syft_hub.PipelineResult

Container for RAG/FedRAG pipeline execution results and metadata.

Properties

query: str

The search query that was extracted from the user's messages and used to query data sources during pipeline execution.

response: ChatResponse

The final AI-generated response produced by the synthesizer service, which incorporates both the retrieved context from data sources and the user's original query.

search_results: List[DocumentResult]

The complete collection of search results retrieved from all configured data sources during pipeline execution, including content, relevance scores, and metadata.

cost: float

The aggregate cost charged for the complete pipeline execution.

content: str

Convenient shorthand property that directly accesses the text content of the synthesizer's response, eliminating the need to chain through the response object.

timestamp: datetime

The timestamp indicating when this pipeline execution began.

String Representation

Str


__str__() -> str

Returns only the synthesized response content as a string.

Example

# Print response content directly
print(result)  # Prints response.content

Repr


__repr__() -> str

Display a comprehensive view of the pipeline execution results including the query, response, source attribution, cost breakdown, and performance metrics in an interactive HTML widget for Jupyter or formatted text for terminal environments.

Example

# Display full pipeline result
result  # Shows query, response, sources, cost breakdown