
ADR-005: Content Orchestration Architecture

Status

Superseded by ADR-007 | Proposed | Accepted | Extends ADR-004

Version History

  • v1: 2025-04-28 - Initial proposal
  • v2: 2025-04-29 - Updated with additional architecture components and improvements
  • v3: 2025-04-29 - Refined with capability decoupling and updated integration patterns

Context

We have successfully implemented the Reactive Content Streams architecture outlined in ADR-004, which provides a robust foundation for content handling through RxJS-based observables. However, we now face the challenge of coordinating multiple content streams from different sources (filesystem, memory, remote, etc.) while maintaining a clean and intuitive API.

Current limitations with our streams implementation include:

  1. Coordination Complexity: There’s no standardized way to coordinate between multiple streams for content resolution and operation routing.

  2. Capability Interface Issues: The current boolean-based capability system is too simplistic for expressing nuanced stream capabilities. Additionally, capabilities are tightly coupled to source types, even though they are conceptually distinct (e.g., an HTTP source could be read-only or fully CRUD-capable).

  3. Architectural Ambiguity: It’s unclear whether the content coordination layer should implement the stream interface, leading to potential design leakage.

  4. Resource Management Challenges: Managing subscriptions and resource cleanup across multiple streams requires careful coordination.

  5. Caching Strategy: There’s no unified approach to caching content from various streams.

Our previous solution in the legacy codebase used a Registry pattern with multiple registry implementations. While this approach worked, it led to interface duplication, unclear responsibility boundaries, and complex capability detection logic in components.

Decision

Implement a Content Orchestration Architecture with these core components:

  1. ContentMediator Pattern: Use a mediator that coordinates between streams without implementing the stream interface itself, maintaining clear separation of concerns.

  2. Decoupled Capability Model: Separate capabilities from source types using a capability profile system that allows flexible capability expression.

  3. Pipeline Architecture: Implement a content processing pipeline that separates resolution, fetching, transformation, and enrichment into distinct, composable stages.

  4. Declarative Query Model: Use a reactive query model for content requests instead of imperative method calls.

  5. Pluggable Caching Strategy: Implement a configurable caching layer to optimize performance.

  6. Health Monitoring System: Add monitoring for stream health and reliability.

The complete architecture can be visualized as:

+------------------+      +------------------+      +------------------+
| React Components |      |  Content Hooks   |      |  Static Assets   |
+------------------+      +------------------+      +------------------+
         |                         |                         |
         v                         v                         v
+------------------------------------------------------------------------+
|                        Content Query Interface                         |
|                                                                        |
|  +------------------------------------------------------------------+  |
|  |                         Content Mediator                         |  |
|  |                                                                  |  |
|  |   +--------------------+   +----------------------+              |  |
|  |   |  Stream Registry   |   | Capability Profiles  |              |  |
|  |   +--------------------+   +----------------------+              |  |
|  |                                                                  |  |
|  |   +--------------------+   +----------------------+              |  |
|  |   |  Resolution Cache  |   |    Content Cache     |              |  |
|  |   +--------------------+   +----------------------+              |  |
|  +------------------------------------------------------------------+  |
|                                                                        |
|  +------------------------------------------------------------------+  |
|  |                         Content Pipeline                         |  |
|  |                                                                  |  |
|  |  +-----------+   +----------+   +--------------+   +-----------+ |  |
|  |  | Resolvers |-->| Fetchers |-->| Transformers |-->| Enrichers | |  |
|  |  +-----------+   +----------+   +--------------+   +-----------+ |  |
|  +------------------------------------------------------------------+  |
+------------------------------------------------------------------------+
       |                  |                  |                  |
       v                  v                  v                  v
+-------------+    +-------------+    +-------------+    +-------------+
| Filesystem  |    | Memory      |    | Remote      |    | Custom      |
| Stream      |    | Stream      |    | Stream      |    | Streams     |
+-------------+    +-------------+    +-------------+    +-------------+
       |                  |                  |                  |
       v                  v                  v                  v
+-------------+    +-------------+    +-------------+    +-------------+
| Local Files |    | In-Memory   |    | Remote APIs |    | Custom      |
| & Assets    |    | Content     |    | & Services  |    | Sources     |
+-------------+    +-------------+    +-------------+    +-------------+

Core Components

1. Content Mediator

typescript
export interface ContentMediator<T = RawContent> extends Disposable {
  // Stream registration with capability profiles
  registerStream(
    stream: ContentStream<T>,
    options: StreamRegistrationOptions
  ): string
  unregisterStream(streamId: string): void
  getRegisteredStreams(): ReadonlyMap<string, StreamInfo<T>>

  // Capability profiles management
  registerCapabilityProfile(profile: CapabilityProfile): void
  getCapabilityProfile(profileId: string): CapabilityProfile | null
  listCapabilityProfiles(): CapabilityProfile[]

  // Content operations - note these don't mirror the ContentStream interface
  resolveContent(query: ContentQuery): Observable<ResolvedContent<T>>
  executeContentOperation(
    operation: ContentOperation
  ): Observable<OperationResult<T>>

  // Content observation
  observeContent(query: ContentQuery): Observable<ResolvedContent<T>>
  observeChanges(pattern?: string): Observable<ContentChange<T>>

  // Capability management
  getCapabilitiesForQuery(query: ContentQuery): ContentCapabilities
  getEffectiveCapabilities(streamId: string): ContentCapabilities

  // Health monitoring
  getStreamHealth(streamId?: string): StreamHealth
  observeStreamHealth(): Observable<StreamHealthUpdate>
}
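
As a usage sketch, consumers interact with the mediator through queries and observations rather than through stream methods, and disposing it tears down everything it registered. The factory, stream instance, URI, and render helper below are placeholders, not part of this ADR.

typescript
// Placeholder setup: `createContentMediator` and `filesystemStream` are hypothetical.
const mediator: ContentMediator = createContentMediator()

// Keep the id for later unregisterStream / getEffectiveCapabilities calls.
const streamId = mediator.registerStream(filesystemStream, {
  namespaces: ['fs'],
  capabilityProfiles: ['full-crud'],
})

// Observe a document; the mediator selects the stream, not the caller.
const subscription = mediator
  .observeContent({ uri: 'fs:/docs/example.mdx', options: { observe: true } })
  .subscribe((resolved) => renderContent(resolved)) // renderContent is illustrative

// Cleanup: unsubscribe locally, then dispose the mediator (assuming its
// Disposable contract exposes dispose()), which unregisters streams and
// releases internal subscriptions.
subscription.unsubscribe()
mediator.unregisterStream(streamId)
mediator.dispose()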

2. Decoupled Capability Model

typescript
/**
 * Capability levels for different operations
 */
export enum CapabilityLevel {
  NONE = 'none',
  BASIC = 'basic',
  PARTIAL = 'partial',
  FULL = 'full',
}

/**
 * Content capabilities interface
 */
export interface ContentCapabilities {
  read: CapabilityLevel
  write: CapabilityLevel
  delete: CapabilityLevel
  list: CapabilityLevel
  watch: CapabilityLevel

  // Additional capability metadata
  features: {
    concurrentOperations: boolean
    transactionalOperations: boolean
    versionHistory: boolean
    metadataSupport: boolean
    fullTextSearch: boolean
    changeTracking: boolean
    schemaValidation: boolean
  }

  // Resource limits
  limits?: {
    maxFileSize?: number
    maxOperationsPerSecond?: number
    maxConcurrentOperations?: number
  }
}

/**
 * Reusable capability profile
 */
export interface CapabilityProfile {
  id: string
  name: string
  description: string
  capabilities: ContentCapabilities
}

/**
 * Stream registration options
 */
export interface StreamRegistrationOptions {
  id?: string
  namespaces: string[]
  priority?: number
  capabilityProfiles: string[]
  capabilityOverrides?: Partial<ContentCapabilities>
  matcher?: (uri: string) => boolean
}
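
The interplay between profiles and overrides is worth pinning down. The helper below is a minimal sketch of how a registration's effective capabilities could be derived, assuming later profiles win over earlier ones and per-stream overrides win over both; the function name and the defaultCapabilities() baseline are illustrative, not part of the interfaces above.

typescript
// Sketch: merge capability profiles in registration order, then apply overrides.
function resolveEffectiveCapabilities(
  profiles: CapabilityProfile[],
  overrides?: Partial<ContentCapabilities>
): ContentCapabilities {
  // Assumed policy: when several profiles are listed, later entries win.
  const base = profiles.reduce<ContentCapabilities>(
    (acc, profile) => ({
      ...acc,
      ...profile.capabilities,
      features: { ...acc.features, ...profile.capabilities.features },
      limits: { ...acc.limits, ...profile.capabilities.limits },
    }),
    defaultCapabilities() // hypothetical all-NONE baseline
  )

  // Per-stream overrides always take precedence over profile values.
  return {
    ...base,
    ...overrides,
    features: { ...base.features, ...(overrides?.features ?? {}) },
    limits: { ...base.limits, ...(overrides?.limits ?? {}) },
  }
}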

3. Pipeline Architecture

typescript
export interface ContentPipeline<T = RawContent> {
  // Pipeline stages
  resolvers: PipelineStage<ContentQuery, string[]>[]
  fetchers: PipelineStage<string, T>[]
  transformers: PipelineStage<T, ProcessedContent>[]
  enrichers: PipelineStage<ProcessedContent, ProcessedContent>[]

  // Pipeline execution
  execute(input: ContentQuery): Observable<ProcessedContent>

  // Pipeline configuration
  addResolver(
    resolver: PipelineStage<ContentQuery, string[]>,
    priority: number
  ): this
  addFetcher(fetcher: PipelineStage<string, T>, priority: number): this
  addTransformer(
    transformer: PipelineStage<T, ProcessedContent>,
    priority: number
  ): this
  addEnricher(
    enricher: PipelineStage<ProcessedContent, ProcessedContent>,
    priority: number
  ): this
}

export interface PipelineStage<TInput, TOutput> {
  execute(input: TInput, context: PipelineContext): Observable<TOutput>
  canHandle(input: TInput, context: PipelineContext): boolean
  priority: number
  name: string
}

export interface PipelineContext {
  // Contextual information for pipeline execution
  operation: ContentOperation | ContentQuery
  timestamp: number
  executionId: string

  // Shared state for pipeline stages
  state: Map<string, unknown>

  // Performance metrics
  metrics: {
    stageDurations: Record<string, number>
    totalDuration?: number
    startTime: number
  }

  // Tracing and debugging
  traces: PipelineTrace[]
  debug: boolean
}
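
As a concrete illustration of the stage contract, the resolver below expands a hypothetical docs: alias into candidate URIs for downstream fetchers; the alias scheme, candidate paths, and the pipeline variable are assumptions made for the sketch.

typescript
import { of } from 'rxjs'

// Sketch of a resolver stage: turns a `docs:` alias into an ordered list of
// candidate URIs. The scheme and paths are illustrative only.
const docsAliasResolver: PipelineStage<ContentQuery, string[]> = {
  name: 'docs-alias-resolver',
  priority: 100,

  // Only claim queries that use the (hypothetical) docs: scheme.
  canHandle: (query) => query.uri.startsWith('docs:'),

  execute: (query) => {
    const slug = query.uri.slice('docs:'.length)
    return of([`fs:/content/docs/${slug}.mdx`, `api:/docs/${slug}`])
  },
}

// Registration on a pipeline instance; higher priority is assumed to run first.
pipeline.addResolver(docsAliasResolver, docsAliasResolver.priority)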

4. Declarative Query Model

typescript
export interface ContentQuery {
  // URI for content resolution
  uri: string

  // Query options
  options?: {
    // Refresh strategy
    refresh?: 'always' | 'if-stale' | 'never'

    // Caching strategy
    cache?: boolean

    // Transformation pipeline to apply
    transformations?: string[]

    // Whether to continue observing content after initial fetch
    observe?: boolean

    // Stream selector (bypass resolution)
    sourceStrategy?: {
      type: 'specific' | 'capability-based' | 'auto'
      streamId?: string
      requiredCapabilities?: Partial<ContentCapabilities>
    }

    // Progressive loading options
    progressive?: {
      enabled: boolean
      initialChunkSize?: number
      subsequentChunkSize?: number
    }

    // Additional stream-specific options
    [key: string]: unknown
  }
}

export interface ContentOperation {
  // Operation type
  type: 'read' | 'write' | 'delete' | 'list'

  // Operation target
  target: string

  // Operation-specific data
  data?: unknown

  // Operation options
  options?: {
    // Operation timeout
    timeout?: number

    // Stream selector (bypass resolution)
    streamId?: string

    // Whether to update cache after operation
    updateCache?: boolean

    // Additional operation-specific options
    [key: string]: unknown
  }
}
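
For illustration, here is how a read and a write could be expressed against the mediator sketched earlier; the URIs and payload are placeholders.

typescript
// Declarative read: let the mediator choose any stream with full read capability.
const query: ContentQuery = {
  uri: 'fs:/guides/getting-started.mdx',
  options: {
    refresh: 'if-stale',
    cache: true,
    sourceStrategy: {
      type: 'capability-based',
      requiredCapabilities: { read: CapabilityLevel.FULL },
    },
  },
}

mediator.resolveContent(query).subscribe((resolved) => {
  // Render or further process the resolved content here.
})

// Write operation, routed by the mediator and observed reactively.
const operation: ContentOperation = {
  type: 'write',
  target: 'fs:/notes/todo.md',
  data: '# TODO\n- review ADR-005',
  options: { updateCache: true },
}

mediator.executeContentOperation(operation).subscribe((result) => {
  // Inspect the OperationResult here (success, written content, errors).
})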

5. Pluggable Caching System

typescript
export interface ContentCache<T> extends Disposable {
  // Cache operations
  get(key: string): Observable<CacheEntry<T> | null>
  set(key: string, value: T, options?: CacheOptions): Observable<void>
  invalidate(keyPattern: string): Observable<number>
  clear(): Observable<void>

  // Cache information
  getStats(): CacheStats
  observeStats(): Observable<CacheStats>

  // Cache configuration
  configure(options: CacheConfiguration): void
}

export interface CacheEntry<T> {
  value: T
  timestamp: number
  metadata?: Record<string, unknown>
}

export interface CacheOptions {
  ttl?: number
  priority?: number
  tags?: string[]
  refresh?: 'lazy' | 'eager'
}

export interface CacheConfiguration {
  maxEntries?: number
  defaultTtl?: number
  evictionPolicy?: 'lru' | 'lfu' | 'fifo' | 'custom'
  storage?: 'memory' | 'localStorage' | 'indexedDB' | 'custom'
}
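
A consumer-side sketch of the cache contract; the cache instance, key scheme, tag names, and the assumption that TTLs are expressed in milliseconds are all illustrative.

typescript
// `memoryCache` stands in for some ContentCache<RawContent> implementation.
memoryCache.configure({
  maxEntries: 500,
  defaultTtl: 5 * 60 * 1000, // assuming TTLs are milliseconds
  evictionPolicy: 'lru',
  storage: 'memory',
})

// Store one entry with a shorter TTL and a tag for grouped invalidation.
memoryCache
  .set('fs:/blog/posts/hello.mdx', rawContent, { ttl: 60_000, tags: ['blog'] })
  .subscribe()

// Invalidate every cached blog post in one call; glob-style pattern semantics
// are an assumption about the keyPattern parameter.
memoryCache.invalidate('fs:/blog/**').subscribe((removed) => {
  console.log(`evicted ${removed} cache entries`)
})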

6. Stream Health Monitoring

typescript
export interface StreamHealth {
  status: 'healthy' | 'degraded' | 'failing' | 'disconnected'
  metrics: {
    availability: number // 0-1
    latency: number // ms
    errorRate: number // 0-1
    throughput: number // ops/sec
  }
  lastError?: Error
  lastChecked: number
}

export interface StreamHealthUpdate {
  streamId: string
  health: StreamHealth
  previousStatus?: 'healthy' | 'degraded' | 'failing' | 'disconnected'
  timestamp: number
}
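
A short consumer sketch: log any stream that leaves the healthy state, and consult a one-off health check before an expensive operation. The stream id, latency threshold, and fallback policy are illustrative.

typescript
import { filter } from 'rxjs/operators'

// React to streams that drop out of the healthy state.
mediator
  .observeStreamHealth()
  .pipe(filter((update) => update.health.status !== 'healthy'))
  .subscribe((update) => {
    console.warn(
      `stream ${update.streamId} is ${update.health.status}`,
      `error rate ${(update.health.metrics.errorRate * 100).toFixed(1)}%`,
      update.health.lastError
    )
  })

// One-off check before routing an expensive operation ('remote-api' is a placeholder id).
const health = mediator.getStreamHealth('remote-api')
if (health.status === 'degraded' && health.metrics.latency > 1_000) {
  // Prefer a cached or lower-priority source; the policy is application-specific.
}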

Alternatives Considered

  1. Enhanced Registry Pattern (Extension of ADR-003)

    • Pros: More familiar pattern, less architectural change
    • Cons: Interface duplication, unclear responsibility boundaries
    • Outcome: Rejected due to architectural concerns about separation of concerns
  2. Content Microservices

    • Pros: Clear boundaries, independent scaling
    • Cons: Unnecessary complexity for a frontend library
    • Outcome: Rejected as overly complex
  3. Stream Implementation Inheritance

    • Pros: Unified interface, consistent API
    • Cons: Forces orchestration layer to implement low-level stream methods
    • Outcome: Rejected due to violation of single responsibility principle
  4. Event Sourcing Architecture

    • Pros: Strong audit capabilities, time-travel debugging
    • Cons: Significant complexity, overkill for content system
    • Outcome: Partially adopted concepts for change tracking, rejected as primary architecture
  5. GraphQL-style Query System

    • Pros: Powerful declarative queries, field selection
    • Cons: Implementation complexity, learning curve
    • Outcome: Adapted concepts for the content query model, but rejected full GraphQL implementation

Consequences

Benefits

  1. Clear Separation of Concerns: The mediator coordinates between streams without implementing the stream interface, maintaining architectural clarity.

  2. Decoupled Capabilities: Separating capabilities from source types allows for more flexible and accurate capability expression.

  3. Consistent Stream Selection: The mediator provides a standard mechanism for selecting appropriate streams based on capabilities, URIs, and other criteria.

  4. Flexible Pipeline Architecture: The processing pipeline cleanly separates resolution, fetching, transformation, and enrichment into composable stages.

  5. Intuitive Developer Experience: Declarative query model provides a clean, reactive API for content operations.

  6. Better Testability: Components can be tested in isolation with clear boundaries.

  7. Enhanced Caching: Pluggable caching strategy with configurable policies and storage backends.

  8. Future Extensibility: Architecture supports adding new stream types, content processors, and pipeline stages without modifying existing code.

  9. Progressive Loading: Support for large content through progressive loading strategies.

  10. Health Monitoring: Automatic detection of failing streams, giving consumers the signal needed to fall back or recover.

Challenges

  1. Implementation Complexity: More sophisticated architecture requires careful implementation.

  2. Learning Curve: Developers need to understand the mediator pattern and pipeline concepts.

  3. Migration Effort: Existing code needs to be adapted to the new architecture.

  4. Performance Considerations: Pipeline stages add computational overhead that must be managed.

  5. Increased Abstraction: More layers of abstraction could make debugging more difficult.

  6. Cache Consistency: Maintaining consistency between caches and sources requires careful handling.

Implementation Strategy

The implementation will follow a phased approach to minimize disruption:

Phase 1: Core Interfaces and Model

  1. Define the ContentMediator interface
  2. Create the capability profiles system
  3. Define the pipeline architecture interfaces
  4. Design the content query and operation models
  5. Define the caching interface

Phase 2: Base Implementation

  1. Implement the ContentMediatorImpl class
  2. Create standard capability profiles
  3. Implement basic content resolution logic
  4. Add stream registration and management
  5. Implement memory-based cache strategy

Phase 3: Stream Adaptation

  1. Create adapters for existing stream implementations
  2. Implement capability mapping
  3. Add caching mechanism
  4. Implement operation routing logic
  5. Add stream health monitoring

Phase 4: React Integration

  1. Create React hooks for content queries
  2. Implement content observation hooks
  3. Add optimized components for content rendering
  4. Create developer utilities for debug and monitoring
  5. Add progressive loading components

Phase 5: Migration and Documentation

  1. Update existing code to use the new architecture
  2. Create migration guides for custom stream implementations
  3. Add comprehensive documentation
  4. Develop examples for common use cases
  5. Add monitoring and debugging tools

Required Changes to Existing Systems

  1. ContentStream Interface:

    • No changes required to the core interface
    • Existing implementations remain compatible
    • New adapter layer will bridge between streams and mediator
  2. Stream Implementation Classes:

    • Need adapters to expose enhanced capabilities
    • No internal changes required
  3. Error Handling:

    • New error types for mediation and pipeline failures
    • Consistent error propagation through the pipeline
  4. Resource Management:

    • Enhanced subscription tracking across the mediator
    • Proper cleanup of resources when mediator is disposed
  5. React Components:

    • Gradual migration to the new query-based hooks
    • Legacy support for existing components
  6. Build Configuration:

    • No changes required

Advanced Capabilities in Practice

With the decoupled capability model, components can adapt based on the actual capabilities available:

jsx
function EditButton({ documentUri }) {
  const { capabilities } = useContentCapabilities({ uri: documentUri })

  // Only show edit button if content is writable
  if (capabilities.write === CapabilityLevel.NONE) {
    return null
  }

  // Render button with appropriate UI based on capability level
  return (
    <Button
      disabled={capabilities.write === CapabilityLevel.BASIC}
      tooltip={getCapabilityTooltip(capabilities.write)}
      onClick={handleEdit}
    >
      Edit
    </Button>
  )
}

Components can also adapt their UI based on available features:

jsx
function ContentEditor({ documentUri }) {
  const { capabilities } = useContentCapabilities({ uri: documentUri })

  return (
    <EditorLayout>
      <TextEditor />

      {capabilities.features.versionHistory && <VersionHistoryPanel />}

      {capabilities.features.metadataSupport && <MetadataEditor />}

      {capabilities.features.schemaValidation && <SchemaValidator />}

      <SaveButton disabled={capabilities.write === CapabilityLevel.NONE} />
    </EditorLayout>
  )
}

React Integration Example

typescript
// Define common capability profiles
const readOnlyProfile = {
  id: 'read-only',
  name: 'Read Only',
  description: 'Can read but not modify content',
  capabilities: {
    read: CapabilityLevel.FULL,
    write: CapabilityLevel.NONE,
    delete: CapabilityLevel.NONE,
    list: CapabilityLevel.FULL,
    watch: CapabilityLevel.BASIC,
    features: {
      concurrentOperations: true,
      transactionalOperations: false,
      // Other features...
    }
  }
};

const fullCrudProfile = {
  id: 'full-crud',
  name: 'Full CRUD',
  description: 'Full create, read, update, delete capabilities',
  capabilities: {
    read: CapabilityLevel.FULL,
    write: CapabilityLevel.FULL,
    delete: CapabilityLevel.FULL,
    list: CapabilityLevel.FULL,
    watch: CapabilityLevel.FULL,
    features: {
      concurrentOperations: true,
      transactionalOperations: true,
      // Other features...
    }
  }
};

// Register profiles with mediator
mediator.registerCapabilityProfile(readOnlyProfile);
mediator.registerCapabilityProfile(fullCrudProfile);

// Register streams with appropriate profiles
mediator.registerStream(filesystemStream, {
  namespaces: ['fs'],
  capabilityProfiles: ['full-crud'],
  priority: 10
});

mediator.registerStream(remoteApiStream, {
  namespaces: ['api'],
  capabilityProfiles: ['read-only'],
  priority: 5
});

// Create content hooks with the mediator
const useContent = createContentHook(mediator);

// Component using the content hook
function BlogPost({ slug }) {
  const { content, loading, error } = useContent({
    uri: `fs:/blog/posts/${slug}.mdx`,
    options: {
      transformations: ['mdx', 'syntax-highlight'],
      observe: true,
      refresh: 'if-stale',
      progressive: {
        enabled: true,
        initialChunkSize: 1000
      }
    }
  });

  if (loading) return <Loading />;
  if (error) return <ErrorDisplay error={error} />;

  return <MDXRenderer content={content} />;
}

Targeted Framework Concerns

In developing this architecture for our specific framework, we need to address these additional concerns:

  1. RxJS Integration: Ensure every component works cleanly with RxJS operators and observables, especially around subscription management and resource disposal.

  2. Bundle Size Optimization: Carefully structure the implementation to enable effective tree-shaking, particularly for capabilities that might not be needed in all deployments.

  3. Server vs. Client Capabilities: When running in server environments (SSR), certain capabilities such as watching may be unavailable or may behave differently; a sketch of an SSR-aware registration override follows this list.

  4. TypeScript Type Safety: Leverage TypeScript’s type system to make capability checks safer and provide better developer experience with autocomplete and type errors.

  5. Performance Monitoring: Add performance tracking to identify bottlenecks in the mediator and pipeline, especially for large content operations.

  6. Debugging Support: Provide rich debugging tools specific to our framework for tracing content flow through the system.
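
To make the server vs. client concern concrete, a registration could downgrade watch-related capabilities when no browser environment is present. This is an assumed usage pattern built only from the options defined earlier; remoteApiStream and the mediator instance are placeholders.

typescript
// Sketch: disable change watching during SSR, where long-lived subscriptions
// are not useful and can leak across requests.
const isServer = typeof window === 'undefined'

mediator.registerStream(remoteApiStream, {
  namespaces: ['api'],
  capabilityProfiles: ['read-only'],
  capabilityOverrides: isServer ? { watch: CapabilityLevel.NONE } : undefined,
})

// Components built on useContentCapabilities then see watch === NONE on the
// server and can skip observation-based UI automatically.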

Conclusion

The Content Orchestration Architecture provides a clean, flexible foundation for coordinating multiple content streams while maintaining a reactive programming model. By separating coordination from implementation and using a pipeline-based processing model, we can support a wide range of content sources and transformations with clear separation of concerns.

This approach extends the Reactive Content Streams architecture from ADR-004 with better orchestration capabilities while maintaining compatibility with existing stream implementations. The decoupled capability system ensures that components can adapt to the actual capabilities available, regardless of source type.
