# Custom Content Sources

URL: /docs/extending/custom-sources/
Section: extending

---

Content sources let Bengal fetch content from anywhere: local files, GitHub repositories, REST APIs, Notion databases, or custom backends. You can create custom sources by implementing the `ContentSource` abstract class.

## Built-in Sources

Bengal includes four content source types:

| Source | Type ID | Use Case |
|--------|---------|----------|
| `LocalSource` | `local`, `filesystem` | Local markdown files (default) |
| `GitHubSource` | `github` | GitHub repository content |
| `RESTSource` | `rest`, `api` | REST API endpoints |
| `NotionSource` | `notion` | Notion database pages |

## Using Built-in Sources

### Local Source (Default)

The default source for local markdown files:

```python
# collections.py
from bengal.collections import define_collection
from bengal.content_layer import local_loader

collections = {
    "docs": define_collection(
        schema=Doc,
        loader=local_loader("content/docs", exclude=["_drafts/*"]),
    ),
}
```

### GitHub Source

Fetch content from a GitHub repository:

```python
import os

from bengal.content_layer import github_loader

collections = {
    "api-docs": define_collection(
        schema=APIDoc,
        loader=github_loader(
            repo="myorg/api-docs",
            branch="main",
            path="docs/",
            token=os.environ.get("GITHUB_TOKEN"),
        ),
    ),
}
```

Requires: `pip install bengal[github]`

### REST Source

Fetch content from a REST API:

```python
from bengal.content_layer import rest_loader

collections = {
    "posts": define_collection(
        schema=BlogPost,
        loader=rest_loader(
            url="https://api.example.com/posts",
            headers={"Authorization": "Bearer ${API_TOKEN}"},
            content_field="body",
            frontmatter_fields={"title": "title", "date": "published_at"},
        ),
    ),
}
```

Requires: `pip install bengal[rest]`

### Notion Source

Fetch pages from a Notion database:

```python
import os

from bengal.content_layer import notion_loader

collections = {
    "wiki": define_collection(
        schema=WikiPage,
        loader=notion_loader(
            database_id="abc123...",
            token=os.environ.get("NOTION_TOKEN"),
        ),
    ),
}
```

Requires: `pip install bengal[notion]`

## Creating a Custom Source

Implement the `ContentSource` abstract class:

```python
from bengal.content_layer.source import ContentSource
from bengal.content_layer.entry import ContentEntry


class MyAPISource(ContentSource):
    """Fetch content from a custom API."""

    @property
    def source_type(self) -> str:
        return "my-api"

    async def fetch_all(self):
        """Fetch all content entries."""
        # Get items from your data source
        items = await self._fetch_items()

        for item in items:
            yield ContentEntry(
                id=item["id"],
                slug=item["slug"],
                content=item["body"],
                frontmatter={
                    "title": item["title"],
                    "date": item["created_at"],
                },
                source_type=self.source_type,
                source_name=self.name,
            )

    async def fetch_one(self, id: str):
        """Fetch a single entry by ID."""
        item = await self._fetch_item(id)
        if not item:
            return None

        return ContentEntry(
            id=item["id"],
            slug=item["slug"],
            content=item["body"],
            frontmatter={
                "title": item["title"],
                "date": item["created_at"],
            },
            source_type=self.source_type,
            source_name=self.name,
        )

    async def _fetch_items(self):
        """Your API call implementation."""
        import aiohttp

        async with aiohttp.ClientSession() as session:
            async with session.get(self.config["api_url"]) as resp:
                return await resp.json()

    async def _fetch_item(self, id: str):
        """Fetch single item."""
        import aiohttp

        async with aiohttp.ClientSession() as session:
            url = f"{self.config['api_url']}/{id}"
            async with session.get(url) as resp:
                if resp.status == 404:
                    return None
                return await resp.json()
```
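Both `fetch_all` and `fetch_one` above build the same `ContentEntry` by hand. The Error Handling section below calls a `_to_entry` helper for exactly this mapping; its body is not shown there, so here is a minimal sketch, assuming the same item shape as in the example above (the field names are illustrative, not part of Bengal's API):

```python
class MyAPISource(ContentSource):
    # ...

    def _to_entry(self, item: dict) -> ContentEntry:
        """Map one raw API item to a ContentEntry (sketch; adjust to your payload)."""
        return ContentEntry(
            id=item["id"],
            slug=item["slug"],
            content=item["body"],
            frontmatter={
                "title": item["title"],
                "date": item["created_at"],
            },
            source_type=self.source_type,
            source_name=self.name,
        )
```

With a helper like this, both fetch methods can yield or return `self._to_entry(item)` instead of repeating the field mapping, so the two code paths cannot drift apart.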
## ContentEntry Structure

Each source yields `ContentEntry` objects:

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Any


@dataclass
class ContentEntry:
    id: str                         # Unique identifier within source
    slug: str                       # URL-friendly slug for routing
    content: str                    # Raw markdown content
    frontmatter: dict[str, Any]     # Parsed metadata dictionary
    source_type: str                # Source type (e.g., "github", "notion")
    source_name: str                # Source instance name
    source_url: str | None          # Original URL for attribution
    last_modified: datetime | None  # Last modification time
    checksum: str | None            # Content hash for caching
```

## Registering Custom Sources

### Option 1: Direct Registration

Register your source instance directly:

```python
from bengal.content_layer import ContentLayerManager

manager = ContentLayerManager()
manager.register_custom_source("my-content", MyAPISource(
    name="my-content",
    config={"api_url": "https://api.example.com/content"},
))
```

### Option 2: With Collections

Use your source as a collection loader:

```python
# collections.py
from bengal.collections import define_collection

my_source = MyAPISource(
    name="my-content",
    config={"api_url": "https://api.example.com/content"},
)

collections = {
    "external": define_collection(
        schema=ExternalContent,
        loader=my_source,
    ),
}
```

## Caching

Content sources support caching to avoid redundant fetches:

```python
class MyAPISource(ContentSource):
    # ...

    def get_cache_key(self) -> str:
        """Generate cache key for this source configuration."""
        # Default implementation hashes config
        # Override for custom cache key logic
        return super().get_cache_key()

    async def is_changed(self, cached_checksum: str | None) -> bool:
        """Check if source content has changed."""
        # Return True to force refetch
        # Return False if content is unchanged
        current = await self._get_current_checksum()
        return current != cached_checksum

    async def get_last_modified(self):
        """Return last modification time for cache invalidation."""
        # Return datetime or None
        return None
```

## Sync Wrappers

For convenience, `ContentSource` provides sync wrappers:

```python
# Async (preferred for performance)
async for entry in source.fetch_all():
    process(entry)

# Sync (convenience wrapper)
for entry in source.fetch_all_sync():
    process(entry)

# Single entry
entry = source.fetch_one_sync("my-id")
```

## Error Handling

Handle errors gracefully in your source:

```python
import logging

import aiohttp

logger = logging.getLogger(__name__)


class MyAPISource(ContentSource):
    # ...

    async def fetch_all(self):
        try:
            items = await self._fetch_items()
        except aiohttp.ClientError as e:
            logger.error(f"Failed to fetch from {self.config['api_url']}: {e}")
            return  # Yield nothing on error

        for item in items:
            try:
                yield self._to_entry(item)
            except KeyError as e:
                logger.warning(f"Skipping malformed item {item.get('id')}: {e}")
                continue
```

## Testing Custom Sources

```python
import pytest
from unittest.mock import AsyncMock, patch


@pytest.mark.asyncio
async def test_my_api_source():
    source = MyAPISource(
        name="test",
        config={"api_url": "https://api.example.com"},
    )

    with patch.object(source, "_fetch_items", new_callable=AsyncMock) as mock:
        mock.return_value = [
            {"id": "1", "slug": "test", "title": "Test", "body": "Content", "created_at": "2025-01-01"},
        ]

        entries = [entry async for entry in source.fetch_all()]

    assert len(entries) == 1
    assert entries[0].frontmatter["title"] == "Test"
```

## Related Content

- Collections for schema validation
- Build Pipeline for understanding the discovery phase

---

Metadata:

- Author: lbliii
- Word Count: 725
- Reading Time: 4 minutes