Custom Content Sources

Fetch content from APIs, databases, or remote services


Content sources let Bengal fetch content from anywhere: local files, GitHub repositories, REST APIs, Notion databases, or custom backends. You can create custom sources by implementing the ContentSource abstract class.

Built-in Sources

Bengal includes four content source types:

Source         Type ID             Use Case
LocalSource    local, filesystem   Local markdown files (default)
GitHubSource   github              GitHub repository content
RESTSource     rest, api           REST API endpoints
NotionSource   notion              Notion database pages

Using Built-in Sources

Local Source (Default)

The default source for local markdown files:

# collections.py
from bengal.collections import define_collection
from bengal.content_layer import local_loader

collections = {
    "docs": define_collection(
        schema=Doc,
        loader=local_loader("content/docs", exclude=["_drafts/*"]),
    ),
}

GitHub Source

Fetch content from a GitHub repository:

import os

from bengal.collections import define_collection
from bengal.content_layer import github_loader

collections = {
    "api-docs": define_collection(
        schema=APIDoc,
        loader=github_loader(
            repo="myorg/api-docs",
            branch="main",
            path="docs/",
            token=os.environ.get("GITHUB_TOKEN"),
        ),
    ),
}

Requires: pip install bengal[github]

REST Source

Fetch content from a REST API:

from bengal.collections import define_collection
from bengal.content_layer import rest_loader

collections = {
    "posts": define_collection(
        schema=BlogPost,
        loader=rest_loader(
            url="https://api.example.com/posts",
            headers={"Authorization": "Bearer ${API_TOKEN}"},
            content_field="body",
            frontmatter_fields={"title": "title", "date": "published_at"},
        ),
    ),
}

Requires: pip install bengal[rest]
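
To make the content_field and frontmatter_fields options concrete, here is a minimal sketch of how one JSON item could be split into content and frontmatter. It is not Bengal's implementation: the helper name map_item is hypothetical, and the sketch assumes that each key in frontmatter_fields is a frontmatter name and each value is the API field it is read from.

```python
from typing import Any


def map_item(
    item: dict[str, Any],
    content_field: str,
    frontmatter_fields: dict[str, str],
) -> tuple[str, dict[str, Any]]:
    """Split one API item into (content, frontmatter).

    Hypothetical helper, not part of Bengal. Assumes each key in
    frontmatter_fields is a frontmatter name and each value is the
    API field to read it from.
    """
    content = item.get(content_field, "")
    frontmatter = {
        fm_key: item[api_key]
        for fm_key, api_key in frontmatter_fields.items()
        if api_key in item  # skip fields the API did not return
    }
    return content, frontmatter


item = {"title": "Hello", "published_at": "2025-01-01", "body": "# Hello"}
content, frontmatter = map_item(
    item,
    content_field="body",
    frontmatter_fields={"title": "title", "date": "published_at"},
)
```

Under that assumption, the API's published_at value ends up under the date key in frontmatter, and body becomes the page content.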

Notion Source

Fetch pages from a Notion database:

import os

from bengal.collections import define_collection
from bengal.content_layer import notion_loader

collections = {
    "wiki": define_collection(
        schema=WikiPage,
        loader=notion_loader(
            database_id="abc123...",
            token=os.environ.get("NOTION_TOKEN"),
        ),
    ),
}

Requires: pip install bengal[notion]

Creating a Custom Source

Implement the ContentSource abstract class:

from bengal.content_layer.source import ContentSource
from bengal.content_layer.entry import ContentEntry

class MyAPISource(ContentSource):
    """Fetch content from a custom API."""

    @property
    def source_type(self) -> str:
        return "my-api"

    async def fetch_all(self):
        """Fetch all content entries."""
        # Get items from your data source
        items = await self._fetch_items()

        for item in items:
            yield ContentEntry(
                id=item["id"],
                slug=item["slug"],
                content=item["body"],
                frontmatter={
                    "title": item["title"],
                    "date": item["created_at"],
                },
                source_type=self.source_type,
                source_name=self.name,
            )

    async def fetch_one(self, id: str):
        """Fetch a single entry by ID."""
        item = await self._fetch_item(id)
        if not item:
            return None

        return ContentEntry(
            id=item["id"],
            slug=item["slug"],
            content=item["body"],
            frontmatter={
                "title": item["title"],
                "date": item["created_at"],
            },
            source_type=self.source_type,
            source_name=self.name,
        )

    async def _fetch_items(self):
        """Your API call implementation."""
        import aiohttp
        async with aiohttp.ClientSession() as session:
            async with session.get(self.config["api_url"]) as resp:
                resp.raise_for_status()  # Raise aiohttp.ClientResponseError on 4xx/5xx
                return await resp.json()

    async def _fetch_item(self, id: str):
        """Fetch single item."""
        import aiohttp
        async with aiohttp.ClientSession() as session:
            url = f"{self.config['api_url']}/{id}"
            async with session.get(url) as resp:
                if resp.status == 404:
                    return None
                resp.raise_for_status()  # Other 4xx/5xx responses raise
                return await resp.json()

ContentEntry Structure

Each source yields ContentEntry objects:

from dataclasses import dataclass
from datetime import datetime
from typing import Any

@dataclass
class ContentEntry:
    id: str                        # Unique identifier within source
    slug: str                      # URL-friendly slug for routing
    content: str                   # Raw markdown content
    frontmatter: dict[str, Any]    # Parsed metadata dictionary
    source_type: str               # Source type (e.g., "github", "notion")
    source_name: str               # Source instance name
    source_url: str | None         # Original URL for attribution
    last_modified: datetime | None # Last modification time
    checksum: str | None           # Content hash for caching
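
The optional fields are easiest to understand when they are filled in. The sketch below builds an entry with a source URL, a modification time, and a SHA-256 checksum of the content. EntrySketch is a stand-in that mirrors the field list above so the example runs without Bengal installed; the None defaults and the choice of SHA-256 are assumptions, not confirmed details of ContentEntry.

```python
from __future__ import annotations

import hashlib
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any


@dataclass
class EntrySketch:
    """Stand-in mirroring the fields listed above; not Bengal's class."""

    id: str
    slug: str
    content: str
    frontmatter: dict[str, Any]
    source_type: str
    source_name: str
    source_url: str | None = None  # the None defaults are an assumption
    last_modified: datetime | None = None
    checksum: str | None = None


def content_checksum(content: str) -> str:
    """Hash the raw markdown so unchanged content can be detected."""
    return hashlib.sha256(content.encode("utf-8")).hexdigest()


body = "# Hello\n"
entry = EntrySketch(
    id="42",
    slug="hello",
    content=body,
    frontmatter={"title": "Hello"},
    source_type="my-api",
    source_name="my-content",
    source_url="https://api.example.com/content/42",
    last_modified=datetime(2025, 1, 1, tzinfo=timezone.utc),
    checksum=content_checksum(body),
)
```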

Registering Custom Sources

Option 1: Direct Registration

Register your source instance directly:

from bengal.content_layer import ContentLayerManager

manager = ContentLayerManager()
manager.register_custom_source("my-content", MyAPISource(
    name="my-content",
    config={"api_url": "https://api.example.com/content"},
))

Option 2: With Collections

Use your source as a collection loader:

# collections.py
from bengal.collections import define_collection

my_source = MyAPISource(
    name="my-content",
    config={"api_url": "https://api.example.com/content"},
)

collections = {
    "external": define_collection(
        schema=ExternalContent,
        loader=my_source,
    ),
}

Caching

Content sources support caching to avoid redundant fetches:

class MyAPISource(ContentSource):
    # ...

    def get_cache_key(self) -> str:
        """Generate cache key for this source configuration."""
        # Default implementation hashes config
        # Override for custom cache key logic
        return super().get_cache_key()

    async def is_changed(self, cached_checksum: str | None) -> bool:
        """Check if source content has changed."""
        # Return True to force refetch
        # Return False if content is unchanged
        current = await self._get_current_checksum()
        return current != cached_checksum

    async def get_last_modified(self):
        """Return last modification time for cache invalidation."""
        # Return datetime or None
        return None
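
The _get_current_checksum helper called in is_changed is not defined in this guide. One possible approach, sketched below, hashes a normalized JSON dump of the fetched items. ChangeDetectingSource and items_checksum are hypothetical names, and the class is a plain stand-in rather than a real ContentSource subclass, so the example runs on its own.

```python
import asyncio
import hashlib
import json
from typing import Any, Optional


def items_checksum(items: list[dict[str, Any]]) -> str:
    """Hash a list of items deterministically (key order is normalized)."""
    payload = json.dumps(items, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


class ChangeDetectingSource:
    """Stand-in for a ContentSource subclass; only the caching hooks are shown."""

    def __init__(self, items: list[dict[str, Any]]) -> None:
        self._items = items  # a real source would fetch these remotely

    async def _get_current_checksum(self) -> str:
        return items_checksum(self._items)

    async def is_changed(self, cached_checksum: Optional[str]) -> bool:
        if cached_checksum is None:
            return True  # nothing cached yet, so always fetch
        return await self._get_current_checksum() != cached_checksum


source = ChangeDetectingSource([{"id": "1", "body": "Content"}])
first = asyncio.run(source._get_current_checksum())
unchanged = asyncio.run(source.is_changed(first))  # same items as before
source._items.append({"id": "2", "body": "More"})
changed = asyncio.run(source.is_changed(first))  # items differ now
```

Hashing the full payload still requires downloading it. Where the backend exposes a cheaper signal, such as an ETag header or a last-updated timestamp, comparing that instead avoids the extra transfer.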

Sync Wrappers

For convenience, ContentSource provides sync wrappers:

# Async (preferred for performance)
async for entry in source.fetch_all():
    process(entry)

# Sync (convenience wrapper)
for entry in source.fetch_all_sync():
    process(entry)

# Single entry
entry = source.fetch_one_sync("my-id")
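
For intuition, a sync wrapper around an async generator can be written with asyncio.run. The sketch below shows the general technique only; how Bengal implements fetch_all_sync is not covered here and may differ, and iterate_sync and numbers are hypothetical names.

```python
import asyncio
from typing import AsyncIterator, Iterator, TypeVar

T = TypeVar("T")


def iterate_sync(async_iter: AsyncIterator[T]) -> Iterator[T]:
    """Drain an async iterator from synchronous code.

    Illustrative only. It must not be called from inside a running
    event loop, because asyncio.run() raises RuntimeError there.
    """

    async def collect() -> list[T]:
        return [item async for item in async_iter]

    # Run the loop until every item is collected, then yield them one by one.
    yield from asyncio.run(collect())


async def numbers() -> AsyncIterator[int]:
    for n in range(3):
        await asyncio.sleep(0)  # stand-in for network I/O
        yield n


results = list(iterate_sync(numbers()))
```

A wrapper of this shape buffers all entries in memory before returning the first one, which is one reason to prefer the async API for large sources.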

Error Handling

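Catch network failures and malformed items inside your source, so that one bad record does not stop the remaining entries from being yielded: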

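import logging

import aiohttp

logger = logging.getLogger(__name__)

# Method of your ContentSource subclass
async def fetch_all(self):
    try:
        items = await self._fetch_items()
    except aiohttp.ClientError as e:
        logger.error(f"Failed to fetch from {self.config['api_url']}: {e}")
        return  # Yield nothing on error

    for item in items:
        try:
            # _to_entry: your own helper that builds a ContentEntry from one item
            yield self._to_entry(item)
        except KeyError as e:
            logger.warning(f"Skipping malformed item {item.get('id')}: {e}")
            continue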
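
The pattern above relies on a per-item conversion step that raises KeyError when a required field is missing. A minimal, synchronous sketch of that step follows. MiniEntry, to_entry, and convert_all are hypothetical names, and MiniEntry is a reduced stand-in for ContentEntry so the example runs without Bengal.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class MiniEntry:
    """Reduced stand-in for ContentEntry; not Bengal's class."""

    id: str
    slug: str
    content: str
    frontmatter: dict[str, Any]


def to_entry(item: dict[str, Any]) -> MiniEntry:
    """Convert one raw API item; raises KeyError if a required key is missing."""
    return MiniEntry(
        id=item["id"],
        slug=item["slug"],
        content=item["body"],
        frontmatter={"title": item["title"], "date": item["created_at"]},
    )


def convert_all(items: list[dict[str, Any]]) -> tuple[list[MiniEntry], list[Any]]:
    """Convert what can be converted and report the IDs that were skipped."""
    entries, skipped = [], []
    for item in items:
        try:
            entries.append(to_entry(item))
        except KeyError:
            skipped.append(item.get("id"))
    return entries, skipped


good = {"id": "1", "slug": "a", "title": "A", "body": "text", "created_at": "2025-01-01"}
bad = {"id": "2", "slug": "b"}  # missing title, body, and created_at
entries, skipped = convert_all([good, bad])
```

Keeping the conversion in one helper also removes the duplicated ContentEntry construction between fetch_all and fetch_one.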

Testing Custom Sources

# Requires the pytest-asyncio plugin for @pytest.mark.asyncio
import pytest
from unittest.mock import AsyncMock, patch

@pytest.mark.asyncio
async def test_my_api_source():
    source = MyAPISource(
        name="test",
        config={"api_url": "https://api.example.com"},
    )

    with patch.object(source, "_fetch_items", new_callable=AsyncMock) as mock:
        mock.return_value = [
            {"id": "1", "slug": "test", "title": "Test", "body": "Content", "created_at": "2025-01-01"},
        ]

        entries = [entry async for entry in source.fetch_all()]

        assert len(entries) == 1
        assert entries[0].frontmatter["title"] == "Test"