### 多源检索架构
**统一检索接口**
```python
from abc import ABC, abstractmethod
from typing import List, Dict, Any
class DataSource(ABC):
    """Abstract interface shared by every retrieval backend."""

    @abstractmethod
    def search(self, query: str, **kwargs) -> List[Dict[str, Any]]:
        """Return the records that match *query*.

        Args:
            query: Text to search for.
            **kwargs: Backend-specific options.

        Returns:
            A list of result dictionaries. The base implementation does
            nothing and returns ``None``; subclasses must override it.
        """
        ...
class DatabaseSource(DataSource):
    """Substring search over the ``documents`` table of a SQL database."""

    def __init__(self, connection_string: str):
        """Open the connection used by :meth:`search`.

        Args:
            connection_string: Passed unchanged to ``create_connection``.
        """
        # NOTE(review): create_connection is not defined or imported in the
        # visible snippet; it must be provided by the enclosing module.
        self.connection = create_connection(connection_string)

    def search(self, query: str, **kwargs) -> List[Dict[str, Any]]:
        """Return every row whose ``content`` column contains *query*.

        Args:
            query: Text to look for. ``%`` and ``_`` inside it still act as
                LIKE wildcards.
            **kwargs: Accepted for interface compatibility; not used here.

        Returns:
            One dictionary per row with keys ``content``, ``source``
            (always ``"database"``) and ``score`` (always ``1.0``).
        """
        # Bind the user text as a parameter instead of formatting it into
        # the statement: interpolating it allowed SQL injection and broke
        # on any query containing a single quote.
        # NOTE(review): "%s" assumes the driver uses the DB-API "format"
        # paramstyle (as PostgreSQL drivers do); use "?" for qmark drivers
        # such as sqlite3 -- TODO confirm against create_connection.
        sql_query = "SELECT * FROM documents WHERE content LIKE %s"
        results = self.connection.execute(sql_query, (f"%{query}%",))
        return [
            {"content": row.content, "source": "database", "score": 1.0}
            for row in results
        ]
class APISource(DataSource):
    """Search backend that delegates to a remote HTTP search endpoint."""

    def __init__(self, api_endpoint: str, api_key: str, timeout: float = 10.0):
        """Store the endpoint, auth header and request timeout.

        Args:
            api_endpoint: URL the search request is POSTed to.
            api_key: Sent as a ``Bearer`` token in the Authorization header.
            timeout: Seconds to wait for the server before giving up.
        """
        self.endpoint = api_endpoint
        self.headers = {"Authorization": f"Bearer {api_key}"}
        self.timeout = timeout

    def search(self, query: str, **kwargs) -> List[Dict[str, Any]]:
        """POST *query* to the endpoint and normalise the returned hits.

        Args:
            query: Search text, sent as the ``query`` JSON field.
            **kwargs: ``top_k`` (default 5) is sent as the ``top_k`` field.

        Returns:
            One dictionary per response item with keys ``content`` (the
            item's ``text``), ``source`` (always ``"api"``) and ``score``.

        Raises:
            requests.HTTPError: If the server answers with a 4xx/5xx status.
        """
        # NOTE(review): `requests` is not imported in the visible snippet;
        # the enclosing module must import it.
        response = requests.post(
            self.endpoint,
            json={"query": query, "top_k": kwargs.get("top_k", 5)},
            headers=self.headers,
            # Without a timeout requests waits indefinitely on a stalled server.
            timeout=self.timeout,
        )
        # Fail loudly on an error status instead of parsing an error body
        # as if it were a list of hits.
        response.raise_for_status()
        return [
            {"content": item["text"], "source": "api", "score": item["score"]}
            for item in response.json()
        ]
class FileSource(DataSource):
    """Keyword search over paragraphs loaded from plain-text files."""

    def __init__(self, file_paths: List[str], encoding: str = "utf-8"):
        """Read every file and split it into paragraph-sized documents.

        Args:
            file_paths: Paths of the text files to load.
            encoding: Text encoding used to decode the files. Given
                explicitly so results do not depend on the platform default.
        """
        self.documents = []
        for path in file_paths:
            with open(path, "r", encoding=encoding) as f:
                # Paragraphs are separated by blank lines. Blank chunks
                # (e.g. from trailing newlines) are dropped: they contain no
                # words and would make the score divide by zero.
                self.documents.extend(
                    chunk for chunk in f.read().split("\n\n") if chunk.strip()
                )

    def search(self, query: str, **kwargs) -> List[Dict[str, Any]]:
        """Return the documents containing *query*, best match first.

        Matching is a case-insensitive substring test. The score is the
        number of occurrences divided by the document's whitespace-separated
        word count.

        Args:
            query: Text to look for.
            **kwargs: ``top_k`` (default 5) caps the number of results.

        Returns:
            Up to ``top_k`` dictionaries with keys ``content``, ``source``
            (always ``"file"``) and ``score``, sorted by descending score.
        """
        needle = query.lower()  # hoisted: invariant across documents
        results = []
        for doc in self.documents:
            word_count = len(doc.split())
            if not word_count:
                # Guard for blank documents assigned directly to
                # self.documents; avoids ZeroDivisionError below.
                continue
            haystack = doc.lower()
            if needle not in haystack:
                continue
            score = haystack.count(needle) / word_count
            results.append({"content": doc, "source": "file", "score": score})
        results.sort(key=lambda hit: hit["score"], reverse=True)
        return results[:kwargs.get("top_k", 5)]
# Unified retriever
class MultiSourceRetriever:
    """Send one query to several data sources and merge the hits by score."""

    def __init__(self, sources: List["DataSource"]):
        """Remember the sources to query.

        Args:
            sources: Objects exposing ``search(query, **kwargs)``; they are
                queried in list order.
        """
        self.sources = sources

    def retrieve(self, query: str, **kwargs) -> List[Dict[str, Any]]:
        """Query every source and return the combined, re-ranked results.

        Args:
            query: Search text forwarded to each source.
            **kwargs: Forwarded unchanged to each source's ``search``. When
                ``top_k`` is supplied it also caps the merged result list.

        Returns:
            All hits sorted by descending ``score``; at most ``top_k`` of
            them when ``top_k`` was passed. Scores are compared as-is, with
            no normalisation across sources.
        """
        merged: List[Dict[str, Any]] = []
        for source in self.sources:
            merged.extend(source.search(query, **kwargs))

        # Re-rank the combined hits; the sort is stable, so ties keep the
        # order in which the sources produced them.
        merged.sort(key=lambda hit: hit["score"], reverse=True)

        # Each source limits only its own output, so without this cut the
        # caller would receive up to len(sources) * top_k results.
        top_k = kwargs.get("top_k")
        if top_k is None:
            return merged
        return merged[:max(top_k, 0)]
# Usage example: one retriever over a database, an HTTP API and local files.
sources = [
    DatabaseSource("postgresql://user:pass@localhost/mydb"),
    APISource("https://api.example.com/search", "your-api-key"),
    FileSource(["/data/docs1.txt", "/data/docs2.txt"]),
]
retriever = MultiSourceRetriever(sources)
# top_k is forwarded to every source's search().
results = retriever.retrieve("你的查询", top_k=10)
```