Skip to content

ADR-004: Reactive Content Streams

Status

Superseded by ADR-007 | Accepted | Implemented | Deprecates ADR-003

Version History

  • v1: 2025-04-24 - Initial proposal
  • v2: 2025-04-25 - Updated with RxJS implementation details

Context

Our content system has evolved to require more reactive, real-time capabilities to support modern development workflows and content authoring experiences. While ADR-003 introduced a Registry Facade pattern, it still maintained a separation between static and dynamic content sources, with static content being explicitly registered and dynamic content being loaded through adapters.

Current limitations include:

  1. Separation of Static and Dynamic Content: The distinction between StaticRegistry and DynamicRegistry creates artificial boundaries that complicate the codebase.

  2. Limited Reactivity: The current watching mechanism is focused on content changes, not on content additions, removals, or relocations.

  3. Development Mode Limitations: The system doesn’t detect new files added during development without manual registration.

  4. Conceptual Complexity: Different registry types with different capabilities make the system harder to reason about.

  5. Future Extensibility Concerns: Supporting additional content sources like WebDAV, Firebase, or WebRTC would require significant changes to the existing architecture.

Furthermore, we want to position the system to support content authoring tools directly in the application, which requires first-class support for content creation, deletion, and full CRUD operations through a unified interface.

Decision

Implement a Reactive Content Streams architecture that fundamentally treats all content as observable streams, regardless of source:

React Components → ContentStream → (Processor | Adapter)

Key architecture components:

  1. Universal Content Stream Interface: Create a core ContentStream<T> interface that abstracts away the source of content and provides uniform reactivity.

  2. RxJS Foundation: Use RxJS as the core reactive programming library to provide a robust, well-tested foundation for our streaming architecture.

  3. Stream Implementations: Create specific implementations for different content sources (filesystem, remote, memory) while maintaining a consistent reactive interface.

  4. Single Registry Architecture: Replace the static/dynamic registry distinction with a single registry that works with content streams, abstracting away source-specific details.

  5. Uniform CRUD Operations: Support consistent create, read, update, and delete operations across all content sources through the stream interface.

  6. Build-Time Optimization: During build, generate a content manifest that can be loaded at runtime, giving the appearance of dynamic content while maintaining performance benefits.

Alternatives Considered

  1. Enhanced Registry Facade (ADR-003):

    • Pros: Less disruptive, builds on existing work
    • Cons: Still maintains artificial static/dynamic distinction, less suitable for true reactive programming
    • Outcome: Rejected in favor of a more fundamental architectural shift
  2. Event-Based Content System:

    • Pros: Simpler implementation, less conceptual overhead
    • Cons: Less powerful than streams, harder to compose operations
    • Outcome: Rejected because reactive streams provide better composition and transformation capabilities
  3. GraphQL-Like Content API:

    • Pros: Declarative query model, flexible data fetching
    • Cons: Overkill for content needs, adds complexity
    • Outcome: Rejected as unnecessary for our current requirements
  4. Virtual File System Abstraction:

    • Pros: Familiar file system metaphor, easy to understand
    • Cons: Limited reactivity, doesn’t fit well with all content sources
    • Outcome: Partially adopted concepts but rejected as the primary abstraction
  5. Custom Observable Implementation:

    • Pros: Tailored exactly to our needs, potentially smaller bundle size
    • Cons: Significant development effort, edge cases to handle, testing complexity
    • Outcome: Rejected in favor of using RxJS as a mature, well-tested solution

Consequences

Benefits

  1. Unified Content Model: All content sources are treated equally through the stream interface
  2. First-Class Reactivity: The system is built from the ground up to support reactive patterns
  3. Enhanced Development Experience: Automatically detects new files and content changes
  4. Future Extensibility: New content sources can be added by implementing the stream interface
  5. Simplified Mental Model: Developers work with a single consistent pattern regardless of content source
  6. Composition Support: Streams can be combined, transformed, and composed for complex operations
  7. Proven Foundation: RxJS provides a mature reactive programming model with excellent TypeScript support
  8. Rich Operator Ecosystem: RxJS operators provide powerful data transformation capabilities
  9. Testability: RxJS includes tools for testing asynchronous streams (marble testing)

Challenges

  1. Migration Complexity: Significant architectural changes required
  2. Learning Curve: Reactive programming concepts and RxJS patterns may be unfamiliar
  3. RxJS Bundle Size: Potential increase in bundle size (mitigated with tree shaking)
  4. Debugging Complexity: Stream operations can be harder to debug than imperative code
  5. Performance Monitoring: Need to ensure stream operations remain efficient
  6. Browser Compatibility: Some stream implementations may require polyfills for older browsers

Implementation

Core Dependencies

  • RxJS: Core reactive programming library
  • chokidar: File system watching
  • glob: File pattern matching
  • nanoid: ID generation for content

Core Stream Interface

typescript
import { Observable } from 'rxjs'

export interface ContentStream<T = RawContent> {
  // Core operations
  read(path: string): Observable<T>
  write(path: string, content: T): Observable<void>
  delete(path: string): Observable<void>

  // Listing and observation
  list(pattern?: string): Observable<string[]>
  changes(path?: string): Observable<ContentChange<T>>

  // Capabilities
  readonly capabilities: StreamCapabilities
}

export interface StreamCapabilities {
  canRead: boolean
  canWrite: boolean
  canDelete: boolean
  canList: boolean
  canWatch: boolean
  supportsCRUD: boolean
}

export interface ContentChange<T> {
  type: 'created' | 'updated' | 'deleted'
  path: string
  content?: T
  timestamp: number
}

export interface RawContent {
  content: string
  contentType: string
  metadata?: Record<string, unknown>
}

Filesystem Stream Implementation

typescript
import { Observable, Subject, from, fromEvent, merge } from 'rxjs'
import { map, filter, catchError, tap, switchMap } from 'rxjs/operators'
import * as chokidar from 'chokidar'
import * as fs from 'fs/promises'
import { glob } from 'glob'
import {
  ContentStream,
  RawContent,
  ContentChange,
  StreamCapabilities,
} from './types'
import { createLogger } from '@lib/telemetry'

export class FilesystemContentStream implements ContentStream<RawContent> {
  private watcher: chokidar.FSWatcher
  private changeSubject = new Subject<ContentChange<RawContent>>()
  private logger = createLogger('content:stream:filesystem')

  readonly capabilities: StreamCapabilities = {
    canRead: true,
    canWrite: true,
    canDelete: true,
    canList: true,
    canWatch: true,
    supportsCRUD: true,
  }

  constructor(private rootDir: string) {
    this.setupWatcher()
  }

  private setupWatcher(): void {
    this.watcher = chokidar.watch(this.rootDir, {
      ignored: /(^|[\/\\])\../, // ignore dotfiles
      persistent: true,
    })

    // Setup file watching events
    this.watcher.on('add', path => this.handleFileAdded(path))
    this.watcher.on('change', path => this.handleFileChanged(path))
    this.watcher.on('unlink', path => this.handleFileRemoved(path))
  }

  // Implementation details...
}

React Integration

typescript
import { useState, useEffect } from 'react'
import { Observable } from 'rxjs'
import { ContentStream, RawContent } from '@lib/content/streams/types'

export function useContent<T = RawContent>(
  stream: ContentStream<T>,
  path: string
) {
  const [content, setContent] = useState<T | null>(null)
  const [loading, setLoading] = useState(true)
  const [error, setError] = useState<Error | null>(null)

  useEffect(() => {
    const subscription = stream.read(path).subscribe({
      next: data => {
        setContent(data)
        setLoading(false)
      },
      error: err => {
        setError(err)
        setLoading(false)
      },
      complete: () => {
        setLoading(false)
      },
    })

    return () => subscription.unsubscribe()
  }, [stream, path])

  return { content, loading, error }
}

Registry Implementation

typescript
import { Observable, throwError, of } from 'rxjs'
import { switchMap, catchError, shareReplay } from 'rxjs/operators'
import { ContentStream, RawContent } from '@lib/content/streams/types'
import { MdxProcessor } from '@lib/content/processors'

export class ContentRegistry {
  private streams = new Map<string, ContentStream>()
  private processor: MdxProcessor

  constructor() {
    this.processor = new MdxProcessor()
  }

  registerStream(namespace: string, stream: ContentStream): void {
    this.streams.set(namespace, stream)
  }

  getContent(uri: string): Observable<ProcessedContent> {
    const [namespace, path] = this.parseUri(uri)
    const stream = this.streams.get(namespace)

    if (!stream) {
      return throwError(() => new Error(`Unknown namespace: ${namespace}`))
    }

    return stream.read(path).pipe(
      switchMap(content => this.processor.process(content)),
      // Share replay allows multiple subscribers to receive the same content
      // without triggering multiple loads
      shareReplay(1),
      catchError(err => {
        console.error(`Error processing content at ${uri}:`, err)
        return throwError(
          () => new Error(`Failed to process content: ${err.message}`)
        )
      })
    )
  }

  // Other implementation details...
}

Phased Implementation Plan

Phase 1: Core Infrastructure

  1. Install and configure RxJS ✅
  2. Define core interfaces
  3. Implement basic stream implementations
  4. Create testing utilities

Phase 2: React Integration

  1. Build React hooks for content access
  2. Update content components to use streams
  3. Implement error boundaries and loading states
  4. Add testing infrastructure

Phase 3: Build System Integration

  1. Create content manifest plugin
  2. Implement HMR with reactive streams
  3. Add build-time optimizations
  4. Develop debugging tools

Phase 4: Extended Features

  1. Add remote content streams
  2. Implement collaborative features
  3. Create content authoring tools
  4. Add advanced content operations

Testing Strategy

  • Use rxjs-marbles for marble testing of streams
  • Implement TestScheduler for synchronous testing of async code
  • Create mock streams for isolated component testing
  • Add integration tests for full content flow

References

Released under the MIT License.