Build and execute federated pipelines across network services
Orchestrate multi-stage federated workflows combining multiple services.
client.pipeline(data_sources: List[Service], synthesizer: Service, context_format: str = "simple") -> Pipeline
Create a Federated Retrieval-Augmented Generation (RAG) pipeline that combines multiple distributed data sources with a synthesizer model to provide enhanced, context-aware AI responses. Pipelines are always created through the client, never instantiated directly.
# Load services
openmined_data = client.load_service("irina@openmined.org/openmined-about")
claude_llm = client.load_service("aggregator@openmined.org/claude-sonnet-3.5")
# Create pipeline
pipeline = client.pipeline(
    data_sources=[openmined_data],
    synthesizer=claude_llm
)
data_sources: List[ServiceSpec]
The collection of data services configured to provide source material for this pipeline, each representing a searchable knowledge base or data repository. These services are queried in parallel during pipeline execution to gather relevant context for the synthesizer model.
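The parallel fan-out over data sources can be pictured as a concurrent gather; this is an illustrative sketch only, not the library's internal implementation (`query_source` and the result shape are assumptions):

```python
import asyncio

async def query_source(source_name: str, query: str) -> list:
    # Stand-in for a real data-service search call (assumption).
    await asyncio.sleep(0)  # simulate network I/O
    return [f"{source_name}: result for '{query}'"]

async def gather_context(sources: list, query: str) -> list:
    # Fan out one search per data source concurrently, then flatten
    # the per-source result lists into a single context collection.
    result_lists = await asyncio.gather(
        *(query_source(s, query) for s in sources)
    )
    return [doc for results in result_lists for doc in results]

docs = asyncio.run(gather_context(["openmined-about", "other-source"], "attribution"))
```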
synthesizer: ServiceSpec
The AI model service designated to synthesize final responses based on retrieved context from data sources and user queries. At the moment, the retrieved inputs are simply concatenated into the context.
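Since the retrieved inputs are concatenated, the prompt assembly can be sketched as follows (the separator and prompt template here are assumptions, not the library's actual format):

```python
def build_context(documents: list[str], query: str) -> str:
    # Join the retrieved documents into one context block,
    # then append the user's query for the synthesizer.
    context = "\n\n".join(documents)
    return f"Context:\n{context}\n\nQuestion: {query}"

prompt = build_context(["Doc A text", "Doc B text"], "What is X?")
```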
estimate_cost(message_count: int = 1) -> float
Calculate an estimated total cost for executing this pipeline based on the configured data source and synthesizer costs.
# Estimate pipeline costs
cost = pipeline.estimate_cost(message_count=1)
print(f"Estimated cost: ${cost:.4f}")
run(messages: List[Dict[str, str]], continue_without_results: bool = False) -> PipelineResult
Execute the complete RAG pipeline synchronously, performing parallel searches across all configured data sources followed by synthesis of results into a comprehensive response. This method blocks until the entire workflow completes, making it ideal for interactive applications where you need the complete result before proceeding.
messages: A list of message dictionaries, each with "role" and "content" keys.
Returns: PipelineResult - Result object containing response, search results, and cost
# Execute pipeline with messages
query = """Explain the concept of attribution-based control in AI systems.
Please cover:
1. What is attribution-based control and why is it important?
2. How can attribution mechanisms help with AI governance?"""
response = pipeline.run(messages=[
    {"role": "user", "content": query}
])
# Access results
print(response.response.content)
print(f"Sources: {len(response.search_results)} documents")
print(f"Cost: ${response.cost:.4f}")
run_async(messages: List[Dict[str, str]], continue_without_results: bool = False) -> Awaitable[PipelineResult]
Execute the complete RAG pipeline asynchronously, enabling your application to handle other tasks while the multi-stage workflow processes in the background. This method is essential for scalable applications that need to manage multiple concurrent pipeline executions or maintain responsive user interfaces during long-running RAG operations.
import asyncio

async def main():
    messages = [
        {"role": "user", "content": "What is attribution-based control?"}
    ]
    result = await pipeline.run_async(messages=messages)
    return result

# Run async pipeline
result = asyncio.run(main())
validate(skip_health_check: bool = False) -> bool
Validate that the pipeline configuration is correct and all required services are available and accessible before execution.
# Validate pipeline before execution
try:
    pipeline.validate()
    print("Pipeline is valid")
except ValidationError as e:
    print(f"Pipeline validation failed: {e}")
__repr__() -> str
Display comprehensive pipeline configuration and status information in an interactive widget for Jupyter notebooks or formatted text for terminal environments.
# Display pipeline details
pipeline # Shows data sources, synthesizer, cost estimate
PipelineResult
Container for RAG/FedRAG pipeline execution results and metadata.
query: str
The search query that was extracted from the user's messages and used to query data sources during pipeline execution.
response: ChatResponse
The final AI-generated response produced by the synthesizer service, which incorporates both the retrieved context from data sources and the user's original query.
search_results: List[DocumentResult]
The complete collection of search results retrieved from all configured data sources during pipeline execution, including content, relevance scores, and metadata.
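A common pattern is to render a short attribution summary from the retrieved documents. The attribute names used below (`content`, `score`) are assumptions about `DocumentResult`; a stand-in class is used here to keep the sketch self-contained:

```python
from dataclasses import dataclass

@dataclass
class Doc:
    # Stand-in for DocumentResult; real attribute names are assumptions.
    content: str
    score: float

def summarize_sources(search_results) -> list[str]:
    # Render one "[score] snippet" line per retrieved document.
    return [f"[{d.score:.2f}] {d.content[:60]}" for d in search_results]

lines = summarize_sources([Doc("Attribution-based control ...", 0.91)])
```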
cost: float
The aggregate cost charged for the complete pipeline execution.
content: str
Convenient shorthand property that directly accesses the text content of the synthesizer's response, eliminating the need to chain through the response object.
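The shorthand can be thought of as a property that forwards to the nested response object; this is a minimal sketch of the pattern, not the actual class definition:

```python
from dataclasses import dataclass

@dataclass
class ChatResponse:
    content: str

class PipelineResultSketch:
    # Illustrative stand-in; the real PipelineResult has more fields.
    def __init__(self, response: ChatResponse):
        self.response = response

    @property
    def content(self) -> str:
        # Forward directly to the synthesizer response's text.
        return self.response.content

result = PipelineResultSketch(ChatResponse(content="hello"))
```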
timestamp: datetime
The timestamp indicating when this pipeline execution began.
__str__() -> str
Returns only the synthesized response content as a string.
# Print response content directly
print(result) # Prints response.content
__repr__() -> str
Display a comprehensive view of the pipeline execution results including the query, response, source attribution, cost breakdown, and performance metrics in an interactive HTML widget for Jupyter or formatted text for terminal environments.
# Display full pipeline result
result # Shows query, response, sources, cost breakdown