Event System
The event system is a core component of the content architecture, enabling reactive updates, synchronization, and extensibility. This document explains the event system concepts, patterns, and implementation.
Core Event Concepts
Event-Driven Architecture
The content system uses an event-driven architecture with these key characteristics:
- Loose Coupling: Components communicate through events rather than direct calls
- Reactive Updates: UI and application state update in response to events
- Observable Operations: Content operations emit events that can be observed
- Cross-Cutting Communication: Events cross system boundaries and layers
- Extensibility: Plugins and extensions integrate via event handling
Event Types
The system defines several categories of events:
Content Change Events
Triggered when content changes:
- create: Content is created or added
- update: Content is modified
- delete: Content is removed
Operation Lifecycle Events
Triggered during operation execution:
- operation:start: Operation begins execution
- operation:success: Operation completes successfully
- operation:error: Operation encounters an error
- operation:complete: Operation finishes (success or error)
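The ordering of these four events can be sketched with a small wrapper. This is an illustration only: `runOperation`, the `Emit` type, and the payload fields are assumptions rather than part of the documented API.

```typescript
// Hypothetical helper: wraps an async operation and emits lifecycle events.
type Emit = (event: string, data?: unknown) => void

async function runOperation<T>(
  emit: Emit,
  operation: string,
  uri: string,
  task: () => Promise<T>
): Promise<T> {
  emit('operation:start', { operation, uri })
  try {
    const result = await task()
    emit('operation:success', { operation, uri })
    return result
  } catch (error) {
    emit('operation:error', { operation, uri, error })
    throw error // rethrow so callers still see the failure
  } finally {
    // Runs after either outcome, which is what operation:complete promises
    emit('operation:complete', { operation, uri })
  }
}
```

In this sketch a failing task emits operation:start, operation:error, then operation:complete, and the returned promise still rejects.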
External Change Events
Triggered by external systems:
- external:change: External system modifies content
- external:sync: Synchronization occurs with external system
System Events
Triggered by the system itself:
- system:ready: System is initialized and ready
- system:error: System encounters an error
- system:shutdown: System is shutting down
Event Flow
Events flow through the system in a predictable pattern:
- Event Sources: Operations, external systems, or user actions generate events
- Event Bus: Events are published to an event bus or emitter
- Event Handlers: Registered handlers receive and process events
- Side Effects: Handlers may cause state updates, UI changes, or trigger other operations
- Event Completion: Events may include acknowledgment or completion callbacks
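The first four stages above can be traced in a few lines. This is a minimal sketch with a throwaway bus; `createBus`, the payload shape, and the `state` object are assumptions made for illustration.

```typescript
// Minimal bus for illustration only; names and payload shape are assumptions.
type FlowPayload = { uri: string; body: string }
type FlowHandler = (data: FlowPayload) => void

function createBus() {
  const handlers: Record<string, FlowHandler[]> = {}
  return {
    on(event: string, handler: FlowHandler): void {
      const list = handlers[event] || (handlers[event] = [])
      list.push(handler)
    },
    emit(event: string, data: FlowPayload): void {
      const list = handlers[event] || []
      list.forEach(handler => handler(data))
    },
  }
}

// Stage 2: the event bus
const bus = createBus()
const state: Record<string, string> = {}

// Stage 3: a registered handler. Stage 4: its side effect is a state update
bus.on('update', ({ uri, body }) => {
  state[uri] = body
})

// Stage 1: an event source (here, a simulated write) publishes the event
bus.emit('update', { uri: 'notes/a.md', body: 'hello' })
```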
Observer Pattern Implementation
The event system implements the observer pattern:
EventEmitter
The core event emitter interface:
interface EventEmitter {
// Event subscription
on<T = any>(event: string, handler: EventHandler<T>): Unsubscribe
once<T = any>(event: string, handler: EventHandler<T>): Unsubscribe
// Event emission; returns false when no handler is registered for the event
emit<T = any>(event: string, data?: T): boolean
// Subscription management; calling off without a handler removes every handler for the event
off<T = any>(event: string, handler?: EventHandler<T>): void
removeAllListeners(event?: string): void
// Utilities
listenerCount(event: string): number
listeners(event: string): EventHandler<any>[]
}
EventHandler
Event handler functions:
type EventHandler<T = any> = (data: T) => void | Promise<void>
Unsubscribe
Function to remove event subscriptions:
type Unsubscribe = () => void
Content Change Observation
ContentWatcher Interface
The primary interface for watching content changes:
interface ContentWatcher {
watch(
pattern: string,
callback: WatchCallback,
options?: WatchOptions
): Unsubscribe
}
type WatchCallback = (
uri: string,
content: Content | null, // null when type is 'delete'
type: 'create' | 'update' | 'delete'
) => void
interface WatchOptions {
recursive?: boolean
ignoreInitial?: boolean
events?: ('create' | 'update' | 'delete')[]
debounce?: number
}
Basic Usage Pattern
Watching for content changes:
// Watch all Markdown files
const unsubscribe = store.watch('**/*.md', (uri, content, changeType) => {
console.log(`Content ${changeType}:`, uri)
if (content) {
// Update application state with new content
updateState(uri, content)
} else {
// Handle deletion
removeFromState(uri)
}
})
// Later, stop watching
unsubscribe()
Filtered Watching
Watch specific change types:
// Watch only for content updates
const unsubscribe = store.watch(
'articles/*.md',
(uri, content, changeType) => {
console.log(`Article updated: ${uri}`)
refreshArticleView(uri, content)
},
{
events: ['update'],
}
)
Pattern-Based Watching
Watch content based on patterns:
// Watch multiple patterns
const unsubscribes = [
// Watch Markdown files
store.watch('**/*.md', handleMarkdownChange),
// Watch JSON files
store.watch('**/*.json', handleJsonChange),
// Watch specific directory
store.watch('articles/**/*', handleArticleChange),
]
// Unsubscribe all
function unsubscribeAll() {
unsubscribes.forEach(unsubscribe => unsubscribe())
}
Event System Implementation
Simple EventEmitter Implementation
The foundational event emitter implementation:
function createEventEmitter(): EventEmitter {
const events: Record<string, EventHandler<any>[]> = {}
return {
on<T>(event: string, handler: EventHandler<T>): Unsubscribe {
events[event] = events[event] || []
events[event].push(handler)
return () => this.off(event, handler)
},
once<T>(event: string, handler: EventHandler<T>): Unsubscribe {
const onceHandler: EventHandler<T> = (data: T) => {
// Unsubscribe first so the handler runs at most once, even if it throws
this.off(event, onceHandler)
return handler(data)
}
return this.on(event, onceHandler)
},
emit<T>(event: string, data?: T): boolean {
const handlers = events[event]
if (!handlers || handlers.length === 0) {
return false
}
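handlers.forEach(handler => {
try {
// Handlers may be async, so report rejected promises as well as thrown errors
Promise.resolve(handler(data)).catch(error => {
console.error(`Error in async event handler for ${event}:`, error)
})
} catch (error) {
console.error(`Error in event handler for ${event}:`, error)
}
})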
return true
},
off<T>(event: string, handler?: EventHandler<T>): void {
if (!events[event]) {
return
}
if (!handler) {
delete events[event]
return
}
events[event] = events[event].filter(h => h !== handler)
},
removeAllListeners(event?: string): void {
if (event) {
delete events[event]
} else {
Object.keys(events).forEach(key => delete events[key])
}
},
listenerCount(event: string): number {
return events[event]?.length || 0
},
listeners(event: string): EventHandler<any>[] {
return [...(events[event] || [])]
},
}
}
Adapter Event Implementation
Adapter-level event implementation:
class FileSystemAdapter implements ContentAdapter {
readonly events = createEventEmitter() // public so the store can subscribe to adapter events
async write(uri: string, content: Content): Promise<void> {
// Record previous state
const existed = await this.exists(uri)
// Write content to filesystem
await writeToFs(uri, content)
// Emit appropriate event
this.events.emit(existed ? 'update' : 'create', {
uri,
content,
timestamp: Date.now(),
})
}
async delete(uri: string): Promise<void> {
// Delete from filesystem
await deleteFromFs(uri)
// Emit delete event
this.events.emit('delete', {
uri,
timestamp: Date.now(),
})
}
// ... other adapter methods
}
Store Event Implementation
Content store event implementation:
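// The class name is illustrative; it must differ from the ContentStore interface it implements
class DefaultContentStore implements ContentStore {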
readonly events = createEventEmitter() // public: callers use store.events.on(...)
private adapter: ContentAdapter
constructor(options: ContentStoreOptions) {
this.adapter = options.adapter
// Forward adapter events to store events. Arrow functions keep `this` bound to the store
if (this.adapter.events) {
this.adapter.events.on('create', event => this.handleAdapterCreate(event))
this.adapter.events.on('update', event => this.handleAdapterUpdate(event))
this.adapter.events.on('delete', event => this.handleAdapterDelete(event))
}
}
watch(
pattern: string,
callback: WatchCallback,
options?: WatchOptions
): Unsubscribe {
// Create matcher function
const matcher = createMatcher(pattern, options)
// Subscribe to all events
const unsubscribeCreate = this.events.on('create', event => {
if (matcher(event.uri, 'create')) {
callback(event.uri, event.content, 'create')
}
})
const unsubscribeUpdate = this.events.on('update', event => {
if (matcher(event.uri, 'update')) {
callback(event.uri, event.content, 'update')
}
})
const unsubscribeDelete = this.events.on('delete', event => {
if (matcher(event.uri, 'delete')) {
callback(event.uri, null, 'delete')
}
})
// Return combined unsubscribe function
return () => {
unsubscribeCreate()
unsubscribeUpdate()
unsubscribeDelete()
}
}
// ... other store methods
}
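The `watch` method above relies on `createMatcher`, which this document does not define. One possible shape is sketched below, assuming a small glob dialect (`**`, `*`, and literal text) and assuming that `options.events` restricts which change types match. The helper `globToRegExp` is a hypothetical name, and the `recursive` option is not handled here.

```typescript
type ChangeType = 'create' | 'update' | 'delete'
type Matcher = (uri: string, type: ChangeType) => boolean

// Translate a small glob dialect into a RegExp (assumed semantics, not a full glob engine).
function globToRegExp(pattern: string): RegExp {
  let source = ''
  let i = 0
  while (i < pattern.length) {
    if (pattern.slice(i, i + 3) === '**/') {
      source += '(?:.*/)?' // zero or more directory segments
      i += 3
    } else if (pattern.slice(i, i + 2) === '**') {
      source += '.*' // anything, including slashes
      i += 2
    } else if (pattern.charAt(i) === '*') {
      source += '[^/]*' // anything within one path segment
      i += 1
    } else {
      // Escape regex metacharacters in literal text
      source += pattern.charAt(i).replace(/[.*+?^${}()|[\]\\]/g, '\\$&')
      i += 1
    }
  }
  return new RegExp('^' + source + '$')
}

function createMatcher(
  pattern: string,
  options?: { events?: ChangeType[] }
): Matcher {
  const regex = globToRegExp(pattern)
  const allowed = options && options.events
  // A URI matches when its change type is allowed and its path fits the pattern
  return (uri, type) =>
    (!allowed || allowed.indexOf(type) !== -1) && regex.test(uri)
}
```

Under these assumptions, `articles/*.md` matches `articles/a.md` but not `articles/2024/a.md`, while `articles/**/*` matches both.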
Advanced Event Patterns
Debounced Events
Debounce rapid event sequences:
function debounce<T>(callback: EventHandler<T>, wait: number): EventHandler<T> {
let timeout: ReturnType<typeof setTimeout> | undefined
return (data: T) => {
clearTimeout(timeout)
timeout = setTimeout(() => callback(data), wait)
}
}
// Usage
const debouncedHandler = debounce(data => {
console.log('Handling debounced event:', data)
}, 300)
store.events.on('update', debouncedHandler)
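A single debounced handler shared across all URIs delivers only the last event of a burst, so rapid updates to different files can mask one another. A possible variation keeps one timer per key. `debounceByKey` and its `keyOf` parameter are hypothetical names, not part of the store API.

```typescript
// Hypothetical helper: debounce independently per key (for example, per URI).
function debounceByKey<T>(
  callback: (data: T) => void,
  keyOf: (data: T) => string,
  wait: number
): (data: T) => void {
  const timers: Record<string, ReturnType<typeof setTimeout>> = {}
  return (data: T) => {
    const key = keyOf(data)
    const existing = timers[key]
    // Restart only the timer that belongs to this key
    if (existing !== undefined) clearTimeout(existing)
    timers[key] = setTimeout(() => {
      delete timers[key] // free the slot once the event is delivered
      callback(data)
    }, wait)
  }
}
```

With `keyOf` set to `event => event.uri`, a burst of updates to `a.md` and `b.md` yields one delivery per file, each carrying that file's latest payload.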
Event Batching
Batch related events:
function batchEvents<T>(
callback: (events: T[]) => void,
options: { maxTime?: number; maxSize?: number }
): [EventHandler<T>, () => void] {
const { maxTime = 300, maxSize = 10 } = options
let batch: T[] = []
let timeout: ReturnType<typeof setTimeout> | null = null // works in Node and browsers
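const processBatch = () => {
timeout = null
if (batch.length === 0) return
// Reset state before invoking the callback so a throwing callback cannot wedge the batcher
const pending = batch
batch = []
callback(pending)
}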
const handler: EventHandler<T> = (data: T) => {
batch.push(data)
if (batch.length >= maxSize) {
if (timeout) clearTimeout(timeout)
processBatch()
} else if (!timeout) {
timeout = setTimeout(processBatch, maxTime)
}
}
const flush = () => {
if (timeout) clearTimeout(timeout)
processBatch()
}
return [handler, flush]
}
// Usage
const [batchHandler, flushBatch] = batchEvents<ContentChangeEvent>(
events => {
console.log(`Processing ${events.length} changes in batch`)
updateUI(events)
},
{ maxTime: 500, maxSize: 20 }
)
store.events.on('update', batchHandler)
// Later, flush any pending events
flushBatch()
Event Filtering
Filter events based on criteria:
function filterEvents<T>(
predicate: (data: T) => boolean,
handler: EventHandler<T>
): EventHandler<T> {
return (data: T) => {
if (predicate(data)) {
handler(data)
}
}
}
// Usage
const handleImportantUpdates = filterEvents<ContentChangeEvent>(
event => event.content?.metadata?.priority === 'high',
event => {
console.log('High priority content updated:', event.uri)
notifyUser(event)
}
)
store.events.on('update', handleImportantUpdates)
Event Transformation
Transform events before handling:
function mapEvents<T, R>(
mapper: (data: T) => R,
handler: EventHandler<R>
): EventHandler<T> {
return (data: T) => {
handler(mapper(data))
}
}
// Usage
const enrichedEventHandler = mapEvents<ContentChangeEvent, EnrichedEvent>(
event => ({
...event,
title: event.content?.metadata?.title || 'Untitled',
timestamp: new Date(),
isMarkdown: event.uri.endsWith('.md'),
}),
enrichedEvent => {
console.log(
`${enrichedEvent.title} was ${enrichedEvent.type} at ${enrichedEvent.timestamp}`
)
updateUI(enrichedEvent)
}
)
store.events.on('update', enrichedEventHandler)
System Event Patterns
Application Lifecycle Events
Events for application lifecycle:
// Application initialization
store.events.emit('system:init', { timestamp: Date.now() })
// Application ready
store.events.emit('system:ready', {
timestamp: Date.now(),
initDuration: performance.now() - startTime,
})
// Application shutdown
store.events.on('system:shutdown', async () => {
// Clean up resources
await store.dispose()
console.log('Content store resources released')
})
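Note that the `emit` implementation shown earlier in this document calls handlers without awaiting them, so an async shutdown handler like the one above may still be running when `emit` returns. One way to wait for cleanup is an awaiting variant, sketched here under the assumption that the handlers are available as an array (for example via `listeners(event)`). `emitAndWait` is a hypothetical name.

```typescript
type AsyncHandler<T> = (data: T) => void | Promise<void>

// Hypothetical helper: run every handler and wait for all of them to settle.
// Returns the errors that were thrown or rejected instead of failing fast.
async function emitAndWait<T>(
  handlers: AsyncHandler<T>[],
  data: T
): Promise<unknown[]> {
  const results = await Promise.all(
    handlers.map(async handler => {
      try {
        await handler(data)
        return undefined
      } catch (error) {
        return { error }
      }
    })
  )
  const errors: unknown[] = []
  results.forEach(result => {
    if (result) errors.push(result.error)
  })
  return errors
}
```

A shutdown routine could then call something like `await emitAndWait(store.events.listeners('system:shutdown'), { timestamp: Date.now() })` before exiting, and log any returned errors.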
Error Events
Events for error handling:
// Global error handler
store.events.on('system:error', error => {
console.error('System error:', error)
notifyUser('An error occurred', error.message)
logErrorToService(error)
})
// Operation error events
store.events.on('operation:error', ({ operation, uri, error }) => {
console.error(`Error during ${operation} operation on ${uri}:`, error)
if (error instanceof ContentNotFoundError) {
showNotFoundMessage(uri)
} else if (error instanceof ContentValidationError) {
showValidationErrors(uri, error.validationErrors)
}
})
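The `instanceof` checks above depend on how the error classes are defined, and this document does not show them. A possible definition follows; only the class names and the `validationErrors` property come from the handler above, while the constructor signatures and `describeError` are assumptions. The `Object.setPrototypeOf` calls keep `instanceof` working when TypeScript compiles to ES5.

```typescript
// Assumed shapes for the error classes referenced by the operation:error handler.
class ContentNotFoundError extends Error {
  readonly uri: string
  constructor(uri: string) {
    super(`Content not found: ${uri}`)
    this.name = 'ContentNotFoundError'
    this.uri = uri
    // Restore the prototype chain for ES5 targets, where extending Error breaks instanceof
    Object.setPrototypeOf(this, ContentNotFoundError.prototype)
  }
}

class ContentValidationError extends Error {
  readonly uri: string
  readonly validationErrors: string[]
  constructor(uri: string, validationErrors: string[]) {
    super(`Invalid content at ${uri}: ${validationErrors.length} problem(s)`)
    this.name = 'ContentValidationError'
    this.uri = uri
    this.validationErrors = validationErrors
    Object.setPrototypeOf(this, ContentValidationError.prototype)
  }
}

// Hypothetical helper: maps an error to a short user-facing message
function describeError(error: unknown): string {
  if (error instanceof ContentNotFoundError) return `missing: ${error.uri}`
  if (error instanceof ContentValidationError) return error.validationErrors.join('; ')
  return 'unexpected error'
}
```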
Resource Events
Events for resource management:
// Cache events
store.events.on('cache:hit', ({ uri }) => {
console.debug(`Cache hit: ${uri}`)
incrementCacheHitCounter()
})
store.events.on('cache:miss', ({ uri }) => {
console.debug(`Cache miss: ${uri}`)
incrementCacheMissCounter()
})
// Adapter events
store.events.on('adapter:connected', ({ adapter }) => {
console.log(`Adapter connected: ${adapter.id}`)
updateConnectionStatus(adapter.id, 'connected')
})
store.events.on('adapter:disconnected', ({ adapter }) => {
console.log(`Adapter disconnected: ${adapter.id}`)
updateConnectionStatus(adapter.id, 'disconnected')
})
Event Performance Considerations
Memory Management
Properly manage event subscriptions:
class Component {
private unsubscribes: Unsubscribe[] = []
initialize(store: ContentStore) {
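// Add subscriptions to tracking array. The handlers must be arrow-function
// properties (or bound in the constructor) so `this` survives being passed as a callback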
this.unsubscribes.push(
store.watch('articles/*.md', this.handleArticleChange),
store.events.on('system:shutdown', this.handleShutdown),
store.events.on('cache:cleared', this.handleCacheCleared)
)
}
dispose() {
// Clean up all subscriptions
this.unsubscribes.forEach(unsubscribe => unsubscribe())
this.unsubscribes = []
}
// Other component methods
}
Event Volume Control
Control the volume of events:
function throttleEvents<T>(
handler: EventHandler<T>,
interval: number
): EventHandler<T> {
let lastCall = 0
let timeout: ReturnType<typeof setTimeout> | null = null // works in Node and browsers
let pendingData: T | null = null
return (data: T) => {
const now = Date.now()
const timeSinceLastCall = now - lastCall
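// Call immediately if enough time has passed
if (timeSinceLastCall >= interval) {
// Drop any stale trailing call so older data cannot arrive after newer data
if (timeout) {
clearTimeout(timeout)
timeout = null
pendingData = null
}
lastCall = now
handler(data)
return
}
// Otherwise, schedule for later and store most recent data
pendingData = data
if (!timeout) {
timeout = setTimeout(() => {
// Always release the timer slot, even if there is nothing to deliver
timeout = null
if (pendingData !== null) {
lastCall = Date.now()
const latest = pendingData
pendingData = null
handler(latest)
}
}, interval - timeSinceLastCall)
}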
}
}
// Usage
const throttledHandler = throttleEvents<ContentChangeEvent>(
event => {
console.log('Handling throttled event:', event)
updateUI(event)
},
500 // Once per 500ms maximum
)
store.events.on('update', throttledHandler)
Selective Event Subscription
Subscribe only to necessary events:
// BAD: Receives every update in the store, regardless of URI
store.events.on('update', handleAllUpdates)
// GOOD: Receives only updates to URIs that match the pattern
store.watch('articles/*.md', handleArticleUpdates, {
events: ['update'],
recursive: true,
})
Event Testing
Event Mocking
Mock events for testing:
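// Mock event emitter: a real emitter that also records everything emitted.
// MockEventEmitter is an illustrative name for the emitter plus its testing utilities
interface MockEventEmitter extends EventEmitter {
getEvents(): Record<string, any[]>
clearEvents(): void
triggerEvent<T>(event: string, data?: T): void
}
function createMockEventEmitter(): MockEventEmitter {
const events: Record<string, any[]> = {}
const base = createEventEmitter()
return {
// Standard EventEmitter interface implementation
...base,
emit<T>(event: string, data?: T): boolean {
// Record the payload, then deliver it to the registered handlers
events[event] = events[event] || []
events[event].push(data)
return base.emit(event, data)
},
// Testing utilities
getEvents(): Record<string, any[]> {
return { ...events }
},
clearEvents(): void {
Object.keys(events).forEach(key => delete events[key])
},
triggerEvent<T>(event: string, data?: T): void {
this.emit(event, data)
},
}
}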
// Testing
test('handles content update events', () => {
const mockEmitter = createMockEventEmitter()
const mockStore = createMockStore({ events: mockEmitter })
const handler = jest.fn()
mockStore.events.on('update', handler)
// Trigger mock event
mockEmitter.triggerEvent('update', {
uri: 'test.md',
content: { data: 'test', contentType: 'text/plain', metadata: {} },
type: 'update',
})
expect(handler).toHaveBeenCalledTimes(1)
expect(handler).toHaveBeenCalledWith(
expect.objectContaining({
uri: 'test.md',
})
)
})
Event Verification
Verify events are emitted correctly:
test('store emits events on write', async () => {
const mockEmitter = createMockEventEmitter()
const store = createContentStore({
adapter: createMemoryAdapter(),
events: mockEmitter,
})
const spy = jest.spyOn(mockEmitter, 'emit')
await store.write('test.md', {
data: 'test content',
contentType: 'text/plain',
metadata: {},
})
expect(spy).toHaveBeenCalledWith(
'create',
expect.objectContaining({
uri: 'test.md',
})
)
})
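When the code under test emits asynchronously, a test may need to wait for an event rather than spy on `emit`. A possible helper is sketched below, assuming only an emitter with the `once` signature described in this document. `waitForEvent` and `OnceCapable` are hypothetical names.

```typescript
type Unsubscribe = () => void

interface OnceCapable {
  once<T>(event: string, handler: (data: T) => void): Unsubscribe
}

// Hypothetical test helper: resolves with the next payload, or rejects after timeoutMs.
function waitForEvent<T>(
  emitter: OnceCapable,
  event: string,
  timeoutMs = 1000
): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    let unsubscribe: Unsubscribe = () => {}
    const timer = setTimeout(() => {
      unsubscribe() // do not leave a dangling listener behind
      reject(new Error(`Timed out waiting for "${event}"`))
    }, timeoutMs)
    unsubscribe = emitter.once<T>(event, data => {
      clearTimeout(timer)
      resolve(data)
    })
  })
}
```

In a test, the promise would typically be created before the action that triggers the event (for example, before `store.write(...)`) and awaited afterwards, so the event cannot be missed.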