Content sources let Bengal fetch content from anywhere—local files, GitHub repositories, REST APIs, Notion databases, or custom backends. You can create custom sources by implementing the `ContentSource` abstract class.
Built-in Sources
Bengal includes four content source types:
| Source | Type ID | Use Case |
|---|---|---|
| LocalSource | `local`, `filesystem` | Local markdown files (default) |
| GitHubSource | `github` | GitHub repository content |
| RESTSource | `rest`, `api` | REST API endpoints |
| NotionSource | `notion` | Notion database pages |
Using Built-in Sources
Local Source (Default)
The default source for local markdown files:
# collections.py
from bengal.collections import define_collection
from bengal.content_layer import local_loader

# "docs" collection: read local markdown from content/docs,
# skipping anything matching the exclude globs.
collections = {
    "docs": define_collection(
        schema=Doc,  # validation schema for each entry's frontmatter
        loader=local_loader("content/docs", exclude=["_drafts/*"]),
    ),
}
GitHub Source
Fetch content from a GitHub repository:
# Fix: the example called os.environ.get() without importing os,
# so it fails with NameError if copied verbatim.
import os

from bengal.content_layer import github_loader

collections = {
    "api-docs": define_collection(
        schema=APIDoc,
        loader=github_loader(
            repo="myorg/api-docs",  # "owner/name" repository slug
            branch="main",
            path="docs/",           # fetch only content under this path
            # Read the token from the environment so it is never committed.
            token=os.environ.get("GITHUB_TOKEN"),
        ),
    ),
}
Requires: `pip install bengal[github]`
REST Source
Fetch content from a REST API:
from bengal.content_layer import rest_loader

collections = {
    "posts": define_collection(
        schema=BlogPost,
        loader=rest_loader(
            url="https://api.example.com/posts",
            # ${API_TOKEN} is a literal placeholder in this string —
            # presumably rest_loader expands it from the environment;
            # verify against the rest_loader docs before relying on it.
            headers={"Authorization": "Bearer ${API_TOKEN}"},
            content_field="body",  # API response field holding the markdown body
            # Mapping between frontmatter keys and API response fields.
            frontmatter_fields={"title": "title", "date": "published_at"},
        ),
    ),
}
Requires: `pip install bengal[rest]`
Notion Source
Fetch pages from a Notion database:
# Fix: the example called os.environ.get() without importing os,
# so it fails with NameError if copied verbatim.
import os

from bengal.content_layer import notion_loader

collections = {
    "wiki": define_collection(
        schema=WikiPage,
        loader=notion_loader(
            database_id="abc123...",  # the Notion database to pull pages from
            # Read the integration token from the environment, never hard-code it.
            token=os.environ.get("NOTION_TOKEN"),
        ),
    ),
}
Requires: `pip install bengal[notion]`
Creating a Custom Source
Implement the `ContentSource` abstract class:
from bengal.content_layer.source import ContentSource
from bengal.content_layer.entry import ContentEntry
class MyAPISource(ContentSource):
    """Fetch content from a custom API.

    Expects ``config["api_url"]`` to point at a JSON endpoint:
    ``fetch_all`` GETs it directly, ``fetch_one`` GETs ``{api_url}/{id}``.
    Each item must carry ``id``, ``slug``, ``body``, ``title`` and
    ``created_at`` fields.
    """

    @property
    def source_type(self) -> str:
        # Type identifier recorded on every ContentEntry this source yields.
        return "my-api"

    async def fetch_all(self):
        """Yield a ContentEntry for every item the API returns."""
        items = await self._fetch_items()
        for item in items:
            yield self._to_entry(item)

    async def fetch_one(self, id: str):
        """Fetch a single entry by ID; returns None if it does not exist."""
        item = await self._fetch_item(id)
        if not item:
            return None
        return self._to_entry(item)

    def _to_entry(self, item: dict) -> ContentEntry:
        # Single place that maps a raw API item onto a ContentEntry, so
        # fetch_all and fetch_one cannot drift apart (previously duplicated).
        return ContentEntry(
            id=item["id"],
            slug=item["slug"],
            content=item["body"],
            frontmatter={
                "title": item["title"],
                "date": item["created_at"],
            },
            source_type=self.source_type,
            source_name=self.name,
        )

    async def _fetch_items(self):
        """GET the full item list from the configured API URL."""
        import aiohttp

        async with aiohttp.ClientSession() as session:
            async with session.get(self.config["api_url"]) as resp:
                return await resp.json()

    async def _fetch_item(self, id: str):
        """GET a single item by ID; returns None on a 404."""
        import aiohttp

        async with aiohttp.ClientSession() as session:
            url = f"{self.config['api_url']}/{id}"
            async with session.get(url) as resp:
                if resp.status == 404:
                    return None
                return await resp.json()
ContentEntry Structure
Each source yields `ContentEntry` objects:
@dataclass
class ContentEntry:
    """One unit of content produced by a ContentSource."""

    id: str                          # Unique identifier within source
    slug: str                        # URL-friendly slug for routing
    content: str                     # Raw markdown content
    frontmatter: dict[str, Any]      # Parsed metadata dictionary
    source_type: str                 # Source type (e.g., "github", "notion")
    source_name: str                 # Source instance name
    source_url: str | None           # Original URL for attribution
    last_modified: datetime | None   # Last modification time
    checksum: str | None             # Content hash for caching
Registering Custom Sources
Option 1: Direct Registration
Register your source instance directly:
from bengal.content_layer import ContentLayerManager

# Register the custom source instance under a unique name; the same
# name is passed to the source itself so entries carry it as source_name.
manager = ContentLayerManager()
manager.register_custom_source("my-content", MyAPISource(
    name="my-content",
    config={"api_url": "https://api.example.com/content"},
))
Option 2: With Collections
Use your source as a collection loader:
# collections.py
from bengal.collections import define_collection

# The source instance itself is used directly as the collection loader.
my_source = MyAPISource(
    name="my-content",
    config={"api_url": "https://api.example.com/content"},
)

collections = {
    "external": define_collection(
        schema=ExternalContent,
        loader=my_source,
    ),
}
Caching
Content sources support caching to avoid redundant fetches:
class MyAPISource(ContentSource):
    # ... (fetch methods omitted; see the full class above in this page)

    def get_cache_key(self) -> str:
        """Generate cache key for this source configuration."""
        # Default implementation hashes config
        # Override for custom cache key logic
        return super().get_cache_key()

    async def is_changed(self, cached_checksum: str | None) -> bool:
        """Check if source content has changed."""
        # Return True to force refetch
        # Return False if content is unchanged
        # NOTE: _get_current_checksum is your own helper (not shown here).
        current = await self._get_current_checksum()
        return current != cached_checksum

    async def get_last_modified(self):
        """Return last modification time for cache invalidation."""
        # Return datetime or None
        return None
Sync Wrappers
For convenience, `ContentSource` provides sync wrappers:
# Async (preferred for performance)
async for entry in source.fetch_all():
    process(entry)

# Sync (convenience wrapper)
for entry in source.fetch_all_sync():
    process(entry)

# Single entry
entry = source.fetch_one_sync("my-id")
Error Handling
Handle errors gracefully in your source:
async def fetch_all(self):
    """Yield entries, skipping items that fail to convert.

    Assumes aiohttp and a module-level logger are imported in the
    source module, and that the class defines _to_entry().
    """
    try:
        items = await self._fetch_items()
    except aiohttp.ClientError as e:
        # Network/HTTP failure: log and produce an empty collection
        # rather than aborting the whole build.
        logger.error(f"Failed to fetch from {self.config['api_url']}: {e}")
        return  # Yield nothing on error
    for item in items:
        try:
            yield self._to_entry(item)
        except KeyError as e:
            # A single malformed item should not poison the rest.
            logger.warning(f"Skipping malformed item {item.get('id')}: {e}")
            continue
Testing Custom Sources
import pytest
from unittest.mock import AsyncMock, patch


@pytest.mark.asyncio  # requires the pytest-asyncio plugin
async def test_my_api_source():
    """fetch_all yields one ContentEntry per item returned by the API."""
    source = MyAPISource(
        name="test",
        config={"api_url": "https://api.example.com"},
    )
    # Stub the network call so the test runs offline and deterministically.
    with patch.object(source, "_fetch_items", new_callable=AsyncMock) as mock:
        mock.return_value = [
            {"id": "1", "slug": "test", "title": "Test", "body": "Content", "created_at": "2025-01-01"},
        ]
        entries = [entry async for entry in source.fetch_all()]
    assert len(entries) == 1
    assert entries[0].frontmatter["title"] == "Test"
Related
- Content Collections for schema validation
- Build Pipeline for understanding discovery phase