diff --git a/lnbits/app.py b/lnbits/app.py index a00d7157..f68fe046 100644 --- a/lnbits/app.py +++ b/lnbits/app.py @@ -482,6 +482,12 @@ def register_async_tasks() -> None: create_permanent_task(purge_audit_data) create_permanent_task(collect_exchange_rates_data) + # nostr transport (RPC over nostr relays) + if settings.nostr_transport_enabled: + from lnbits.core.services.nostr_transport import start_nostr_transport + + create_permanent_task(start_nostr_transport) + # server logs for websocket if settings.lnbits_admin_ui: server_log_task = initialize_server_websocket_logger() diff --git a/lnbits/core/services/nostr_transport/__init__.py b/lnbits/core/services/nostr_transport/__init__.py new file mode 100644 index 00000000..6d071b2e --- /dev/null +++ b/lnbits/core/services/nostr_transport/__init__.py @@ -0,0 +1,135 @@ +""" +Nostr transport layer for LNbits. + +Enables LNbits API access over nostr relays using NIP-44 encrypted +kind-21000 events, modeled after Lightning.pub's architecture. +No port forwarding, DNS, or SSL required. + +Usage: + Set these in .env: + NOSTR_TRANSPORT_ENABLED=true + NOSTR_TRANSPORT_PRIVATE_KEY= + NOSTR_TRANSPORT_RELAYS=["wss://relay.damus.io"] +""" + +import json + +from loguru import logger + +from lnbits.settings import settings +from lnbits.utils.nostr import generate_keypair, normalize_private_key + +from .dispatcher import dispatch, register_default_rpcs +from .models import NostrRpcRequest, NostrRpcResponse +from .relay_pool import NostrTransportPool + +_pool: NostrTransportPool | None = None + + +async def start_nostr_transport(): + """ + Main entry point. Called from app.py via create_permanent_task(). + Initializes the relay pool, registers RPC endpoints, and starts + listening for incoming nostr events. + """ + global _pool + + if not settings.nostr_transport_enabled: + return + + # Resolve keypair + private_key_hex = settings.nostr_transport_private_key + if not private_key_hex: + logger.info("Nostr transport: no private key configured, generating one...") + private_key_hex, public_key_hex = generate_keypair() + settings.nostr_transport_private_key = private_key_hex + settings.nostr_transport_public_key = public_key_hex + logger.info( + f"Nostr transport: generated keypair. " + f"Public key (share this): {public_key_hex}" + ) + else: + private_key_hex = normalize_private_key(private_key_hex) + # Derive public key from private key + import coincurve + + privkey = coincurve.PrivateKey(bytes.fromhex(private_key_hex)) + public_key_hex = privkey.public_key_xonly.format().hex() + settings.nostr_transport_public_key = public_key_hex + + relay_urls = settings.nostr_transport_relays + if not relay_urls: + logger.warning("Nostr transport: no relays configured, disabling") + return + + logger.info( + f"Nostr transport: starting with pubkey {public_key_hex[:16]}... " + f"on {len(relay_urls)} relay(s)" + ) + + # Register RPC endpoints + register_default_rpcs() + + # Create and start relay pool + _pool = NostrTransportPool( + private_key_hex=private_key_hex, + public_key_hex=public_key_hex, + relay_urls=relay_urls, + event_callback=_handle_event, + ) + await _pool.start() + + # Keep alive until shutdown + while not _pool._is_shutting_down(): + import asyncio + + await asyncio.sleep(1) + + await stop_nostr_transport() + + +async def stop_nostr_transport(): + """Clean shutdown of the nostr transport.""" + global _pool + if _pool: + await _pool.stop() + _pool = None + logger.info("Nostr transport: stopped") + + +async def _handle_event(sender_pubkey: str, plaintext: str): + """ + Handle a decrypted incoming event. + Parse as RPC request, dispatch, and send response. + """ + try: + data = json.loads(plaintext) + except json.JSONDecodeError: + logger.warning("Nostr transport: received non-JSON event content") + return + + # Parse request + try: + request = NostrRpcRequest(**data) + except Exception as e: + logger.warning(f"Nostr transport: invalid RPC request: {e}") + # Try to send error response if we have a request_id + request_id = data.get("request_id", "unknown") + response = NostrRpcResponse( + status="ERROR", + request_id=request_id, + error=f"Invalid request: {e}", + ) + if _pool: + await _pool.send_response(sender_pubkey, response.json()) + return + + # Validate authIdentifier matches sender (Lightning.pub pattern) + # In our case, the sender pubkey from the event IS the auth identity + + # Dispatch + response = await dispatch(request, sender_pubkey) + + # Send response + if _pool: + await _pool.send_response(sender_pubkey, response.json()) diff --git a/lnbits/core/services/nostr_transport/auth.py b/lnbits/core/services/nostr_transport/auth.py new file mode 100644 index 00000000..14cea49c --- /dev/null +++ b/lnbits/core/services/nostr_transport/auth.py @@ -0,0 +1,78 @@ +""" +Nostr pubkey -> LNbits account/wallet authorization. + +Maps a nostr event sender's pubkey to the appropriate LNbits +authorization context, following the same patterns as the HTTP +decorators in lnbits/decorators.py. +""" + +from uuid import uuid4 + +from loguru import logger + +from lnbits.core.crud.users import get_account_by_pubkey +from lnbits.core.crud.wallets import get_wallet +from lnbits.core.models import Account, UserExtra +from lnbits.core.models.wallets import KeyType, WalletTypeInfo +from lnbits.core.services.users import create_user_account +from lnbits.settings import settings + + +async def resolve_nostr_auth( + pubkey: str, + wallet_id: str | None = None, + key_type_str: str | None = None, +) -> WalletTypeInfo | Account | None: + """ + Resolve a nostr pubkey + optional wallet_id + key_type to an + LNbits authorization context. + + Returns: + WalletTypeInfo - for wallet-scoped operations (payments, etc.) + Account - for account-level operations (create wallet, etc.) + None - for public endpoints (decode, payment status) + """ + if wallet_id is None and key_type_str is None: + # Public endpoint -- no auth needed, but still resolve account + # if pubkey is provided (for audit/logging) + account = await get_account_by_pubkey(pubkey) + if account: + return account + return None + + # Resolve account from pubkey + account = await get_account_by_pubkey(pubkey) + if not account: + if not settings.new_accounts_allowed: + raise PermissionError("Account creation is disabled.") + # Auto-create account (same as nostr_login in auth_api.py) + account = Account( + id=uuid4().hex, + pubkey=pubkey, + extra=UserExtra(provider="nostr"), + ) + user = await create_user_account(account) + logger.info(f"Nostr transport: auto-created account for {pubkey[:16]}...") + account = Account( + id=user.id, + pubkey=pubkey, + extra=UserExtra(provider="nostr"), + ) + + if not account.activated: + raise PermissionError("Account is not activated.") + + # Account-level operation (no wallet specified) + if wallet_id is None: + return account + + # Wallet-scoped operation + wallet = await get_wallet(wallet_id) + if not wallet: + raise ValueError(f"Wallet not found: {wallet_id}") + + if wallet.user != account.id: + raise PermissionError("Wallet does not belong to this account.") + + key_type = KeyType.admin if key_type_str == "admin" else KeyType.invoice + return WalletTypeInfo(key_type=key_type, wallet=wallet) diff --git a/lnbits/core/services/nostr_transport/crypto.py b/lnbits/core/services/nostr_transport/crypto.py new file mode 100644 index 00000000..57c7a32e --- /dev/null +++ b/lnbits/core/services/nostr_transport/crypto.py @@ -0,0 +1,187 @@ +""" +NIP-44 v2 encryption/decryption for nostr transport. + +Implements the NIP-44 v2 spec using coincurve (secp256k1 ECDH) and +pycryptodome (ChaCha20, HMAC-SHA256, HKDF), all already in LNbits' deps. + +Also re-exports sign_event() and verify_event() from lnbits.utils.nostr. +""" + +import base64 +import hashlib +import hmac +import os +import struct +from math import floor, log2 + +import coincurve +from Cryptodome.Cipher import ChaCha20 + +from lnbits.utils.nostr import sign_event, verify_event # noqa: F401 + +_NIP44_SALT = b"nip44-v2" +_VERSION = 2 + + +def get_conversation_key( + private_key_hex: str, public_key_hex: str +) -> bytes: + """ + Compute NIP-44 v2 conversation key via ECDH + HKDF-extract. + + The public key must be 32-byte x-only (as in nostr). We prepend 0x02 + to make it a valid compressed SEC1 point for coincurve. + """ + privkey = coincurve.PrivateKey(bytes.fromhex(private_key_hex)) + # x-only pubkey -> compressed pubkey (assume even y) + pubkey_bytes = b"\x02" + bytes.fromhex(public_key_hex) + pubkey = coincurve.PublicKey(pubkey_bytes) + + # ECDH: multiply pubkey by privkey, get raw x coordinate (32 bytes) + # coincurve's ecdh returns sha256(compressed_point) by default, + # but NIP-44 needs the unhashed x coordinate. + shared_point = pubkey.multiply(privkey.secret) + shared_x = shared_point.format(compressed=True)[1:] # strip 0x02/0x03 prefix + + # HKDF-extract with salt="nip44-v2" + return _hkdf_extract(salt=_NIP44_SALT, ikm=shared_x) + + +def nip44_encrypt( + plaintext: str, sender_privkey_hex: str, recipient_pubkey_hex: str +) -> str: + """Encrypt a plaintext string using NIP-44 v2.""" + conversation_key = get_conversation_key(sender_privkey_hex, recipient_pubkey_hex) + nonce = os.urandom(32) + return _encrypt(plaintext, conversation_key, nonce) + + +def nip44_decrypt( + payload: str, recipient_privkey_hex: str, sender_pubkey_hex: str +) -> str: + """Decrypt a NIP-44 v2 payload.""" + conversation_key = get_conversation_key(recipient_privkey_hex, sender_pubkey_hex) + return _decrypt(payload, conversation_key) + + +# --- Internal implementation --- + + +def _hkdf_extract(salt: bytes, ikm: bytes) -> bytes: + """HKDF-extract (RFC 5869) with SHA-256.""" + return hmac.new(salt, ikm, hashlib.sha256).digest() + + +def _hkdf_expand(prk: bytes, info: bytes, length: int) -> bytes: + """HKDF-expand (RFC 5869) with SHA-256.""" + hash_len = 32 # SHA-256 + n = (length + hash_len - 1) // hash_len + okm = b"" + t = b"" + for i in range(1, n + 1): + t = hmac.new(prk, t + info + bytes([i]), hashlib.sha256).digest() + okm += t + return okm[:length] + + +def _calc_padded_len(unpadded_len: int) -> int: + """NIP-44 v2 padding length calculation.""" + if unpadded_len <= 32: + return 32 + next_power = 1 << (floor(log2(unpadded_len - 1)) + 1) + if next_power <= 256: + chunk = 32 + else: + chunk = next_power // 8 + return chunk * (floor((unpadded_len - 1) / chunk) + 1) + + +def _pad(plaintext: str) -> bytes: + """Convert plaintext to NIP-44 v2 padded byte array.""" + unpadded = plaintext.encode("utf-8") + unpadded_len = len(unpadded) + if unpadded_len < 1 or unpadded_len > 65535: + raise ValueError(f"Invalid plaintext length: {unpadded_len}") + padded_len = _calc_padded_len(unpadded_len) + prefix = struct.pack(">H", unpadded_len) # big-endian uint16 + suffix = b"\x00" * (padded_len - unpadded_len) + return prefix + unpadded + suffix + + +def _unpad(padded: bytes) -> str: + """Convert NIP-44 v2 padded byte array back to plaintext.""" + unpadded_len = struct.unpack(">H", padded[:2])[0] + if unpadded_len == 0: + raise ValueError("Invalid padding: zero length") + unpadded = padded[2 : 2 + unpadded_len] + if len(unpadded) != unpadded_len: + raise ValueError("Invalid padding: length mismatch") + expected_padded_total = 2 + _calc_padded_len(unpadded_len) + if len(padded) != expected_padded_total: + raise ValueError("Invalid padding: wrong padded size") + return unpadded.decode("utf-8") + + +def _get_message_keys(conversation_key: bytes, nonce: bytes) -> tuple[bytes, bytes, bytes]: + """Derive chacha_key, chacha_nonce, hmac_key from conversation_key and nonce.""" + keys = _hkdf_expand(conversation_key, nonce, 76) + chacha_key = keys[0:32] + chacha_nonce = keys[32:44] + hmac_key = keys[44:76] + return chacha_key, chacha_nonce, hmac_key + + +def _encrypt(plaintext: str, conversation_key: bytes, nonce: bytes) -> str: + """Encrypt using NIP-44 v2.""" + chacha_key, chacha_nonce, hmac_key = _get_message_keys(conversation_key, nonce) + padded = _pad(plaintext) + + # ChaCha20 encrypt (RFC 8439, counter=0) + cipher = ChaCha20.new(key=chacha_key, nonce=chacha_nonce) + ciphertext = cipher.encrypt(padded) + + # HMAC-SHA256 over nonce + ciphertext + mac = hmac.new(hmac_key, nonce + ciphertext, hashlib.sha256).digest() + + # Assemble: version(1) + nonce(32) + ciphertext(variable) + mac(32) + payload = bytes([_VERSION]) + nonce + ciphertext + mac + return base64.b64encode(payload).decode("ascii") + + +def _decrypt(payload_b64: str, conversation_key: bytes) -> str: + """Decrypt a NIP-44 v2 payload.""" + if not payload_b64: + raise ValueError("Empty payload") + if payload_b64[0] == "#": + raise ValueError("Unknown encoding version") + + plen = len(payload_b64) + if plen < 132 or plen > 87472: + raise ValueError(f"Invalid payload size: {plen}") + + data = base64.b64decode(payload_b64) + dlen = len(data) + if dlen < 99 or dlen > 65603: + raise ValueError(f"Invalid data size: {dlen}") + + version = data[0] + if version != _VERSION: + raise ValueError(f"Unknown version: {version}") + + nonce = data[1:33] + ciphertext = data[33 : dlen - 32] + mac = data[dlen - 32 : dlen] + + # Derive keys + chacha_key, chacha_nonce, hmac_key = _get_message_keys(conversation_key, nonce) + + # Verify MAC (constant-time) + expected_mac = hmac.new(hmac_key, nonce + ciphertext, hashlib.sha256).digest() + if not hmac.compare_digest(mac, expected_mac): + raise ValueError("Invalid MAC") + + # Decrypt + cipher = ChaCha20.new(key=chacha_key, nonce=chacha_nonce) + padded = cipher.decrypt(ciphertext) + + return _unpad(padded) diff --git a/lnbits/core/services/nostr_transport/dispatcher.py b/lnbits/core/services/nostr_transport/dispatcher.py new file mode 100644 index 00000000..05502368 --- /dev/null +++ b/lnbits/core/services/nostr_transport/dispatcher.py @@ -0,0 +1,249 @@ +""" +RPC dispatcher for the nostr transport. + +Maps rpc_name strings to LNbits service functions, handles auth +resolution, and returns serialized responses. Modeled after +Lightning.pub's nostr_transport.ts auto-generated dispatcher. +""" + +from collections.abc import Awaitable, Callable +from dataclasses import dataclass +from typing import Any + +from loguru import logger + +from lnbits.core.models import Account +from lnbits.core.models.wallets import KeyType, WalletTypeInfo + +from .auth import resolve_nostr_auth +from .models import NostrRpcRequest, NostrRpcResponse + +# Auth level constants +AUTH_NONE = "none" +AUTH_ACCOUNT = "account" +AUTH_INVOICE_KEY = "invoice_key" +AUTH_ADMIN_KEY = "admin_key" + + +@dataclass +class RpcEndpoint: + handler: Callable[..., Awaitable[Any]] + auth_level: str # AUTH_NONE, AUTH_ACCOUNT, AUTH_INVOICE_KEY, AUTH_ADMIN_KEY + + +# Global RPC registry +_RPC_REGISTRY: dict[str, RpcEndpoint] = {} + + +def register_rpc( + name: str, + handler: Callable[..., Awaitable[Any]], + auth_level: str = AUTH_INVOICE_KEY, +): + """Register an RPC handler.""" + _RPC_REGISTRY[name] = RpcEndpoint(handler=handler, auth_level=auth_level) + + +async def dispatch(request: NostrRpcRequest, sender_pubkey: str) -> NostrRpcResponse: + """ + Route an RPC request to the appropriate handler. + + 1. Look up rpc_name in registry + 2. Resolve auth from sender pubkey + 3. Validate auth level + 4. Call handler + 5. Return response + """ + endpoint = _RPC_REGISTRY.get(request.rpc_name) + if not endpoint: + return NostrRpcResponse( + status="ERROR", + request_id=request.request_id, + error=f"Unknown RPC: {request.rpc_name}", + ) + + try: + # Resolve auth + auth_ctx = await _resolve_auth( + endpoint.auth_level, sender_pubkey, request.wallet_id, request.key_type + ) + + # Call handler + result = await endpoint.handler(auth_ctx, request) + return NostrRpcResponse( + status="OK", + request_id=request.request_id, + data=result, + ) + except (PermissionError, ValueError) as e: + return NostrRpcResponse( + status="ERROR", + request_id=request.request_id, + error=str(e), + ) + except Exception as e: + logger.error(f"Nostr transport: RPC error in {request.rpc_name}: {e}") + return NostrRpcResponse( + status="ERROR", + request_id=request.request_id, + error="Internal error", + ) + + +async def _resolve_auth( + auth_level: str, + sender_pubkey: str, + wallet_id: str | None, + key_type_str: str | None, +) -> WalletTypeInfo | Account | None: + """Resolve and validate auth for the given level.""" + if auth_level == AUTH_NONE: + return None + + auth_ctx = await resolve_nostr_auth(sender_pubkey, wallet_id, key_type_str) + + if auth_level == AUTH_ACCOUNT: + if auth_ctx is None: + raise PermissionError("Authentication required.") + return auth_ctx + + if auth_level in (AUTH_INVOICE_KEY, AUTH_ADMIN_KEY): + if not isinstance(auth_ctx, WalletTypeInfo): + raise PermissionError("Wallet authentication required (provide wallet_id).") + if auth_level == AUTH_ADMIN_KEY and auth_ctx.key_type != KeyType.admin: + raise PermissionError("Admin key required for this operation.") + return auth_ctx + + raise PermissionError(f"Unknown auth level: {auth_level}") + + +# --- Default RPC handlers --- + + +def register_default_rpcs(): + """Register the core LNbits wallet RPCs.""" + register_rpc("create_invoice", _handle_create_invoice, AUTH_INVOICE_KEY) + register_rpc("pay_invoice", _handle_pay_invoice, AUTH_ADMIN_KEY) + register_rpc("get_payment", _handle_get_payment, AUTH_NONE) + register_rpc("list_payments", _handle_list_payments, AUTH_INVOICE_KEY) + register_rpc("get_wallet", _handle_get_wallet, AUTH_INVOICE_KEY) + register_rpc("create_wallet", _handle_create_wallet, AUTH_ACCOUNT) + register_rpc("decode_payment", _handle_decode_payment, AUTH_NONE) + + +async def _handle_create_invoice( + auth: WalletTypeInfo, request: NostrRpcRequest +) -> dict: + from lnbits.core.models import CreateInvoice + from lnbits.core.services.payments import create_payment_request + + body = request.body or {} + invoice_data = CreateInvoice( + out=False, + amount=body.get("amount", 0), + memo=body.get("memo"), + unit=body.get("unit", "sat"), + expiry=body.get("expiry"), + extra=body.get("extra"), + webhook=body.get("webhook"), + ) + payment = await create_payment_request(auth.wallet.id, invoice_data) + return payment.dict() + + +async def _handle_pay_invoice(auth: WalletTypeInfo, request: NostrRpcRequest) -> dict: + from lnbits.core.services.payments import pay_invoice + + body = request.body or {} + bolt11 = body.get("bolt11", "") + if not bolt11: + raise ValueError("bolt11 is required") + + payment = await pay_invoice( + wallet_id=auth.wallet.id, + payment_request=bolt11, + max_sat=body.get("max_sat"), + extra=body.get("extra"), + description=body.get("description", ""), + ) + return payment.dict() + + +async def _handle_get_payment( + auth: None, request: NostrRpcRequest +) -> dict | None: + from lnbits.core.crud.payments import get_standalone_payment + + body = request.body or {} + query = request.query or {} + payment_hash = body.get("payment_hash") or query.get("payment_hash", "") + if not payment_hash: + raise ValueError("payment_hash is required") + + payment = await get_standalone_payment(payment_hash) + if not payment: + return None + return payment.dict() + + +async def _handle_list_payments( + auth: WalletTypeInfo, request: NostrRpcRequest +) -> list[dict]: + from lnbits.core.crud.payments import get_payments + + query = request.query or {} + payments = await get_payments( + wallet_id=auth.wallet.id, + complete=query.get("complete", False), + pending=query.get("pending", False), + outgoing=query.get("outgoing", False), + incoming=query.get("incoming", False), + limit=query.get("limit"), + offset=query.get("offset"), + ) + return [p.dict() for p in payments] + + +async def _handle_get_wallet(auth: WalletTypeInfo, request: NostrRpcRequest) -> dict: + return { + "id": auth.wallet.id, + "name": auth.wallet.name, + "balance": auth.wallet.balance_msat, + } + + +async def _handle_create_wallet(auth: Account, request: NostrRpcRequest) -> dict: + from lnbits.core.crud.wallets import create_wallet + + body = request.body or {} + wallet = await create_wallet( + user_id=auth.id, + wallet_name=body.get("name", "Nostr Wallet"), + ) + return { + "id": wallet.id, + "name": wallet.name, + "adminkey": wallet.adminkey, + "inkey": wallet.inkey, + } + + +async def _handle_decode_payment(auth: None, request: NostrRpcRequest) -> dict: + from lnbits import bolt11 + + body = request.body or {} + payment_request = body.get("payment_request", "") + if not payment_request: + raise ValueError("payment_request is required") + + invoice = bolt11.decode(payment_request) + return { + "payment_hash": invoice.payment_hash, + "amount_msat": invoice.amount_msat, + "description": invoice.description, + "description_hash": invoice.description_hash, + "payee": invoice.payee, + "date": invoice.date, + "expiry": invoice.expiry, + "min_final_cltv_expiry": invoice.min_final_cltv_expiry, + } diff --git a/lnbits/core/services/nostr_transport/models.py b/lnbits/core/services/nostr_transport/models.py new file mode 100644 index 00000000..24137923 --- /dev/null +++ b/lnbits/core/services/nostr_transport/models.py @@ -0,0 +1,29 @@ +from typing import Any + +from pydantic import BaseModel + + +class NostrRpcRequest(BaseModel): + """ + Incoming RPC request over nostr transport. + Mirrors Lightning.pub's NostrRequest type from nostr_transport.ts. + """ + + rpc_name: str + wallet_id: str | None = None + key_type: str | None = None # "admin" or "invoice" + request_id: str + body: dict | None = None + query: dict | None = None + + +class NostrRpcResponse(BaseModel): + """ + Outgoing RPC response over nostr transport. + Mirrors Lightning.pub's response format: {status, requestId, ...data}. + """ + + status: str # "OK" or "ERROR" + request_id: str + data: Any | None = None + error: str | None = None diff --git a/lnbits/core/services/nostr_transport/relay_pool.py b/lnbits/core/services/nostr_transport/relay_pool.py new file mode 100644 index 00000000..871886fa --- /dev/null +++ b/lnbits/core/services/nostr_transport/relay_pool.py @@ -0,0 +1,241 @@ +""" +Nostr relay connection pool for the transport layer. + +Manages WebSocket connections to multiple nostr relays, subscribes for +incoming RPC events (kind 21000) tagged with the LNbits instance's +pubkey, validates and decrypts them, and publishes encrypted responses. + +Follows the reconnection pattern from lnbits/wallets/nwc.py and the +relay pool design from Lightning.pub's nostrPool.ts. +""" + +import asyncio +import json +import time +from collections.abc import Awaitable, Callable + +import coincurve +from loguru import logger +from websockets import connect as ws_connect + +from lnbits.settings import settings +from lnbits.utils.nostr import json_dumps, sign_event, verify_event + +from .crypto import nip44_decrypt, nip44_encrypt + +EVENT_KIND = 21000 +# Events older than this are deduplication-expired (seconds) +_HANDLED_EVENT_TTL = 7200 # 2 hours + + +class NostrTransportPool: + """ + Manages relay connections for the nostr transport layer. + + Subscribes for kind-21000 events tagged with our pubkey, + decrypts them, and forwards to a callback. Publishes encrypted + response events back to the sender. + """ + + def __init__( + self, + private_key_hex: str, + public_key_hex: str, + relay_urls: list[str], + event_callback: Callable[..., Awaitable[None]], + ): + self.private_key_hex = private_key_hex + self.public_key_hex = public_key_hex + self.relay_urls = relay_urls + self.event_callback = event_callback + + # coincurve key objects for signing + self._privkey = coincurve.PrivateKey(bytes.fromhex(private_key_hex)) + + # Deduplication: event_id -> timestamp + self._handled_events: dict[str, float] = {} + + # Active relay tasks and websockets + self._relay_tasks: list[asyncio.Task] = [] + self._relay_ws: dict[str, object] = {} # relay_url -> ws + + self._shutdown = False + + def _is_shutting_down(self) -> bool: + return self._shutdown or not settings.lnbits_running + + async def start(self): + """Connect to all configured relays.""" + logger.info( + f"Nostr transport: connecting to {len(self.relay_urls)} relay(s) " + f"as {self.public_key_hex[:16]}..." + ) + for relay_url in self.relay_urls: + task = asyncio.create_task(self._connect_to_relay(relay_url)) + self._relay_tasks.append(task) + + # Periodic cleanup of handled events cache + self._cleanup_task = asyncio.create_task(self._cleanup_handled_events()) + + async def stop(self): + """Shut down all relay connections.""" + self._shutdown = True + for task in self._relay_tasks: + task.cancel() + self._cleanup_task.cancel() + # Close websockets + for url, ws in self._relay_ws.items(): + try: + await ws.close() + except Exception: + pass + self._relay_ws.clear() + logger.info("Nostr transport: all relays disconnected") + + async def send_response(self, recipient_pubkey: str, content: str): + """Encrypt and publish a response event to the sender.""" + encrypted = nip44_encrypt(content, self.private_key_hex, recipient_pubkey) + + event = { + "kind": EVENT_KIND, + "created_at": int(time.time()), + "tags": [["p", recipient_pubkey]], + "content": encrypted, + } + event = sign_event(event, self.public_key_hex, self._privkey) + + event_json = json.dumps(["EVENT", event]) + # Publish to all connected relays + sent = False + for url, ws in list(self._relay_ws.items()): + try: + await ws.send(event_json) + sent = True + except Exception as e: + logger.warning(f"Nostr transport: failed to send to {url}: {e}") + if not sent: + logger.error("Nostr transport: could not send response to any relay") + + # --- Internal --- + + async def _connect_to_relay(self, relay_url: str): + """Connect to a single relay with auto-reconnect.""" + while not self._is_shutting_down(): + try: + async with ws_connect(relay_url) as ws: + self._relay_ws[relay_url] = ws + logger.info(f"Nostr transport: connected to {relay_url}") + + # Subscribe for events tagged with our pubkey + sub_id = f"lnbits_transport_{self.public_key_hex[:8]}" + sub_filter = { + "kinds": [EVENT_KIND], + "#p": [self.public_key_hex], + "since": int(time.time()), + } + await ws.send(json.dumps(["REQ", sub_id, sub_filter])) + + # Receive loop + while not self._is_shutting_down(): + try: + reply = await ws.recv() + if isinstance(reply, bytes): + reply = reply.decode("utf-8") + await self._on_message(relay_url, reply) + except Exception as e: + logger.debug( + f"Nostr transport: recv error on {relay_url}: {e}" + ) + break + + logger.debug(f"Nostr transport: connection to {relay_url} closed") + except Exception as e: + logger.error(f"Nostr transport: error connecting to {relay_url}: {e}") + + self._relay_ws.pop(relay_url, None) + if not self._is_shutting_down(): + logger.debug(f"Nostr transport: reconnecting to {relay_url} in 5s...") + await asyncio.sleep(5) + + async def _on_message(self, relay_url: str, message: str): + """Handle an incoming message from a relay.""" + try: + msg = json.loads(message) + if msg[0] == "EVENT": + await self._on_event(relay_url, msg[2]) + elif msg[0] == "OK": + ok = msg[2] if len(msg) > 2 else True + if not ok: + reason = msg[3] if len(msg) > 3 else "unknown" + logger.warning( + f"Nostr transport: event rejected by {relay_url}: {reason}" + ) + elif msg[0] == "EOSE": + logger.debug(f"Nostr transport: EOSE from {relay_url}") + elif msg[0] == "NOTICE": + logger.info(f"Nostr transport: notice from {relay_url}: {msg[1]}") + elif msg[0] == "CLOSED": + logger.warning( + f"Nostr transport: subscription closed by {relay_url}: " + f"{msg[2] if len(msg) > 2 else 'no reason'}" + ) + except Exception as e: + logger.error(f"Nostr transport: error parsing message from {relay_url}: {e}") + + async def _on_event(self, relay_url: str, event: dict): + """Process an incoming nostr event.""" + event_id = event.get("id", "") + + # Deduplicate + if event_id in self._handled_events: + return + self._handled_events[event_id] = time.time() + + # Validate event kind + if event.get("kind") != EVENT_KIND: + return + + # Verify signature + if not verify_event(event): + logger.warning(f"Nostr transport: invalid signature on event {event_id[:16]}") + return + + sender_pubkey = event.get("pubkey", "") + if not sender_pubkey: + return + + # Decrypt content + try: + plaintext = nip44_decrypt( + event["content"], self.private_key_hex, sender_pubkey + ) + except Exception as e: + logger.warning( + f"Nostr transport: decryption failed for event {event_id[:16]}: {e}" + ) + return + + # Forward to callback + try: + await self.event_callback(sender_pubkey, plaintext) + except Exception as e: + logger.error( + f"Nostr transport: error in event callback for {event_id[:16]}: {e}" + ) + + async def _cleanup_handled_events(self): + """Periodically clean up the deduplication cache.""" + while not self._is_shutting_down(): + await asyncio.sleep(600) # every 10 minutes + now = time.time() + expired = [ + eid + for eid, ts in self._handled_events.items() + if now - ts > _HANDLED_EVENT_TTL + ] + for eid in expired: + del self._handled_events[eid] + if expired: + logger.debug( + f"Nostr transport: cleaned {len(expired)} expired event IDs" + ) diff --git a/lnbits/settings.py b/lnbits/settings.py index df491f35..e123f632 100644 --- a/lnbits/settings.py +++ b/lnbits/settings.py @@ -758,6 +758,15 @@ class NostrAuthSettings(LNbitsSettings): ) +class NostrTransportSettings(LNbitsSettings): + nostr_transport_enabled: bool = Field(default=False) + nostr_transport_relays: list[str] = Field( + default=["wss://relay.damus.io", "wss://relay.primal.net"] + ) + nostr_transport_private_key: str = Field(default="") + nostr_transport_public_key: str = Field(default="") + + class GoogleAuthSettings(LNbitsSettings): google_client_id: str = Field(default="") google_client_secret: str = Field(default="") @@ -879,6 +888,7 @@ class EditableSettings( AuditSettings, AuthSettings, NostrAuthSettings, + NostrTransportSettings, GoogleAuthSettings, GitHubAuthSettings, KeycloakAuthSettings,