Connect your existing vector database to your router and make its powerful search services available to users of the SyftBox network. Keep full control over your data and search capabilities.
A Custom Search Router is perfect for users who already have vector databases running and want to keep using their existing infrastructure. Instead of starting from scratch, you're bringing your own search capabilities to the SyftBox network while maintaining full control.
This approach is ideal for:
Weaviate: Open-source vector database with hybrid search and GraphQL API
Pinecone: Managed vector database with high performance and scalability
Qdrant: Vector database with advanced filtering and payload storage
ChromaDB: Embedded vector database for local development and testing
Milvus: Cloud-native vector database for massive scale deployments
Custom: Any vector database with a Python client or REST API
First, create the router template through the SyftBox dashboard:
my-vector-search
(choose a descriptive name)
Once created, you'll find this structure in your SyftBox apps directory:
my-vector-search/
├── server.py # Main FastAPI server (handles routing)
├── search_service.py # Template for your custom implementation
├── spawn_services.py # Monitors service health and status
├── pyproject.toml # Where you'll add your vector DB dependencies
└── run.sh # Script that starts everything up
Navigate to your router directory and open it in your IDE:
cd ~/SyftBox/apps/my-vector-search
cursor . # or code . for VS Code
Here's a complete example showing how to integrate Weaviate vector database:
from typing import Any, Dict, List, Optional
from uuid import UUID, uuid4

import weaviate
class SearchService:
    """Search service that answers SyftBox queries from a Weaviate collection.

    Written against the weaviate-client v3 API (``weaviate.Client`` and the
    ``client.query`` builder).

    NOTE(review): ``UserClient``, ``EmailStr``, ``SearchOptions``,
    ``SearchResponse``, ``DocumentResult`` and ``logger`` are not imported in
    this snippet; presumably the router template provides them - confirm.
    """

    def __init__(self, config: dict):
        """Set up the accounting client and the Weaviate connection.

        Args:
            config: Router configuration. It is read with ``.get()`` for
                ``weaviate_url``, ``weaviate_api_key``, ``collection_name``
                and ``pricing``, and with ``.accounting_client()`` and
                ``.project.name``.
                NOTE(review): a plain ``dict`` does not offer the last two;
                confirm the real config type.
        """
        # Store the config first: the statements below read ``self.config``.
        self.config = config
        self.accounting_client: UserClient = self.config.accounting_client()
        logger.info(f"Initialized accounting client: {self.accounting_client}")
        logger.info("Initialized custom search service")
        self.app_name = self.config.project.name

        # Price charged per search; 0 means the service is free.
        # NOTE(review): the ``pricing`` key name is an assumption - confirm
        # where the router template actually stores the price.
        self.pricing = float(config.get('pricing', 0.0))

        # Initialize Weaviate client - this connects to your vector database.
        # API-key auth is only attached when a key is configured.
        api_key = config.get('weaviate_api_key')
        self.client = weaviate.Client(
            url=config.get('weaviate_url', 'http://localhost:8080'),
            auth_client_secret=weaviate.AuthApiKey(api_key=api_key) if api_key else None,
        )

        # The collection where your documents are stored
        self.collection_name = config.get('collection_name', 'Documents')

    def search_documents(
        self,
        user_email: EmailStr,
        query: str,
        options: Optional[SearchOptions] = None,
        transaction_token: Optional[str] = None,
    ) -> SearchResponse:
        """Search the collection and return the hits in SyftBox format.

        Args:
            user_email: Account charged when the search is paid.
            query: Natural-language query text.
            options: Optional ``limit`` and ``filters``; the limit defaults
                to 10 when no options are given.
            transaction_token: Payment token; the paid path is taken only
                when ``self.pricing > 0`` and a token is supplied.

        Returns:
            A ``SearchResponse`` carrying the documents and the cost charged
            (0.0 when nothing was charged).
        """
        # 1. Prepare the search payload
        limit = options.limit if options else 10

        # 2. Build the Weaviate filter from the request options, if any
        where_filter = None
        if options and options.filters:
            where_filter = self._build_filter(options.filters)

        # 3. Handle payment transaction if pricing is set
        query_cost = 0.0
        if self.pricing > 0 and transaction_token:
            with self.accounting_client.delegated_transfer(
                user_email,
                amount=self.pricing,
                token=transaction_token,
                app_name=self.app_name,
                app_ep_path="/search",
            ) as payment_txn:
                results = self._execute_search(query, limit, where_filter)
                # Only confirm payment if we got results
                if results:
                    payment_txn.confirm()
                    query_cost = self.pricing
        else:
            # Free service (also reached when no transaction token is sent)
            results = self._execute_search(query, limit, where_filter)

        # 4. Convert results to SyftBox format
        # NOTE(review): ``score`` carries Weaviate's *distance* as-is; confirm
        # how consumers of DocumentResult interpret the value.
        documents = [
            DocumentResult(
                id=str(result["_additional"]["id"]),
                score=result["_additional"]["distance"],
                content=result.get("content", ""),
                metadata={
                    "source": result.get("source"),
                    "page": result.get("page"),
                    "category": result.get("category"),
                },
            )
            for result in results
        ]

        # 5. Return SearchResponse
        return SearchResponse(
            id=uuid4(),  # uuid4 is a module-level function, not a UUID method
            query=query,
            results=documents,
            provider_info={"provider": "weaviate", "results_count": len(documents)},
            cost=query_cost,
        )

    def _build_filter(self, filters: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Translate a flat ``{field: value}`` mapping into a Weaviate where filter.

        Every entry becomes an ``Equal`` condition; several entries are
        combined with ``And``. Returns ``None`` for an empty mapping.
        """
        operands = []
        for field, value in filters.items():
            # bool must be tested before int because bool is an int subclass.
            if isinstance(value, bool):
                value_key = "valueBoolean"
            elif isinstance(value, int):
                value_key = "valueInt"
            elif isinstance(value, float):
                value_key = "valueNumber"
            else:
                value_key = "valueText"
                value = str(value)
            operands.append({"path": [field], "operator": "Equal", value_key: value})

        if not operands:
            return None
        if len(operands) == 1:
            return operands[0]
        return {"operator": "And", "operands": operands}

    def _execute_search(self, query: str, limit: int, where_filter: Optional[Dict] = None):
        """Execute the actual vector search and return the raw hit list.

        Returns an empty list when the response has no data for the
        collection (for example when Weaviate reports an error and sends
        ``null`` instead of a result list).
        """
        query_builder = (
            self.client.query
            .get(self.collection_name, ["content", "source", "page", "category"])
            .with_near_text({"concepts": [query]})
            .with_limit(limit)
            .with_additional(["id", "distance"])
        )
        if where_filter:
            query_builder = query_builder.with_where(where_filter)
        result = query_builder.do()

        # Any level may be missing or null, so fall back step by step.
        data = result.get("data") or {}
        hits_by_collection = data.get("Get") or {}
        return hits_by_collection.get(self.collection_name) or []
Update your pyproject.toml:
[project]
dependencies = [
    # The Weaviate example uses the v3 client API (weaviate.Client), so stay below v4.
    "weaviate-client>=3.24.0,<4.0.0", # For Weaviate
    # Or for other vector DBs:
    # "pinecone-client>=3.0.0", # Pinecone (the `Pinecone` class used in the example needs 3.x)
    # "qdrant-client>=1.6.0", # Qdrant
    # "chromadb>=0.4.15", # ChromaDB
    # "pymilvus>=2.3.0", # Milvus
]
Set up environment variables in a .env file:
# Weaviate Configuration
WEAVIATE_URL=https://your-cluster.weaviate.network
WEAVIATE_API_KEY=your-api-key-here
COLLECTION_NAME=Documents
# Router Settings
ROUTER_NAME=my-vector-search
ROUTER_PORT=8002
LOG_LEVEL=INFO
import pinecone
from pinecone import Pinecone, ServerlessSpec
class SearchService:
    """Search service backed by a Pinecone index."""

    def __init__(self, config: dict):
        """Open the Pinecone index named by ``index_name`` (default ``documents``).

        Args:
            config: Mapping read for ``pinecone_api_key`` and ``index_name``.
        """
        # Initialize Pinecone
        pc = Pinecone(api_key=config.get('pinecone_api_key'))
        self.index = pc.Index(config.get('index_name', 'documents'))

    def search_documents(self, query: str, limit: int = 10, **kwargs):
        """Embed ``query`` and return the ``limit`` nearest matches from Pinecone.

        Args:
            query: Text to search for.
            limit: Maximum number of matches to request (``top_k``).
            **kwargs: Stands in for the other router arguments elided in this
                example; they are accepted and ignored here.

        Returns:
            The raw Pinecone query response. Convert it to the router's
            response format before returning it to callers.

        NOTE(review): ``embed_text`` is not defined in this snippet; you must
        provide it with the same embedding model used at indexing time.
        """
        # Generate query embedding
        query_embedding = self.embed_text(query)

        # Search in Pinecone
        results = self.index.query(
            vector=query_embedding,
            top_k=limit,
            include_metadata=True,
        )

        # Process and return results
        return results
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams
class SearchService:
    """Search service backed by a Qdrant collection."""

    def __init__(self, config: dict):
        """Connect to Qdrant and remember the target collection.

        Args:
            config: Mapping read for ``qdrant_url`` (default
                ``http://localhost:6333``), ``qdrant_api_key`` and
                ``collection_name`` (default ``documents``).
        """
        self.client = QdrantClient(
            url=config.get('qdrant_url', 'http://localhost:6333'),
            api_key=config.get('qdrant_api_key'),
        )
        self.collection_name = config.get('collection_name', 'documents')

    def search_documents(self, query: str, limit: int = 10, **kwargs):
        """Embed ``query`` and return the ``limit`` nearest points from Qdrant.

        Args:
            query: Text to search for.
            limit: Maximum number of points to request.
            **kwargs: Stands in for the other router arguments elided in this
                example; they are accepted and ignored here.

        Returns:
            The raw Qdrant search result. Convert it to the router's response
            format before returning it to callers.

        NOTE(review): ``embed_text`` is not defined in this snippet; you must
        provide it with the same embedding model used at indexing time.
        """
        # Generate query embedding
        query_vector = self.embed_text(query)

        # Search in Qdrant
        results = self.client.search(
            collection_name=self.collection_name,
            query_vector=query_vector,
            limit=limit,
        )

        # Process and return results
        return results
Combine vector search with keyword search for better results:
def hybrid_search(self, query: str, limit: int):
    """Blend vector and keyword hits and keep the best ``limit`` of them.

    Both searches are asked for ``limit * 2`` candidates. The two lists are
    merged with a 0.7 weight for vector hits and 0.3 for keyword hits, and
    the merged list is cut to ``limit`` entries.
    """
    candidate_count = limit * 2
    by_vector = self.vector_search(query, candidate_count)
    by_keyword = self.keyword_search(query, candidate_count)

    ranked = self.merge_results(
        by_vector,
        by_keyword,
        vector_weight=0.7,
        keyword_weight=0.3,
    )
    return ranked[:limit]
Implement document ingestion in your search service:
def add_documents(self, documents: List[Dict], embeddings: List[List[float]]):
    """Add documents and their vectors to the Weaviate collection in batches.

    Args:
        documents: One dict per document. ``content`` is required;
            ``source`` and ``metadata`` are optional.
        embeddings: One vector per document, in the same order.

    Raises:
        ValueError: If ``documents`` and ``embeddings`` differ in length.
            Without this check ``zip`` would silently drop the surplus
            items, so some documents would never be stored.
        KeyError: If a document has no ``content`` key.
    """
    if len(documents) != len(embeddings):
        raise ValueError(
            f"documents and embeddings must have the same length "
            f"(got {len(documents)} and {len(embeddings)})"
        )

    # Example for Weaviate
    batch = self.client.batch.configure(batch_size=100)
    for doc, embedding in zip(documents, embeddings):
        properties = {
            "content": doc["content"],
            "source": doc.get("source", ""),
            "metadata": doc.get("metadata", {}),
        }
        batch.add_data_object(
            data_object=properties,
            class_name=self.collection_name,
            vector=embedding,
        )
    # Send whatever is still buffered below the batch size.
    batch.flush()
# POST a search request to the SyftBox send endpoint; "x-syft-url" names the router's /search RPC path.
# NOTE(review): the URL below uses "my_vector_search" while the router is called "my-vector-search" elsewhere in this guide - confirm which spelling the path expects.
curl -X POST https://syftbox.net/api/v1/send/ \
-H "Content-Type: application/json" \
-H "x-syft-from: user@example.com" \
-d '{
"query": "machine learning concepts",
"limit": 5,
"filters": {"category": "technical"},
"suffix-sender": "true",
"x-syft-url": "syft://<your_email>/app_data/my_vector_search/rpc/search"
}'
from functools import lru_cache
import hashlib
@lru_cache(maxsize=1000)
def cached_search(self, query_hash: str, limit: int):
    """Memoized search hook; up to 1000 distinct argument tuples are kept.

    Placeholder: put your search implementation here.
    """
    # NOTE(review): only the hash of the query arrives here, so a real
    # implementation needs some way to get the query text back.
    # NOTE(review): lru_cache on a method includes ``self`` in the cache key,
    # so cached entries keep the instance alive (ruff B019).
    return None
def search_documents(self, query: str, limit: int):
    """Answer a query through ``cached_search``, keyed by the query's MD5 hex digest."""
    # The digest only serves as a cache key; it is not used for security.
    digest = hashlib.md5(query.encode())
    return self.cached_search(digest.hexdigest(), limit)
async def batch_search(self, queries: List[str]):
    """Run ``async_search`` for every query concurrently.

    Returns a list with one result per query, in the same order as
    ``queries``.
    """
    import asyncio

    pending = (self.async_search(text) for text in queries)
    return await asyncio.gather(*pending)
Once your search router is working perfectly:
Summary: "Advanced vector search with Weaviate"
Description: "High-performance semantic search with custom ranking"
Tags: ["search", "vector", "rag", "weaviate"]
Pricing:
Search: $0.01 per request
tail -f ~/SyftBox/apps/my-vector-search/logs/app.log