Module

content_layer.manager

ContentLayerManager - Orchestrates content fetching from multiple sources.

Handles source registration, parallel fetching, caching, and aggregation.

Classes

CachedSource dataclass
Metadata about a cached source.
0

Metadata about a cached source.

Attributes

Name Type Description
source_key str
cached_at datetime
entry_count int
checksum str | None
ContentLayerManager
Manages content from multiple sources. Handles: - Source registration (local, remote, custom) - Pa…
12

Manages content from multiple sources.

Handles:

  • Source registration (local, remote, custom)
  • Parallel async fetching
  • Disk caching with TTL and invalidation
  • Aggregation of all sources into unified content list

Methods 6

register_source
Register a content source.
3 None
def register_source(self, name: str, source_type: str, config: dict[str, Any]) -> None

Register a content source.

Parameters 3
name str

Unique name for this source instance

source_type str

Type identifier ('local', 'github', 'rest', 'notion')

config dict[str, Any]

Source-specific configuration

register_custom_source
Register a custom source instance.
2 None
def register_custom_source(self, name: str, source: ContentSource) -> None

Register a custom source instance.

Parameters 2
name str

Unique name for this source

source ContentSource

ContentSource implementation instance

fetch_all async
Fetch content from all registered sources. Fetches from all sources in paralle…
1 list[ContentEntry]
async def fetch_all(self, use_cache: bool = True) -> list[ContentEntry]

Fetch content from all registered sources.

Fetches from all sources in parallel, using cache when available and falling back to cached content in offline mode.

Parameters 1
use_cache bool

Whether to use cached content if available

Returns

list[ContentEntry]

List of all content entries from all sources

clear_cache
Clear cached content.
1 int
def clear_cache(self, source_name: str | None = None) -> int

Clear cached content.

Parameters 1
source_name str | None

Specific source to clear, or None for all

Returns

int

Number of cache files deleted

get_cache_status
Get status of all cached sources.
0 dict[str, dict[str, Any]]
def get_cache_status(self) -> dict[str, dict[str, Any]]

Get status of all cached sources.

Returns

dict[str, dict[str, Any]]

Dictionary mapping source names to cache status

fetch_all_sync
Synchronous wrapper for fetch_all().
1 list[ContentEntry]
def fetch_all_sync(self, use_cache: bool = True) -> list[ContentEntry]

Synchronous wrapper for fetch_all().

Parameters 1
use_cache bool

Whether to use cached content if available

Returns

list[ContentEntry]

List of all content entries

Internal Methods 6
__init__
Initialize content layer manager.
3 None
def __init__(self, cache_dir: Path | None = None, cache_ttl: timedelta = timedelta(hours=1), offline: bool = False) -> None

Initialize content layer manager.

Parameters 3
cache_dir Path | None

Directory for caching remote content (default: .bengal/content_cache)

cache_ttl timedelta

Time-to-live for cached content (default: 1 hour)

offline bool

If True, only use cached content (no network requests)

_fetch_source async
Fetch content from a single source with caching.
3 list[ContentEntry]
async def _fetch_source(self, name: str, source: ContentSource, use_cache: bool) -> list[ContentEntry]

Fetch content from a single source with caching.

Parameters 3
name str

Source name

source ContentSource

Source instance

use_cache bool

Whether to check cache first

Returns

list[ContentEntry]

List of content entries from this source

_is_cache_valid
Check if cached content is still valid.
2 bool
def _is_cache_valid(self, name: str, expected_key: str) -> bool

Check if cached content is still valid.

Parameters 2
name str

Source name

expected_key str

Expected cache key (based on current config)

Returns

bool

True if cache is valid and can be used

_load_cache
Load cached entries from disk.
1 list[ContentEntry] | None
def _load_cache(self, name: str) -> list[ContentEntry] | None

Load cached entries from disk.

Parameters 1
name str

Source name

Returns

list[ContentEntry] | None

List of cached entries, or None if cache unavailable

_save_cache
Save entries to cache.
3 None
def _save_cache(self, name: str, entries: list[ContentEntry], cache_key: str) -> None

Save entries to cache.

Parameters 3
name str

Source name

entries list[ContentEntry]

Entries to cache

cache_key str

Cache key for validation

__repr__
0 str
def __repr__(self) -> str
Returns

str