Connect a new source
Connectors pull documents from external systems (APIs, message queues, cloud storage) into the ingestion pipeline. riverbank ships with a filesystem connector, and you can register additional connectors through Python entry points.
The base class
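Subclass `BaseConnector`, set `name`, and implement `discover()` so that it returns one `SourceDocument` per item found in the source:

```python
from riverbank.connectors.base import BaseConnector, SourceDocument


class MyConnector(BaseConnector):
    name = "my-connector"

    def discover(self, config: dict) -> list[SourceDocument]:
        """Return a list of documents available from the source."""
        documents = []
        for item in self._fetch_items(config):
            documents.append(SourceDocument(
                iri=f"http://example.org/source/{item['id']}",
                content=item["body"],
                metadata=item.get("metadata", {}),
            ))
        return documents

    def _fetch_items(self, config: dict) -> list[dict]:
        """Query your API / bucket / queue and return its raw items.

        Each item needs an "id" and a "body"; "metadata" is optional.
        """
        raise NotImplementedError("fetch items from your source here")
```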
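To see what `discover()` produces without a live API or a riverbank install, the following sketch swaps in minimal stand-ins for `SourceDocument` and `BaseConnector`. The stand-ins are assumptions that only mirror the fields used in the connector above (`iri`, `content`, `metadata`); they are not riverbank's real classes. The `InMemoryConnector` name and its `items` config key are hypothetical.

```python
from dataclasses import dataclass, field
from typing import Any


# Stand-ins for riverbank.connectors.base (assumed shape, not the real classes):
# they only mirror the iri/content/metadata fields used in the example above.
@dataclass
class SourceDocument:
    iri: str
    content: str
    metadata: dict[str, Any] = field(default_factory=dict)


class BaseConnector:
    name: str = ""

    def discover(self, config: dict) -> list[SourceDocument]:
        raise NotImplementedError


class InMemoryConnector(BaseConnector):
    """Hypothetical connector that reads raw items from config["items"]."""

    name = "in-memory"

    def discover(self, config: dict) -> list[SourceDocument]:
        # Same item -> SourceDocument mapping as MyConnector, as a comprehension.
        return [
            SourceDocument(
                iri=f"http://example.org/source/{item['id']}",
                content=item["body"],
                metadata=item.get("metadata", {}),
            )
            for item in config.get("items", [])
        ]


connector = InMemoryConnector()
docs = connector.discover({"items": [
    {"id": "a1", "body": "First document"},
    {"id": "b2", "body": "Second document", "metadata": {"lang": "en"}},
]})
for doc in docs:
    print(doc.iri, doc.metadata)
```

Because `discover()` depends only on its `config` argument, feeding it canned items like this is a cheap way to unit-test the item-to-document mapping before pointing a connector at a real source.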
Register via entry point
Configuration
Connectors receive their configuration from the compiler profile or from environment variables. A typical profile entry looks like this:
```yaml
# In the compiler profile
connector: my-connector
connector_config:
  api_url: "https://api.example.com/docs"
  api_key: "${MY_API_KEY}"
```
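Assuming the `${MY_API_KEY}` placeholder follows shell-style expansion (an assumption; riverbank may resolve it differently or at a different stage), the lookup can be sketched with the standard library's `os.path.expandvars`. The `resolve_env` helper is hypothetical:

```python
import os
from typing import Any


def resolve_env(value: Any) -> Any:
    """Recursively expand $VAR / ${VAR} references in strings via os.environ.

    Hypothetical helper; unset variables are left as-is, which is how
    os.path.expandvars behaves.
    """
    if isinstance(value, str):
        return os.path.expandvars(value)
    if isinstance(value, dict):
        return {key: resolve_env(item) for key, item in value.items()}
    if isinstance(value, list):
        return [resolve_env(item) for item in value]
    return value


# Usage: simulate the environment, then resolve the connector_config block.
os.environ["MY_API_KEY"] = "example-key"
connector_config = {
    "api_url": "https://api.example.com/docs",
    "api_key": "${MY_API_KEY}",
}
print(resolve_env(connector_config))
```

Since `os.path.expandvars` silently keeps unset references, a missing `MY_API_KEY` would pass the literal `${MY_API_KEY}` through to the connector; a stricter loader might raise an error instead.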