ADR-005: Content Orchestration Architecture
Status
Superseded by ADR-007 | Extends ADR-004
Version History
- v1: 2025-04-28 - Initial proposal
- v2: 2025-04-29 - Updated with additional architecture components and improvements
- v3: 2025-04-29 - Refined with capability decoupling and updated integration patterns
Context
We have successfully implemented the Reactive Content Streams architecture outlined in ADR-004, which provides a robust foundation for content handling through RxJS-based observables. However, we now face the challenge of coordinating multiple content streams from different sources (filesystem, memory, remote, etc.) while maintaining a clean and intuitive API.
Current limitations with our streams implementation include:
- Coordination Complexity: There’s no standardized way to coordinate between multiple streams for content resolution and operation routing.
- Capability Interface Issues: The current boolean-based capability system is too simplistic for expressing nuanced stream capabilities. Additionally, capabilities are tightly coupled to source types, even though they are conceptually distinct (e.g., an HTTP source could be read-only or fully CRUD-capable).
- Architectural Ambiguity: It’s unclear whether the content coordination layer should implement the stream interface, leading to potential design leakage.
- Resource Management Challenges: Managing subscriptions and resource cleanup across multiple streams requires careful coordination.
- Caching Strategy: There’s no unified approach to caching content from various streams.
Our previous solution in the legacy codebase used a Registry pattern with multiple registry implementations. While this approach worked, it led to interface duplication, unclear responsibility boundaries, and complex capability detection logic in components.
Decision
Implement a Content Orchestration Architecture with these core components:
1. ContentMediator Pattern: Use a mediator that coordinates between streams without implementing the stream interface itself, maintaining clear separation of concerns.
2. Decoupled Capability Model: Separate capabilities from source types using a capability profile system that allows flexible capability expression.
3. Pipeline Architecture: Implement a content processing pipeline that separates resolution, fetching, transformation, and caching into distinct composable stages.
4. Declarative Query Model: Use a reactive query model for content requests instead of imperative method calls.
5. Pluggable Caching Strategy: Implement a configurable caching layer to optimize performance.
6. Health Monitoring System: Add monitoring for stream health and reliability.
The complete architecture can be visualized as:
+------------------+ +---------------------+ +-------------------+
| React Components | |    Content Hooks    | |   Static Assets   |
+------------------+ +---------------------+ +-------------------+
         |                      |                      |
         v                      v                      v
+----------------------------------------------------------------------+
|                       Content Query Interface                        |
|                                                                      |
|  +----------------------------------------------------------------+  |
|  |                        Content Mediator                        |  |
|  |                                                                |  |
|  |  +------------------+      +---------------------+             |  |
|  |  | Stream Registry  |      | Capability Profiles |             |  |
|  |  +------------------+      +---------------------+             |  |
|  |                                                                |  |
|  |  +------------------+      +---------------------+             |  |
|  |  | Resolution Cache |      |    Content Cache    |             |  |
|  |  +------------------+      +---------------------+             |  |
|  +----------------------------------------------------------------+  |
|                                                                      |
|  +----------------------------------------------------------------+  |
|  |                        Content Pipeline                        |  |
|  |                                                                |  |
|  |  +-----------+  +----------+  +--------------+  +-----------+  |  |
|  |  | Resolvers |->| Fetchers |->| Transformers |->| Enrichers |  |  |
|  |  +-----------+  +----------+  +--------------+  +-----------+  |  |
|  +----------------------------------------------------------------+  |
+----------------------------------------------------------------------+
       |                |               |               |
       v                v               v               v
+-------------+ +--------------+ +-------------+ +-------------+
|  Filesystem | |    Memory    | |   Remote    | |   Custom    |
|   Stream    | |    Stream    | |   Stream    | |   Streams   |
+-------------+ +--------------+ +-------------+ +-------------+
       |                |               |               |
       v                v               v               v
+-------------+ +--------------+ +-------------+ +-------------+
| Local Files | |  In-Memory   | | Remote APIs | |   Custom    |
|  & Assets   | |   Content    | |  & Services | |   Sources   |
+-------------+ +--------------+ +-------------+ +-------------+
Core Components
1. Content Mediator
export interface ContentMediator<T = RawContent> extends Disposable {
// Stream registration with capability profiles
registerStream(
stream: ContentStream<T>,
options: StreamRegistrationOptions
): string
unregisterStream(streamId: string): void
getRegisteredStreams(): ReadonlyMap<string, StreamInfo<T>>
// Capability profiles management
registerCapabilityProfile(profile: CapabilityProfile): void
getCapabilityProfile(profileId: string): CapabilityProfile | null
listCapabilityProfiles(): CapabilityProfile[]
// Content operations - note these don't mirror the ContentStream interface
resolveContent(query: ContentQuery): Observable<ResolvedContent<T>>
executeContentOperation(
operation: ContentOperation
): Observable<OperationResult<T>>
// Content observation
observeContent(query: ContentQuery): Observable<ResolvedContent<T>>
observeChanges(pattern?: string): Observable<ContentChange<T>>
// Capability management
getCapabilitiesForQuery(query: ContentQuery): ContentCapabilities
getEffectiveCapabilities(streamId: string): ContentCapabilities
// Health monitoring
getStreamHealth(streamId?: string): StreamHealth
observeStreamHealth(): Observable<StreamHealthUpdate>
}
2. Decoupled Capability Model
/**
* Capability levels for different operations
*/
export enum CapabilityLevel {
NONE = 'none',
BASIC = 'basic',
PARTIAL = 'partial',
FULL = 'full',
}
/**
* Content capabilities interface
*/
export interface ContentCapabilities {
read: CapabilityLevel
write: CapabilityLevel
delete: CapabilityLevel
list: CapabilityLevel
watch: CapabilityLevel
// Additional capability metadata
features: {
concurrentOperations: boolean
transactionalOperations: boolean
versionHistory: boolean
metadataSupport: boolean
fullTextSearch: boolean
changeTracking: boolean
schemaValidation: boolean
}
// Resource limits
limits?: {
maxFileSize?: number
maxOperationsPerSecond?: number
maxConcurrentOperations?: number
}
}
/**
* Reusable capability profile
*/
export interface CapabilityProfile {
id: string
name: string
description: string
capabilities: ContentCapabilities
}
/**
* Stream registration options
*/
export interface StreamRegistrationOptions {
id?: string
namespaces: string[]
priority?: number
capabilityProfiles: string[]
capabilityOverrides?: Partial<ContentCapabilities>
matcher?: (uri: string) => boolean
}
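How a stream’s effective capabilities are derived from its capabilityProfiles plus capabilityOverrides is left open by the interfaces above. One plausible policy, sketched here with a deliberately reduced capability shape: profiles are combined by taking the strongest level per operation and OR-ing feature flags, and per-stream overrides are applied last so a registration can narrow as well as widen a profile. The merge policy is an assumption of this sketch, not something the ADR mandates.

```typescript
// Sketch: deriving a stream's effective capabilities from its profiles
// plus per-stream overrides. Merge policy (strongest level wins, feature
// flags OR-ed, overrides applied last) is an assumption of this sketch.
enum CapabilityLevel {
  NONE = 'none',
  BASIC = 'basic',
  PARTIAL = 'partial',
  FULL = 'full',
}

const LEVEL_ORDER = [
  CapabilityLevel.NONE,
  CapabilityLevel.BASIC,
  CapabilityLevel.PARTIAL,
  CapabilityLevel.FULL,
]

// Reduced capability shape for brevity; the real ContentCapabilities
// carries more operations, features, and limits.
interface Caps {
  read: CapabilityLevel
  write: CapabilityLevel
  features: { versionHistory: boolean; metadataSupport: boolean }
}

// The stronger of two capability levels.
function maxLevel(a: CapabilityLevel, b: CapabilityLevel): CapabilityLevel {
  return LEVEL_ORDER.indexOf(a) >= LEVEL_ORDER.indexOf(b) ? a : b
}

function effectiveCapabilities(
  profiles: Caps[],
  overrides?: Partial<Caps>
): Caps {
  const none: Caps = {
    read: CapabilityLevel.NONE,
    write: CapabilityLevel.NONE,
    features: { versionHistory: false, metadataSupport: false },
  }
  // Combine profiles: strongest level per operation, OR the feature flags.
  const merged = profiles.reduce(
    (acc, p) => ({
      read: maxLevel(acc.read, p.read),
      write: maxLevel(acc.write, p.write),
      features: {
        versionHistory: acc.features.versionHistory || p.features.versionHistory,
        metadataSupport: acc.features.metadataSupport || p.features.metadataSupport,
      },
    }),
    none
  )
  // Overrides win unconditionally, so they can also *narrow* capabilities.
  return {
    ...merged,
    ...overrides,
    features: { ...merged.features, ...overrides?.features },
  }
}
```

Applying an override on top of a read-only profile, for example, yields a stream that reads at FULL but writes at whatever level the registration grants.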
3. Pipeline Architecture
export interface ContentPipeline<T = RawContent> {
// Pipeline stages
resolvers: PipelineStage<ContentQuery, string[]>[]
fetchers: PipelineStage<string, T>[]
transformers: PipelineStage<T, ProcessedContent>[]
enrichers: PipelineStage<ProcessedContent, ProcessedContent>[]
// Pipeline execution
execute(input: ContentQuery): Observable<ProcessedContent>
// Pipeline configuration
addResolver(
resolver: PipelineStage<ContentQuery, string[]>,
priority: number
): this
addFetcher(fetcher: PipelineStage<string, T>, priority: number): this
addTransformer(
transformer: PipelineStage<T, ProcessedContent>,
priority: number
): this
addEnricher(
enricher: PipelineStage<ProcessedContent, ProcessedContent>,
priority: number
): this
}
export interface PipelineStage<TInput, TOutput> {
execute(input: TInput, context: PipelineContext): Observable<TOutput>
canHandle(input: TInput, context: PipelineContext): boolean
priority: number
name: string
}
export interface PipelineContext {
// Contextual information for pipeline execution
operation: ContentOperation | ContentQuery
timestamp: number
executionId: string
// Shared state for pipeline stages
state: Map<string, unknown>
// Performance metrics
metrics: {
stageDurations: Record<string, number>
totalDuration?: number
startTime: number
}
// Tracing and debugging
traces: PipelineTrace[]
debug: boolean
}
4. Declarative Query Model
export interface ContentQuery {
// URI for content resolution
uri: string
// Query options
options?: {
// Refresh strategy
refresh?: 'always' | 'if-stale' | 'never'
// Caching strategy
cache?: boolean
// Transformation pipeline to apply
transformations?: string[]
// Whether to continue observing content after initial fetch
observe?: boolean
// Stream selector (bypass resolution)
sourceStrategy?: {
type: 'specific' | 'capability-based' | 'auto'
streamId?: string
requiredCapabilities?: Partial<ContentCapabilities>
}
// Progressive loading options
progressive?: {
enabled: boolean
initialChunkSize?: number
subsequentChunkSize?: number
}
// Additional stream-specific options
[key: string]: unknown
}
}
export interface ContentOperation {
// Operation type
type: 'read' | 'write' | 'delete' | 'list'
// Operation target
target: string
// Operation-specific data
data?: unknown
// Operation options
options?: {
// Operation timeout
timeout?: number
// Stream selector (bypass resolution)
streamId?: string
// Whether to update cache after operation
updateCache?: boolean
// Additional operation-specific options
[key: string]: unknown
}
}
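When a query or operation carries no explicit streamId, the mediator’s routing step follows from StreamRegistrationOptions: match the URI’s scheme against registered namespaces, apply any custom matcher, then pick the highest-priority candidate. A minimal sketch, using a simplified stand-in for the mediator’s internal registry entry:

```typescript
// Sketch of the namespace-based stream selection implied by
// StreamRegistrationOptions. `Registration` is a simplified stand-in
// for the mediator's internal registry entry.
interface Registration {
  streamId: string
  namespaces: string[]
  priority: number
  matcher?: (uri: string) => boolean
}

function selectStream(uri: string, registry: Registration[]): string | null {
  const scheme = uri.split(':', 1)[0] // e.g. 'fs' from 'fs:/blog/post.mdx'
  const candidates = registry
    .filter((r) => r.namespaces.includes(scheme))
    .filter((r) => (r.matcher ? r.matcher(uri) : true))
    .sort((a, b) => b.priority - a.priority) // higher priority wins
  return candidates[0]?.streamId ?? null
}
```

A query’s sourceStrategy.streamId would bypass this step entirely, and capability-based selection would add a capability filter before the priority sort.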
5. Pluggable Caching System
export interface ContentCache<T> extends Disposable {
// Cache operations
get(key: string): Observable<CacheEntry<T> | null>
set(key: string, value: T, options?: CacheOptions): Observable<void>
invalidate(keyPattern: string): Observable<number>
clear(): Observable<void>
// Cache information
getStats(): CacheStats
observeStats(): Observable<CacheStats>
// Cache configuration
configure(options: CacheConfiguration): void
}
export interface CacheEntry<T> {
value: T
timestamp: number
metadata?: Record<string, unknown>
}
export interface CacheOptions {
ttl?: number
priority?: number
tags?: string[]
refresh?: 'lazy' | 'eager'
}
export interface CacheConfiguration {
maxEntries?: number
defaultTtl?: number
evictionPolicy?: 'lru' | 'lfu' | 'fifo' | 'custom'
storage?: 'memory' | 'localStorage' | 'indexedDB' | 'custom'
}
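To make the cache contract concrete, here is a minimal in-memory sketch. It collapses the interface’s Observable-returning methods to plain returns, uses FIFO eviction, and interprets keyPattern as a RegExp source; all three choices are assumptions of the sketch, since the ADR leaves backend behavior and pattern syntax unspecified.

```typescript
// Minimal in-memory sketch of the ContentCache contract with TTL-based
// staleness and FIFO eviction. Simplifications noted in the lead-in.
interface Entry<T> {
  value: T
  timestamp: number
  ttl: number
}

class MemoryCache<T> {
  private entries = new Map<string, Entry<T>>()
  constructor(private maxEntries = 100, private defaultTtl = 60_000) {}

  set(key: string, value: T, ttl = this.defaultTtl): void {
    // FIFO eviction when full: drop the oldest inserted key.
    if (!this.entries.has(key) && this.entries.size >= this.maxEntries) {
      const oldest = this.entries.keys().next().value
      if (oldest !== undefined) this.entries.delete(oldest)
    }
    this.entries.set(key, { value, timestamp: Date.now(), ttl })
  }

  // `now` is injectable so staleness is testable deterministically.
  get(key: string, now = Date.now()): T | null {
    const entry = this.entries.get(key)
    if (!entry) return null
    if (now - entry.timestamp > entry.ttl) {
      this.entries.delete(key) // expired -> treat as a miss
      return null
    }
    return entry.value
  }

  // Assumes keyPattern is a RegExp source; returns the number removed.
  invalidate(keyPattern: string): number {
    const re = new RegExp(keyPattern)
    let removed = 0
    for (const key of [...this.entries.keys()]) {
      if (re.test(key)) {
        this.entries.delete(key)
        removed++
      }
    }
    return removed
  }
}
```

The 'localStorage' and 'indexedDB' backends would implement the same surface over asynchronous storage, which is one reason the real interface returns Observables.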
6. Stream Health Monitoring
export interface StreamHealth {
status: 'healthy' | 'degraded' | 'failing' | 'disconnected'
metrics: {
availability: number // 0-1
latency: number // ms
errorRate: number // 0-1
throughput: number // ops/sec
}
lastError?: Error
lastChecked: number
}
export interface StreamHealthUpdate {
streamId: string
health: StreamHealth
previousStatus?: 'healthy' | 'degraded' | 'failing' | 'disconnected'
timestamp: number
}
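The interfaces above define the health shape but not how status is derived from the metrics. One plausible thresholding sketch follows; every threshold here is an illustrative assumption, not a value fixed by this ADR.

```typescript
// Hypothetical derivation of StreamHealth.status from its metrics.
// All thresholds are illustrative assumptions.
type Status = 'healthy' | 'degraded' | 'failing' | 'disconnected'

interface Metrics {
  availability: number // 0-1
  latency: number // ms
  errorRate: number // 0-1
}

function deriveStatus(m: Metrics): Status {
  if (m.availability === 0) return 'disconnected'
  if (m.errorRate > 0.5 || m.availability < 0.5) return 'failing'
  if (m.errorRate > 0.05 || m.latency > 2_000) return 'degraded'
  return 'healthy'
}
```

A mediator implementation would recompute this periodically, emit a StreamHealthUpdate whenever the status changes, and could deprioritize failing streams during selection.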
Alternatives Considered
Enhanced Registry Pattern (Extension of ADR-003)
- Pros: More familiar pattern, less architectural change
- Cons: Interface duplication, unclear responsibility boundaries
- Outcome: Rejected due to architectural concerns about separation of concerns
Content Microservices
- Pros: Clear boundaries, independent scaling
- Cons: Unnecessary complexity for a frontend library
- Outcome: Rejected as overly complex
Stream Implementation Inheritance
- Pros: Unified interface, consistent API
- Cons: Forces orchestration layer to implement low-level stream methods
- Outcome: Rejected due to violation of single responsibility principle
Event Sourcing Architecture
- Pros: Strong audit capabilities, time-travel debugging
- Cons: Significant complexity, overkill for content system
- Outcome: Partially adopted concepts for change tracking, rejected as primary architecture
GraphQL-style Query System
- Pros: Powerful declarative queries, field selection
- Cons: Implementation complexity, learning curve
- Outcome: Adapted concepts for the content query model, but rejected full GraphQL implementation
Consequences
Benefits
- Clear Separation of Concerns: The mediator coordinates between streams without implementing the stream interface, maintaining architectural clarity.
- Decoupled Capabilities: Separating capabilities from source types allows for more flexible and accurate capability expression.
- Consistent Stream Selection: The mediator provides a standard mechanism for selecting appropriate streams based on capabilities, URIs, and other criteria.
- Flexible Pipeline Architecture: The processing pipeline cleanly separates resolution, fetching, and transformation.
- Intuitive Developer Experience: The declarative query model provides a clean, reactive API for content operations.
- Better Testability: Components can be tested in isolation with clear boundaries.
- Enhanced Caching: Pluggable caching strategy with configurable policies and storage backends.
- Future Extensibility: The architecture supports adding new stream types, content processors, and pipeline stages without modifying existing code.
- Progressive Loading: Support for large content through progressive loading strategies.
- Health Monitoring: Automatic detection of, and recovery from, failing streams.
Challenges
- Implementation Complexity: More sophisticated architecture requires careful implementation.
- Learning Curve: Developers need to understand the mediator pattern and pipeline concepts.
- Migration Effort: Existing code needs to be adapted to the new architecture.
- Performance Considerations: Pipeline stages add computational overhead that must be managed.
- Increased Abstraction: More layers of abstraction could make debugging more difficult.
- Cache Consistency: Maintaining consistency between caches and sources requires careful handling.
Implementation Strategy
The implementation will follow a phased approach to minimize disruption:
Phase 1: Core Interfaces and Model
- Define the ContentMediator interface
- Create the capability profiles system
- Define the pipeline architecture interfaces
- Design the content query and operation models
- Define the caching interface
Phase 2: Base Implementation
- Implement the ContentMediatorImpl class
- Create standard capability profiles
- Implement basic content resolution logic
- Add stream registration and management
- Implement memory-based cache strategy
Phase 3: Stream Adaptation
- Create adapters for existing stream implementations
- Implement capability mapping
- Add caching mechanism
- Implement operation routing logic
- Add stream health monitoring
Phase 4: React Integration
- Create React hooks for content queries
- Implement content observation hooks
- Add optimized components for content rendering
- Create developer utilities for debugging and monitoring
- Add progressive loading components
Phase 5: Migration and Documentation
- Update existing code to use the new architecture
- Create migration guides for custom stream implementations
- Add comprehensive documentation
- Develop examples for common use cases
- Add monitoring and debugging tools
Required Changes to Existing Systems
ContentStream Interface:
- No changes required to the core interface
- Existing implementations remain compatible
- New adapter layer will bridge between streams and mediator
Stream Implementation Classes:
- Need adapters to expose enhanced capabilities
- No internal changes required
Error Handling:
- New error types for mediation and pipeline failures
- Consistent error propagation through the pipeline
Resource Management:
- Enhanced subscription tracking across the mediator
- Proper cleanup of resources when mediator is disposed
React Components:
- Gradual migration to the new query-based hooks
- Legacy support for existing components
Build Configuration:
- No changes required
Advanced Capabilities in Practice
With the decoupled capability model, components can adapt based on the actual capabilities available:
function EditButton({ documentUri }) {
const { capabilities } = useContentCapabilities({ uri: documentUri })
// Only show edit button if content is writable
if (capabilities.write === CapabilityLevel.NONE) {
return null
}
// Render button with appropriate UI based on capability level
return (
<Button
disabled={capabilities.write === CapabilityLevel.BASIC}
tooltip={getCapabilityTooltip(capabilities.write)}
onClick={handleEdit}
>
Edit
</Button>
)
}
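The EditButton above references two helpers it does not define (handleEdit and getCapabilityTooltip). A minimal sketch of the tooltip helper; the helper itself, like its wording, is hypothetical:

```typescript
type Level = 'none' | 'basic' | 'partial' | 'full'

// Hypothetical helper assumed by the EditButton example above.
function getCapabilityTooltip(level: Level): string {
  switch (level) {
    case 'none':
      return 'This content source does not support editing'
    case 'basic':
      return 'Editing is limited for this content source'
    case 'partial':
      return 'Some edit operations may be unavailable'
    case 'full':
      return 'Full editing is supported'
  }
}
```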
Components can also adapt their UI based on available features:
function ContentEditor({ documentUri }) {
const { capabilities } = useContentCapabilities({ uri: documentUri })
return (
<EditorLayout>
<TextEditor />
{capabilities.features.versionHistory && <VersionHistoryPanel />}
{capabilities.features.metadataSupport && <MetadataEditor />}
{capabilities.features.schemaValidation && <SchemaValidator />}
<SaveButton disabled={capabilities.write === CapabilityLevel.NONE} />
</EditorLayout>
)
}
React Integration Example
// Define common capability profiles
const readOnlyProfile = {
id: 'read-only',
name: 'Read Only',
description: 'Can read but not modify content',
capabilities: {
read: CapabilityLevel.FULL,
write: CapabilityLevel.NONE,
delete: CapabilityLevel.NONE,
list: CapabilityLevel.FULL,
watch: CapabilityLevel.BASIC,
features: {
concurrentOperations: true,
transactionalOperations: false,
// Other features...
}
}
};
const fullCrudProfile = {
id: 'full-crud',
name: 'Full CRUD',
description: 'Full create, read, update, delete capabilities',
capabilities: {
read: CapabilityLevel.FULL,
write: CapabilityLevel.FULL,
delete: CapabilityLevel.FULL,
list: CapabilityLevel.FULL,
watch: CapabilityLevel.FULL,
features: {
concurrentOperations: true,
transactionalOperations: true,
// Other features...
}
}
};
// Register profiles with mediator
mediator.registerCapabilityProfile(readOnlyProfile);
mediator.registerCapabilityProfile(fullCrudProfile);
// Register streams with appropriate profiles
mediator.registerStream(filesystemStream, {
namespaces: ['fs'],
capabilityProfiles: ['full-crud'],
priority: 10
});
mediator.registerStream(remoteApiStream, {
namespaces: ['api'],
capabilityProfiles: ['read-only'],
priority: 5
});
// Create content hooks with the mediator
const useContent = createContentHook(mediator);
// Component using the content hook
function BlogPost({ slug }) {
const { content, loading, error } = useContent({
uri: `fs:/blog/posts/${slug}.mdx`,
options: {
transformations: ['mdx', 'syntax-highlight'],
observe: true,
refresh: 'if-stale',
progressive: {
enabled: true,
initialChunkSize: 1000
}
}
});
if (loading) return <Loading />;
if (error) return <ErrorDisplay error={error} />;
return <MDXRenderer content={content} />;
}
Targeted Framework Concerns
In developing this architecture for our specific framework, we need to address these additional concerns:
- RxJS Integration: Ensure all patterns properly work with RxJS operators and observable patterns, especially with subscription management and resource disposal.
- Bundle Size Optimization: Carefully structure the implementation to enable effective tree-shaking, particularly for capabilities that might not be needed in all deployments.
- Server vs. Client Capabilities: When running in server environments (SSR), certain capabilities like watching may not be available or may behave differently.
- TypeScript Type Safety: Leverage TypeScript’s type system to make capability checks safer and provide better developer experience with autocomplete and type errors.
- Performance Monitoring: Add performance tracking to identify bottlenecks in the mediator and pipeline, especially for large content operations.
- Debugging Support: Provide rich debugging tools specific to our framework for tracing content flow through the system.
Conclusion
The Content Orchestration Architecture provides a clean, flexible foundation for coordinating multiple content streams while maintaining a reactive programming model. By separating coordination from implementation and using a pipeline-based processing model, we can support a wide range of content sources and transformations with clear separation of concerns.
This approach extends the Reactive Content Streams architecture from ADR-004 with better orchestration capabilities while maintaining compatibility with existing stream implementations. The decoupled capability system ensures that components can adapt to the actual capabilities available, regardless of source type.