webapp/src/modules/base/nostr/relay-hub.ts
padreug 078c55b8e9 Fix critical bug: prevent optimistic UI updates when event publish fails
**Problem:**
Task status changes (claim/start/complete/unclaim/delete) would update the
local UI state even when the Nostr event failed to publish to ANY relays.
This caused users to see "completed" tasks that were never actually published,
leading to confusion when the UI reverted after page refresh.

**Root Cause:**
ScheduledEventService optimistically updated local state after calling
publishEvent(), without checking if any relays accepted the event. If all
relay publishes failed (result.success = 0), the UI still updated.

**Solution:**
Modified RelayHub.publishEvent() to throw an error when no relays accept the
event (success = 0). This ensures:
- Existing try-catch blocks handle the error properly
- Error toast shown to user: "Failed to publish event - none of X relay(s) accepted it"
- Local state NOT updated (UI remains accurate)
- Consistent behavior across all services using publishEvent()

**Changes:**
- relay-hub.ts: Add check after publish - throw error if successful === 0
- ScheduledEventService.ts: Update comments to reflect new behavior

**Benefits:**
- Single source of truth for publish failure handling
- No code duplication (no need to check result.success everywhere)
- Better UX: Users immediately see error instead of false success
- UI state always matches server state after operations

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-18 20:35:59 +01:00

707 lines
20 KiB
TypeScript

import { SimplePool, type Filter, type Event, type Relay } from 'nostr-tools'
import { BaseService } from '@/core/base/BaseService'
import { ref } from 'vue'
// Simple EventEmitter implementation for browser compatibility
class EventEmitter {
private events: { [key: string]: Function[] } = {}
on(event: string, listener: Function): void {
if (!this.events[event]) {
this.events[event] = []
}
this.events[event].push(listener)
}
off(event: string, listener: Function): void {
if (!this.events[event]) return
const index = this.events[event].indexOf(listener)
if (index > -1) {
this.events[event].splice(index, 1)
}
}
emit(event: string, ...args: any[]): void {
if (!this.events[event]) return
this.events[event].forEach(listener => listener(...args))
}
removeAllListeners(event?: string): void {
if (event) {
delete this.events[event]
} else {
this.events = {}
}
}
}
export interface RelayConfig {
url: string
read: boolean
write: boolean
priority?: number // Lower number = higher priority
}
export interface SubscriptionConfig {
id: string
filters: Filter[]
relays?: string[] // If not specified, uses all connected relays
onEvent?: (event: Event) => void
onEose?: () => void
onClose?: () => void
}
export interface RelayStatus {
url: string
connected: boolean
lastSeen: number
error?: string
latency?: number
}
export class RelayHub extends BaseService {
// Service metadata
protected readonly metadata = {
name: 'RelayHub',
version: '1.0.0',
dependencies: ['VisibilityService'] // Depends on visibility service for connection management
}
// EventEmitter functionality
private eventEmitter = new EventEmitter()
// RelayHub specific properties
private pool: SimplePool
private relayConfigs: Map<string, RelayConfig> = new Map()
private connectedRelays: Map<string, Relay> = new Map()
private subscriptions: Map<string, SubscriptionConfig> = new Map()
private activeSubscriptions: Map<string, any> = new Map() // Actual subscription objects
private reconnectInterval?: number
private healthCheckInterval?: number
private visibilityUnsubscribe?: () => void
// Connection state - we need both a reactive ref for components and internal state for business logic
public isConnected = ref(false)
private _isConnected = false
private _connectionAttempts = 0
private readonly maxReconnectAttempts = 5
private readonly reconnectDelay = 5000 // 5 seconds
private readonly healthCheckIntervalMs = 30000 // 30 seconds
constructor() {
super()
this.pool = new SimplePool()
}
// Delegate to internal EventEmitter
on(event: string, listener: Function): void {
this.eventEmitter.on(event, listener)
}
off(event: string, listener: Function): void {
this.eventEmitter.off(event, listener)
}
emit(event: string, ...args: any[]): void {
this.eventEmitter.emit(event, ...args)
}
removeAllListeners(event?: string): void {
this.eventEmitter.removeAllListeners(event)
}
get connectedRelayCount(): number {
// Return the actual size of connectedRelays map
return this.connectedRelays.size
}
get totalRelayCount(): number {
return this.relayConfigs.size
}
get totalSubscriptionCount(): number {
return this.subscriptions.size
}
get subscriptionDetails(): Array<{ id: string; filters: any[]; relays?: string[] }> {
return Array.from(this.subscriptions.entries()).map(([id, subscription]) => {
// Try to extract subscription details if available
return {
id,
filters: subscription.filters || [],
relays: subscription.relays || []
}
})
}
get relayStatuses(): RelayStatus[] {
return Array.from(this.relayConfigs.values()).map(config => {
const relay = this.connectedRelays.get(config.url)
return {
url: config.url,
connected: !!relay,
lastSeen: relay ? Date.now() : 0,
error: relay ? undefined : 'Not connected',
latency: relay ? 0 : undefined // TODO: Implement actual latency measurement
}
})
}
private relayUrls: string[] = []
/**
* Set relay URLs before initialization
*/
setRelayUrls(urls: string[]): void {
this.relayUrls = urls
}
/**
* Initialize the relay hub
*/
async initialize(options = {}): Promise<void> {
// Use BaseService's initialize method
await super.initialize({
waitForDependencies: false, // RelayHub has no dependencies
maxRetries: 1,
...options
})
}
/**
* Service-specific initialization (called by BaseService)
*/
protected async onInitialize(): Promise<void> {
if (!this.relayUrls || this.relayUrls.length === 0) {
throw new Error('No relay URLs provided. Call setRelayUrls() before initialize()')
}
const relayUrls = this.relayUrls
this.debug(`Initializing with URLs: ${relayUrls.join(', ')}`)
// Convert URLs to relay configs
this.relayConfigs.clear()
relayUrls.forEach((url, index) => {
this.relayConfigs.set(url, {
url,
read: true,
write: true,
priority: index
})
})
this.debug(`Relay configs created: ${this.relayConfigs.size} configs`)
// Start connection management
this.debug('Starting connection...')
await this.connect()
this.startHealthCheck()
// Register with visibility service for connection management
this.registerWithVisibilityService()
this.debug('Initialization complete')
}
/**
* Connect to all configured relays
*/
async connect(): Promise<void> {
if (this.relayConfigs.size === 0) {
throw new Error('No relay configurations found. Call initialize() first.')
}
console.log('🔧 RelayHub: Connecting to', this.relayConfigs.size, 'relays')
try {
this._connectionAttempts++
console.log('🔧 RelayHub: Connection attempt', this._connectionAttempts)
// Connect to relays in priority order
const sortedRelays = Array.from(this.relayConfigs.values())
.sort((a, b) => (a.priority || 0) - (b.priority || 0))
console.log('🔧 RelayHub: Attempting connections to:', sortedRelays.map(r => r.url))
const connectionPromises = sortedRelays.map(async (config) => {
try {
console.log('🔧 RelayHub: Connecting to relay:', config.url)
const relay = await this.pool.ensureRelay(config.url)
this.connectedRelays.set(config.url, relay)
console.log('🔧 RelayHub: Successfully connected to:', config.url)
return { url: config.url, success: true }
} catch (error) {
console.error(`🔧 RelayHub: Failed to connect to relay ${config.url}:`, error)
return { url: config.url, success: false, error }
}
})
const results = await Promise.allSettled(connectionPromises)
const successfulConnections = results.filter(
result => result.status === 'fulfilled' && result.value.success
)
console.log('🔧 RelayHub: Connection results:', {
total: results.length,
successful: successfulConnections.length,
failed: results.length - successfulConnections.length
})
if (successfulConnections.length > 0) {
this._isConnected = true
this.isConnected.value = true
this._connectionAttempts = 0
console.log('🔧 RelayHub: Connection successful, connected to', successfulConnections.length, 'relays')
this.emit('connected', successfulConnections.length)
} else {
console.error('🔧 RelayHub: Failed to connect to any relay')
throw new Error('Failed to connect to any relay')
}
} catch (error) {
this._isConnected = false
this.isConnected.value = false
console.error('🔧 RelayHub: Connection failed with error:', error)
this.emit('connectionError', error)
// Schedule reconnection if we haven't exceeded max attempts
if (this._connectionAttempts < this.maxReconnectAttempts) {
console.log('🔧 RelayHub: Scheduling reconnection attempt', this._connectionAttempts + 1)
this.scheduleReconnect()
} else {
this.emit('maxReconnectionAttemptsReached')
console.error('🔧 RelayHub: Max reconnection attempts reached')
}
}
}
/**
* Disconnect from all relays
*/
disconnect(): void {
// Clear intervals
if (this.reconnectInterval) {
clearTimeout(this.reconnectInterval)
this.reconnectInterval = undefined
}
if (this.healthCheckInterval) {
clearInterval(this.healthCheckInterval)
this.healthCheckInterval = undefined
}
// Close all active subscriptions
for (const [, sub] of this.activeSubscriptions) {
if (sub && typeof sub.close === 'function') {
sub.close()
}
}
this.activeSubscriptions.clear()
this.subscriptions.clear()
// Close all relay connections
this.pool.close(Array.from(this.relayConfigs.keys()))
this.connectedRelays.clear()
this._isConnected = false
this.isConnected.value = false
this.emit('disconnected')
}
/**
* Register with visibility service for connection management
*/
private registerWithVisibilityService(): void {
if (!this.visibilityService) {
this.debug('VisibilityService not available')
return
}
this.visibilityUnsubscribe = this.visibilityService.registerService(
'RelayHub',
async () => this.handleResume(),
async () => this.handlePause()
)
this.debug('Registered with VisibilityService')
}
/**
* Handle app resume (visibility restored)
*/
private async handleResume(): Promise<void> {
this.debug('Handling resume from visibility change')
// Check connection health
const disconnectedRelays = this.checkDisconnectedRelays()
if (disconnectedRelays.length > 0) {
this.debug(`Found ${disconnectedRelays.length} disconnected relays, reconnecting...`)
await this.reconnectToRelays(disconnectedRelays)
}
// Restore all subscriptions
await this.restoreSubscriptions()
// Resume health check
this.startHealthCheck()
}
/**
* Handle app pause (visibility lost)
*/
private async handlePause(): Promise<void> {
this.debug('Handling pause from visibility change')
// Stop health check while paused
if (this.healthCheckInterval) {
clearInterval(this.healthCheckInterval)
this.healthCheckInterval = undefined
}
// Don't disconnect immediately - just pause health checks
// Connections will be verified when we resume
}
/**
* Check which relays have disconnected
*/
private checkDisconnectedRelays(): string[] {
const disconnected: string[] = []
for (const [url] of this.relayConfigs) {
if (!this.connectedRelays.has(url)) {
disconnected.push(url)
}
}
return disconnected
}
/**
* Reconnect to specific relays
*/
private async reconnectToRelays(relayUrls: string[]): Promise<void> {
const promises = relayUrls.map(async url => {
try {
const relay = await this.pool.ensureRelay(url)
this.connectedRelays.set(url, relay)
this.debug(`Reconnected to relay: ${url}`)
return { url, success: true }
} catch (error) {
this.debug(`Failed to reconnect to relay ${url}:`, error)
return { url, success: false }
}
})
const results = await Promise.allSettled(promises)
const successful = results.filter(r => r.status === 'fulfilled' && r.value.success).length
if (successful > 0) {
this._isConnected = true
this.isConnected.value = true
this.emit('reconnected', successful)
}
}
/**
* Restore all subscriptions after reconnection
*/
private async restoreSubscriptions(): Promise<void> {
if (this.subscriptions.size === 0) {
this.debug('No subscriptions to restore')
return
}
this.debug(`Restoring ${this.subscriptions.size} subscriptions`)
// Close old subscription objects
for (const [, sub] of this.activeSubscriptions) {
if (sub && typeof sub.close === 'function') {
sub.close()
}
}
this.activeSubscriptions.clear()
// Recreate subscriptions
for (const [id, config] of this.subscriptions) {
try {
const targetRelays = config.relays || Array.from(this.connectedRelays.keys())
const availableRelays = targetRelays.filter(url => this.connectedRelays.has(url))
if (availableRelays.length === 0) {
this.debug(`No available relays for subscription ${id}`)
continue
}
// Recreate the subscription
const subscription = this.pool.subscribeMany(availableRelays, config.filters, {
onevent: (event: Event) => {
config.onEvent?.(event)
this.emit('event', { subscriptionId: id, event, relay: 'unknown' })
},
oneose: () => {
config.onEose?.()
this.emit('eose', { subscriptionId: id })
}
})
this.activeSubscriptions.set(id, subscription)
this.debug(`Restored subscription: ${id}`)
} catch (error) {
this.debug(`Failed to restore subscription ${id}:`, error)
}
}
this.emit('subscriptionsRestored', this.subscriptions.size)
}
/**
* Subscribe to events from relays
*/
subscribe(config: SubscriptionConfig): () => void {
if (!this.isInitialized) {
throw new Error('RelayHub not initialized. Call initialize() first.')
}
if (!this._isConnected) {
throw new Error('Not connected to any relays')
}
// Determine which relays to use
const targetRelays = config.relays || Array.from(this.connectedRelays.keys())
const availableRelays = targetRelays.filter(url => this.connectedRelays.has(url))
if (availableRelays.length === 0) {
throw new Error('No available relays for subscription')
}
// Create subscription using the pool
const subscription = this.pool.subscribeMany(availableRelays, config.filters, {
onevent: (event: Event) => {
config.onEvent?.(event)
this.emit('event', { subscriptionId: config.id, event, relay: 'unknown' })
},
oneose: () => {
config.onEose?.()
this.emit('eose', { subscriptionId: config.id })
}
})
// Store both config and active subscription
this.subscriptions.set(config.id, config)
this.activeSubscriptions.set(config.id, subscription)
// Emit subscription created event
this.emit('subscriptionCreated', { id: config.id, count: this.subscriptions.size })
// Return unsubscribe function
return () => {
this.unsubscribe(config.id)
}
}
/**
* Unsubscribe from a specific subscription
*/
unsubscribe(subscriptionId: string): void {
// Close the active subscription
const activeSubscription = this.activeSubscriptions.get(subscriptionId)
if (activeSubscription && typeof activeSubscription.close === 'function') {
activeSubscription.close()
}
// Remove from both maps
this.subscriptions.delete(subscriptionId)
this.activeSubscriptions.delete(subscriptionId)
// Emit subscription removed event
this.emit('subscriptionRemoved', { id: subscriptionId, count: this.subscriptions.size })
}
/**
* Publish an event to all connected relays
*/
async publishEvent(event: Event): Promise<{ success: number; total: number }> {
if (!this._isConnected) {
throw new Error('Not connected to any relays')
}
const relayUrls = Array.from(this.connectedRelays.keys())
const results = await Promise.allSettled(
relayUrls.map(relay => this.pool.publish([relay], event))
)
const successful = results.filter(result => result.status === 'fulfilled').length
const total = results.length
this.emit('eventPublished', { eventId: event.id, success: successful, total })
// Throw error if no relays accepted the event
if (successful === 0) {
throw new Error(`Failed to publish event - none of the ${total} relay(s) accepted it`)
}
return { success: successful, total }
}
/**
* Query events from relays (one-time fetch)
*/
async queryEvents(filters: Filter[], relays?: string[]): Promise<Event[]> {
if (!this._isConnected) {
throw new Error('Not connected to any relays')
}
const targetRelays = relays || Array.from(this.connectedRelays.keys())
const availableRelays = targetRelays.filter(url => this.connectedRelays.has(url))
if (availableRelays.length === 0) {
throw new Error('No available relays for query')
}
try {
// Query each filter separately and combine results
const allEvents: Event[] = []
for (const filter of filters) {
const events = await this.pool.querySync(availableRelays, filter)
allEvents.push(...events)
}
return allEvents
} catch (error) {
console.error('Query failed:', error)
throw error
}
}
/**
* Get a specific relay instance
*/
getRelay(url: string): Relay | undefined {
return this.connectedRelays.get(url)
}
/**
* Check if a specific relay is connected
*/
isRelayConnected(url: string): boolean {
return this.connectedRelays.has(url)
}
/**
* Force reconnection to all relays
*/
async reconnect(): Promise<void> {
this.disconnect()
await this.connect()
}
/**
* Schedule automatic reconnection
*/
private scheduleReconnect(): void {
if (this.reconnectInterval) {
clearTimeout(this.reconnectInterval)
}
this.reconnectInterval = setTimeout(async () => {
await this.connect()
}, this.reconnectDelay) as unknown as number
}
/**
* Start health check monitoring
*/
private startHealthCheck(): void {
if (this.healthCheckInterval) {
clearInterval(this.healthCheckInterval)
}
this.healthCheckInterval = setInterval(() => {
this.performHealthCheck()
}, this.healthCheckIntervalMs) as unknown as number
}
/**
* Perform health check on all relays
*/
private async performHealthCheck(): Promise<void> {
if (!this._isConnected) return
const disconnectedRelays: string[] = []
// Check each relay connection
for (const [url] of this.connectedRelays) {
try {
// Try to send a ping or check connection status
// For now, we'll just check if the relay is still in our connected relays map
if (!this.connectedRelays.has(url)) {
disconnectedRelays.push(url)
}
} catch (error) {
console.warn(`Health check failed for relay ${url}:`, error)
disconnectedRelays.push(url)
}
}
// Remove disconnected relays
disconnectedRelays.forEach(url => {
this.connectedRelays.delete(url)
})
// Update connection status
if (this.connectedRelays.size === 0) {
this._isConnected = false
this.isConnected.value = false
this.emit('allRelaysDisconnected')
console.warn('All relays disconnected, attempting reconnection...')
await this.connect()
} else if (this.connectedRelays.size < this.relayConfigs.size) {
this.emit('partialDisconnection', {
connected: this.connectedRelays.size,
total: this.relayConfigs.size
})
}
}
/**
* Cleanup resources
*/
destroy(): void {
this.dispose()
}
/**
* Cleanup when service is disposed (called by BaseService)
*/
protected async onDispose(): Promise<void> {
// Unregister from visibility service
if (this.visibilityUnsubscribe) {
this.visibilityUnsubscribe()
this.visibilityUnsubscribe = undefined
}
// Disconnect and cleanup
this.disconnect()
this.removeAllListeners()
this.debug('RelayHub disposed')
}
}
// Export singleton instance
export const relayHub = new RelayHub()
// Ensure global export
;(globalThis as any).relayHub = relayHub