ADR-004: Reactive Content Streams
Status
Superseded by ADR-007 | Accepted | Implemented | Deprecates ADR-003
Version History
- v1: 2025-04-24 - Initial proposal
- v2: 2025-04-25 - Updated with RxJS implementation details
Context
Our content system has evolved to require more reactive, real-time capabilities to support modern development workflows and content authoring experiences. While ADR-003 introduced a Registry Facade pattern, it still maintained a separation between static and dynamic content sources, with static content being explicitly registered and dynamic content being loaded through adapters.
Current limitations include:
Separation of Static and Dynamic Content: The distinction between StaticRegistry and DynamicRegistry creates artificial boundaries that complicate the codebase.
Limited Reactivity: The current watching mechanism is focused on content changes, not on content additions, removals, or relocations.
Development Mode Limitations: The system doesn’t detect new files added during development without manual registration.
Conceptual Complexity: Different registry types with different capabilities make the system harder to reason about.
Future Extensibility Concerns: Supporting additional content sources like WebDAV, Firebase, or WebRTC would require significant changes to the existing architecture.
Furthermore, we want to position the system to support content authoring tools directly in the application, which requires first-class support for content creation, deletion, and full CRUD operations through a unified interface.
Decision
Implement a Reactive Content Streams architecture that fundamentally treats all content as observable streams, regardless of source:
React Components → ContentStream → (Processor | Adapter)
Key architecture components:
Universal Content Stream Interface: Create a core
ContentStream<T>
interface that abstracts away the source of content and provides uniform reactivity.RxJS Foundation: Use RxJS as the core reactive programming library to provide a robust, well-tested foundation for our streaming architecture.
Stream Implementations: Create specific implementations for different content sources (filesystem, remote, memory) while maintaining a consistent reactive interface.
Single Registry Architecture: Replace the static/dynamic registry distinction with a single registry that works with content streams, abstracting away source-specific details.
Uniform CRUD Operations: Support consistent create, read, update, and delete operations across all content sources through the stream interface.
Build-Time Optimization: During build, generate a content manifest that can be loaded at runtime, giving the appearance of dynamic content while maintaining performance benefits.
Alternatives Considered
Enhanced Registry Facade (ADR-003):
- Pros: Less disruptive, builds on existing work
- Cons: Still maintains artificial static/dynamic distinction, less suitable for true reactive programming
- Outcome: Rejected in favor of a more fundamental architectural shift
Event-Based Content System:
- Pros: Simpler implementation, less conceptual overhead
- Cons: Less powerful than streams, harder to compose operations
- Outcome: Rejected because reactive streams provide better composition and transformation capabilities
GraphQL-Like Content API:
- Pros: Declarative query model, flexible data fetching
- Cons: Overkill for content needs, adds complexity
- Outcome: Rejected as unnecessary for our current requirements
Virtual File System Abstraction:
- Pros: Familiar file system metaphor, easy to understand
- Cons: Limited reactivity, doesn’t fit well with all content sources
- Outcome: Partially adopted concepts but rejected as the primary abstraction
Custom Observable Implementation:
- Pros: Tailored exactly to our needs, potentially smaller bundle size
- Cons: Significant development effort, edge cases to handle, testing complexity
- Outcome: Rejected in favor of using RxJS as a mature, well-tested solution
Consequences
Benefits
- Unified Content Model: All content sources are treated equally through the stream interface
- First-Class Reactivity: The system is built from the ground up to support reactive patterns
- Enhanced Development Experience: Automatically detects new files and content changes
- Future Extensibility: New content sources can be added by implementing the stream interface
- Simplified Mental Model: Developers work with a single consistent pattern regardless of content source
- Composition Support: Streams can be combined, transformed, and composed for complex operations
- Proven Foundation: RxJS provides a mature reactive programming model with excellent TypeScript support
- Rich Operator Ecosystem: RxJS operators provide powerful data transformation capabilities
- Testability: RxJS includes tools for testing asynchronous streams (marble testing)
Challenges
- Migration Complexity: Significant architectural changes required
- Learning Curve: Reactive programming concepts and RxJS patterns may be unfamiliar
- RxJS Bundle Size: Potential increase in bundle size (mitigated with tree shaking)
- Debugging Complexity: Stream operations can be harder to debug than imperative code
- Performance Monitoring: Need to ensure stream operations remain efficient
- Browser Compatibility: Some stream implementations may require polyfills for older browsers
Implementation
Core Dependencies
- RxJS: Core reactive programming library
- chokidar: File system watching
- glob: File pattern matching
- nanoid: ID generation for content
Core Stream Interface
import { Observable } from 'rxjs'
export interface ContentStream<T = RawContent> {
// Core operations
read(path: string): Observable<T>
write(path: string, content: T): Observable<void>
delete(path: string): Observable<void>
// Listing and observation
list(pattern?: string): Observable<string[]>
changes(path?: string): Observable<ContentChange<T>>
// Capabilities
readonly capabilities: StreamCapabilities
}
export interface StreamCapabilities {
canRead: boolean
canWrite: boolean
canDelete: boolean
canList: boolean
canWatch: boolean
supportsCRUD: boolean
}
export interface ContentChange<T> {
type: 'created' | 'updated' | 'deleted'
path: string
content?: T
timestamp: number
}
export interface RawContent {
content: string
contentType: string
metadata?: Record<string, unknown>
}
Filesystem Stream Implementation
import { Observable, Subject, from, fromEvent, merge } from 'rxjs'
import { map, filter, catchError, tap, switchMap } from 'rxjs/operators'
import * as chokidar from 'chokidar'
import * as fs from 'fs/promises'
import { glob } from 'glob'
import {
ContentStream,
RawContent,
ContentChange,
StreamCapabilities,
} from './types'
import { createLogger } from '@lib/telemetry'
export class FilesystemContentStream implements ContentStream<RawContent> {
private watcher: chokidar.FSWatcher
private changeSubject = new Subject<ContentChange<RawContent>>()
private logger = createLogger('content:stream:filesystem')
readonly capabilities: StreamCapabilities = {
canRead: true,
canWrite: true,
canDelete: true,
canList: true,
canWatch: true,
supportsCRUD: true,
}
constructor(private rootDir: string) {
this.setupWatcher()
}
private setupWatcher(): void {
this.watcher = chokidar.watch(this.rootDir, {
ignored: /(^|[\/\\])\../, // ignore dotfiles
persistent: true,
})
// Setup file watching events
this.watcher.on('add', path => this.handleFileAdded(path))
this.watcher.on('change', path => this.handleFileChanged(path))
this.watcher.on('unlink', path => this.handleFileRemoved(path))
}
// Implementation details...
}
React Integration
import { useState, useEffect } from 'react'
import { Observable } from 'rxjs'
import { ContentStream, RawContent } from '@lib/content/streams/types'
export function useContent<T = RawContent>(
stream: ContentStream<T>,
path: string
) {
const [content, setContent] = useState<T | null>(null)
const [loading, setLoading] = useState(true)
const [error, setError] = useState<Error | null>(null)
useEffect(() => {
const subscription = stream.read(path).subscribe({
next: data => {
setContent(data)
setLoading(false)
},
error: err => {
setError(err)
setLoading(false)
},
complete: () => {
setLoading(false)
},
})
return () => subscription.unsubscribe()
}, [stream, path])
return { content, loading, error }
}
Registry Implementation
import { Observable, throwError, of } from 'rxjs'
import { switchMap, catchError, shareReplay } from 'rxjs/operators'
import { ContentStream, RawContent } from '@lib/content/streams/types'
import { MdxProcessor } from '@lib/content/processors'
export class ContentRegistry {
private streams = new Map<string, ContentStream>()
private processor: MdxProcessor
constructor() {
this.processor = new MdxProcessor()
}
registerStream(namespace: string, stream: ContentStream): void {
this.streams.set(namespace, stream)
}
getContent(uri: string): Observable<ProcessedContent> {
const [namespace, path] = this.parseUri(uri)
const stream = this.streams.get(namespace)
if (!stream) {
return throwError(() => new Error(`Unknown namespace: ${namespace}`))
}
return stream.read(path).pipe(
switchMap(content => this.processor.process(content)),
// Share replay allows multiple subscribers to receive the same content
// without triggering multiple loads
shareReplay(1),
catchError(err => {
console.error(`Error processing content at ${uri}:`, err)
return throwError(
() => new Error(`Failed to process content: ${err.message}`)
)
})
)
}
// Other implementation details...
}
Phased Implementation Plan
Phase 1: Core Infrastructure
- Install and configure RxJS ✅
- Define core interfaces
- Implement basic stream implementations
- Create testing utilities
Phase 2: React Integration
- Build React hooks for content access
- Update content components to use streams
- Implement error boundaries and loading states
- Add testing infrastructure
Phase 3: Build System Integration
- Create content manifest plugin
- Implement HMR with reactive streams
- Add build-time optimizations
- Develop debugging tools
Phase 4: Extended Features
- Add remote content streams
- Implement collaborative features
- Create content authoring tools
- Add advanced content operations
Testing Strategy
- Use rxjs-marbles for marble testing of streams
- Implement TestScheduler for synchronous testing of async code
- Create mock streams for isolated component testing
- Add integration tests for full content flow