feat: add nostr transport layer for HTTP-free API access

Enable LNbits API access over nostr relays using NIP-44 v2 encrypted
kind-21000 events, modeled after Lightning.pub's architecture. This
eliminates the need for port forwarding, DNS, or SSL -- the instance
connects outbound to commodity relays and clients communicate via
encrypted nostr events.

New module: lnbits/core/services/nostr_transport/
- crypto.py: NIP-44 v2 (ECDH + HKDF + ChaCha20 + HMAC-SHA256)
- relay_pool.py: multi-relay WebSocket pool with auto-reconnect
- dispatcher.py: RPC registry with 7 core wallet endpoints
- auth.py: nostr pubkey -> LNbits account/wallet resolution
- models.py: request/response pydantic models

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Patrick Mulligan 2026-04-10 05:40:37 -04:00
parent 266b16834b
commit 5443cb75bf
8 changed files with 935 additions and 0 deletions

View file

@ -482,6 +482,12 @@ def register_async_tasks() -> None:
create_permanent_task(purge_audit_data)
create_permanent_task(collect_exchange_rates_data)
# nostr transport (RPC over nostr relays)
if settings.nostr_transport_enabled:
from lnbits.core.services.nostr_transport import start_nostr_transport
create_permanent_task(start_nostr_transport)
# server logs for websocket
if settings.lnbits_admin_ui:
server_log_task = initialize_server_websocket_logger()

View file

@ -0,0 +1,135 @@
"""
Nostr transport layer for LNbits.
Enables LNbits API access over nostr relays using NIP-44 encrypted
kind-21000 events, modeled after Lightning.pub's architecture.
No port forwarding, DNS, or SSL required.
Usage:
Set these in .env:
NOSTR_TRANSPORT_ENABLED=true
NOSTR_TRANSPORT_PRIVATE_KEY=<hex_private_key>
NOSTR_TRANSPORT_RELAYS=["wss://relay.damus.io"]
"""
import json
from loguru import logger
from lnbits.settings import settings
from lnbits.utils.nostr import generate_keypair, normalize_private_key
from .dispatcher import dispatch, register_default_rpcs
from .models import NostrRpcRequest, NostrRpcResponse
from .relay_pool import NostrTransportPool
_pool: NostrTransportPool | None = None
async def start_nostr_transport():
    """
    Main entry point. Called from app.py via create_permanent_task().

    Initializes the relay pool, registers RPC endpoints, and starts
    listening for incoming nostr events. Blocks (sleeping) until the
    pool reports shutdown, then stops the transport cleanly.
    """
    # Hoisted: previously this was (re-)imported on every keep-alive iteration.
    import asyncio

    global _pool
    if not settings.nostr_transport_enabled:
        return
    # Resolve keypair
    private_key_hex = settings.nostr_transport_private_key
    if not private_key_hex:
        logger.info("Nostr transport: no private key configured, generating one...")
        private_key_hex, public_key_hex = generate_keypair()
        # NOTE(review): the generated key is stored only in runtime settings;
        # confirm it is persisted, otherwise the instance identity (and all
        # client conversation keys) change on every restart.
        settings.nostr_transport_private_key = private_key_hex
        settings.nostr_transport_public_key = public_key_hex
        logger.info(
            f"Nostr transport: generated keypair. "
            f"Public key (share this): {public_key_hex}"
        )
    else:
        private_key_hex = normalize_private_key(private_key_hex)
        # Derive the x-only public key from the configured private key
        import coincurve

        privkey = coincurve.PrivateKey(bytes.fromhex(private_key_hex))
        public_key_hex = privkey.public_key_xonly.format().hex()
        settings.nostr_transport_public_key = public_key_hex
    relay_urls = settings.nostr_transport_relays
    if not relay_urls:
        logger.warning("Nostr transport: no relays configured, disabling")
        return
    logger.info(
        f"Nostr transport: starting with pubkey {public_key_hex[:16]}... "
        f"on {len(relay_urls)} relay(s)"
    )
    # Register RPC endpoints
    register_default_rpcs()
    # Create and start relay pool
    _pool = NostrTransportPool(
        private_key_hex=private_key_hex,
        public_key_hex=public_key_hex,
        relay_urls=relay_urls,
        event_callback=_handle_event,
    )
    await _pool.start()
    # Keep alive until shutdown
    while not _pool._is_shutting_down():
        await asyncio.sleep(1)
    await stop_nostr_transport()
async def stop_nostr_transport():
    """Clean shutdown of the nostr transport."""
    global _pool
    if _pool is None:
        return
    await _pool.stop()
    _pool = None
    logger.info("Nostr transport: stopped")
async def _handle_event(sender_pubkey: str, plaintext: str):
    """
    Handle a decrypted incoming event.

    Parse as RPC request, dispatch, and send response. Parse errors are
    reported back to the sender when possible.
    """
    try:
        data = json.loads(plaintext)
    except json.JSONDecodeError:
        logger.warning("Nostr transport: received non-JSON event content")
        return
    # Bugfix: json.loads may return a list/str/number. The code below assumes
    # a dict (`**data`, `data.get(...)`) and would raise AttributeError in the
    # error path for e.g. a JSON array payload.
    if not isinstance(data, dict):
        logger.warning("Nostr transport: event content is not a JSON object")
        return
    # Parse request
    try:
        request = NostrRpcRequest(**data)
    except Exception as e:
        logger.warning(f"Nostr transport: invalid RPC request: {e}")
        # Try to send error response if we have a request_id
        request_id = data.get("request_id", "unknown")
        response = NostrRpcResponse(
            status="ERROR",
            request_id=request_id,
            error=f"Invalid request: {e}",
        )
        if _pool:
            await _pool.send_response(sender_pubkey, response.json())
        return
    # The sender pubkey from the (already signature-verified) event IS the
    # auth identity -- Lightning.pub's authIdentifier pattern.
    response = await dispatch(request, sender_pubkey)
    # Send response
    if _pool:
        await _pool.send_response(sender_pubkey, response.json())

View file

@ -0,0 +1,78 @@
"""
Nostr pubkey -> LNbits account/wallet authorization.
Maps a nostr event sender's pubkey to the appropriate LNbits
authorization context, following the same patterns as the HTTP
decorators in lnbits/decorators.py.
"""
from uuid import uuid4
from loguru import logger
from lnbits.core.crud.users import get_account_by_pubkey
from lnbits.core.crud.wallets import get_wallet
from lnbits.core.models import Account, UserExtra
from lnbits.core.models.wallets import KeyType, WalletTypeInfo
from lnbits.core.services.users import create_user_account
from lnbits.settings import settings
async def resolve_nostr_auth(
    pubkey: str,
    wallet_id: str | None = None,
    key_type_str: str | None = None,
) -> WalletTypeInfo | Account | None:
    """
    Resolve a nostr pubkey + optional wallet_id + key_type to an
    LNbits authorization context.

    Args:
        pubkey: hex pubkey of the verified nostr event sender; this is the
            sole authentication credential for the transport.
        wallet_id: target wallet for wallet-scoped operations, or None.
        key_type_str: "admin" for admin-level access; any other value
            (including None) resolves to an invoice-level key.

    Returns:
        WalletTypeInfo - for wallet-scoped operations (payments, etc.)
        Account - for account-level operations (create wallet, etc.)
        None - for public endpoints (decode, payment status)

    Raises:
        PermissionError: account creation disabled, account not activated,
            or wallet owned by a different account.
        ValueError: wallet_id does not match an existing wallet.
    """
    if wallet_id is None and key_type_str is None:
        # Public endpoint -- no auth needed, but still resolve account
        # if pubkey is provided (for audit/logging)
        account = await get_account_by_pubkey(pubkey)
        if account:
            return account
        return None
    # Resolve account from pubkey
    account = await get_account_by_pubkey(pubkey)
    if not account:
        if not settings.new_accounts_allowed:
            raise PermissionError("Account creation is disabled.")
        # Auto-create account (same as nostr_login in auth_api.py)
        account = Account(
            id=uuid4().hex,
            pubkey=pubkey,
            extra=UserExtra(provider="nostr"),
        )
        user = await create_user_account(account)
        logger.info(f"Nostr transport: auto-created account for {pubkey[:16]}...")
        # NOTE(review): the account is rebuilt with only id/pubkey/extra --
        # any other fields set by create_user_account (e.g. activation state)
        # are discarded here; confirm this is intended.
        account = Account(
            id=user.id,
            pubkey=pubkey,
            extra=UserExtra(provider="nostr"),
        )
    if not account.activated:
        raise PermissionError("Account is not activated.")
    # Account-level operation (no wallet specified)
    if wallet_id is None:
        return account
    # Wallet-scoped operation
    wallet = await get_wallet(wallet_id)
    if not wallet:
        raise ValueError(f"Wallet not found: {wallet_id}")
    if wallet.user != account.id:
        raise PermissionError("Wallet does not belong to this account.")
    # Anything other than an explicit "admin" gets the least-privileged key
    key_type = KeyType.admin if key_type_str == "admin" else KeyType.invoice
    return WalletTypeInfo(key_type=key_type, wallet=wallet)

View file

@ -0,0 +1,187 @@
"""
NIP-44 v2 encryption/decryption for nostr transport.
Implements the NIP-44 v2 spec using coincurve (secp256k1 ECDH) and
pycryptodome (ChaCha20, HMAC-SHA256, HKDF), all already in LNbits' deps.
Also re-exports sign_event() and verify_event() from lnbits.utils.nostr.
"""
import base64
import hashlib
import hmac
import os
import struct
from math import floor, log2
import coincurve
from Cryptodome.Cipher import ChaCha20
from lnbits.utils.nostr import sign_event, verify_event # noqa: F401
_NIP44_SALT = b"nip44-v2"
_VERSION = 2
def get_conversation_key(
    private_key_hex: str, public_key_hex: str
) -> bytes:
    """
    Compute NIP-44 v2 conversation key via ECDH + HKDF-extract.

    Nostr public keys are 32-byte x-only; a 0x02 prefix lifts them to a
    compressed SEC1 point (even-y) that coincurve can consume.
    """
    secret = coincurve.PrivateKey(bytes.fromhex(private_key_hex))
    point = coincurve.PublicKey(b"\x02" + bytes.fromhex(public_key_hex))
    # NIP-44 needs the raw x coordinate of the ECDH point. coincurve's own
    # ecdh() would hash the point, so multiply manually and strip the
    # 0x02/0x03 parity byte from the compressed encoding.
    shared_x = point.multiply(secret.secret).format(compressed=True)[1:]
    # HKDF-extract with the fixed "nip44-v2" salt
    return _hkdf_extract(salt=_NIP44_SALT, ikm=shared_x)
def nip44_encrypt(
    plaintext: str, sender_privkey_hex: str, recipient_pubkey_hex: str
) -> str:
    """Encrypt a plaintext string to the recipient using NIP-44 v2."""
    key = get_conversation_key(sender_privkey_hex, recipient_pubkey_hex)
    # Fresh random 32-byte nonce per message, as the spec requires
    return _encrypt(plaintext, key, os.urandom(32))
def nip44_decrypt(
    payload: str, recipient_privkey_hex: str, sender_pubkey_hex: str
) -> str:
    """Decrypt a NIP-44 v2 payload received from the given sender."""
    key = get_conversation_key(recipient_privkey_hex, sender_pubkey_hex)
    return _decrypt(payload, key)
# --- Internal implementation ---
def _hkdf_extract(salt: bytes, ikm: bytes) -> bytes:
"""HKDF-extract (RFC 5869) with SHA-256."""
return hmac.new(salt, ikm, hashlib.sha256).digest()
def _hkdf_expand(prk: bytes, info: bytes, length: int) -> bytes:
"""HKDF-expand (RFC 5869) with SHA-256."""
hash_len = 32 # SHA-256
n = (length + hash_len - 1) // hash_len
okm = b""
t = b""
for i in range(1, n + 1):
t = hmac.new(prk, t + info + bytes([i]), hashlib.sha256).digest()
okm += t
return okm[:length]
def _calc_padded_len(unpadded_len: int) -> int:
"""NIP-44 v2 padding length calculation."""
if unpadded_len <= 32:
return 32
next_power = 1 << (floor(log2(unpadded_len - 1)) + 1)
if next_power <= 256:
chunk = 32
else:
chunk = next_power // 8
return chunk * (floor((unpadded_len - 1) / chunk) + 1)
def _pad(plaintext: str) -> bytes:
    """Encode plaintext as NIP-44 v2 padded bytes:
    uint16-BE length || utf-8 data || zero padding."""
    raw = plaintext.encode("utf-8")
    raw_len = len(raw)
    if not 1 <= raw_len <= 65535:
        raise ValueError(f"Invalid plaintext length: {raw_len}")
    zeros = b"\x00" * (_calc_padded_len(raw_len) - raw_len)
    return struct.pack(">H", raw_len) + raw + zeros
def _unpad(padded: bytes) -> str:
    """Decode a NIP-44 v2 padded byte array back to the plaintext string."""
    (claimed_len,) = struct.unpack(">H", padded[:2])
    if claimed_len == 0:
        raise ValueError("Invalid padding: zero length")
    body = padded[2 : 2 + claimed_len]
    if len(body) != claimed_len:
        raise ValueError("Invalid padding: length mismatch")
    # The total size must match exactly what _pad would have produced
    if len(padded) != 2 + _calc_padded_len(claimed_len):
        raise ValueError("Invalid padding: wrong padded size")
    return body.decode("utf-8")
def _get_message_keys(conversation_key: bytes, nonce: bytes) -> tuple[bytes, bytes, bytes]:
    """Derive per-message (chacha_key, chacha_nonce, hmac_key) by
    HKDF-expanding the conversation key with the message nonce as info."""
    material = _hkdf_expand(conversation_key, nonce, 76)
    # 32 bytes cipher key, 12 bytes cipher nonce, 32 bytes MAC key
    return material[:32], material[32:44], material[44:]
def _encrypt(plaintext: str, conversation_key: bytes, nonce: bytes) -> str:
    """Encrypt with NIP-44 v2: pad, ChaCha20, then HMAC-SHA256 tag."""
    enc_key, enc_nonce, auth_key = _get_message_keys(conversation_key, nonce)
    # ChaCha20 (RFC 8439, counter=0) over the padded plaintext
    ciphertext = ChaCha20.new(key=enc_key, nonce=enc_nonce).encrypt(_pad(plaintext))
    # MAC covers nonce || ciphertext
    tag = hmac.new(auth_key, nonce + ciphertext, hashlib.sha256).digest()
    # Payload layout: version(1) || nonce(32) || ciphertext || mac(32)
    payload = bytes([_VERSION]) + nonce + ciphertext + tag
    return base64.b64encode(payload).decode("ascii")
def _decrypt(payload_b64: str, conversation_key: bytes) -> str:
    """Decrypt a NIP-44 v2 payload: validate sizes, check MAC, then decrypt."""
    if not payload_b64:
        raise ValueError("Empty payload")
    # "#" marks a future/unsupported encoding version per the spec
    if payload_b64.startswith("#"):
        raise ValueError("Unknown encoding version")
    b64_len = len(payload_b64)
    if not 132 <= b64_len <= 87472:
        raise ValueError(f"Invalid payload size: {b64_len}")
    raw = base64.b64decode(payload_b64)
    raw_len = len(raw)
    if not 99 <= raw_len <= 65603:
        raise ValueError(f"Invalid data size: {raw_len}")
    version = raw[0]
    if version != _VERSION:
        raise ValueError(f"Unknown version: {version}")
    # Layout: version(1) || nonce(32) || ciphertext || mac(32)
    nonce = raw[1:33]
    ciphertext = raw[33:-32]
    tag = raw[-32:]
    enc_key, enc_nonce, auth_key = _get_message_keys(conversation_key, nonce)
    # Constant-time MAC verification before any decryption
    expected = hmac.new(auth_key, nonce + ciphertext, hashlib.sha256).digest()
    if not hmac.compare_digest(tag, expected):
        raise ValueError("Invalid MAC")
    padded = ChaCha20.new(key=enc_key, nonce=enc_nonce).decrypt(ciphertext)
    return _unpad(padded)

View file

@ -0,0 +1,249 @@
"""
RPC dispatcher for the nostr transport.
Maps rpc_name strings to LNbits service functions, handles auth
resolution, and returns serialized responses. Modeled after
Lightning.pub's nostr_transport.ts auto-generated dispatcher.
"""
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from typing import Any
from loguru import logger
from lnbits.core.models import Account
from lnbits.core.models.wallets import KeyType, WalletTypeInfo
from .auth import resolve_nostr_auth
from .models import NostrRpcRequest, NostrRpcResponse
# Auth level constants
AUTH_NONE = "none"
AUTH_ACCOUNT = "account"
AUTH_INVOICE_KEY = "invoice_key"
AUTH_ADMIN_KEY = "admin_key"
@dataclass
class RpcEndpoint:
    """A registered RPC: its async handler plus the auth level it requires."""

    # Awaited as handler(auth_ctx, request); returns JSON-serializable data.
    handler: Callable[..., Awaitable[Any]]
    auth_level: str  # AUTH_NONE, AUTH_ACCOUNT, AUTH_INVOICE_KEY, AUTH_ADMIN_KEY
# Global RPC registry
_RPC_REGISTRY: dict[str, RpcEndpoint] = {}
def register_rpc(
    name: str,
    handler: Callable[..., Awaitable[Any]],
    auth_level: str = AUTH_INVOICE_KEY,
):
    """Add (or replace) an RPC handler in the global registry."""
    endpoint = RpcEndpoint(handler=handler, auth_level=auth_level)
    _RPC_REGISTRY[name] = endpoint
async def dispatch(request: NostrRpcRequest, sender_pubkey: str) -> NostrRpcResponse:
    """
    Route an RPC request to the appropriate handler.

    Looks up rpc_name in the registry, resolves the sender's auth context,
    invokes the handler, and wraps the outcome in a NostrRpcResponse.
    Expected failures (auth/validation) surface their message; anything
    else is masked as a generic internal error.
    """

    def _error(message: str) -> NostrRpcResponse:
        return NostrRpcResponse(
            status="ERROR",
            request_id=request.request_id,
            error=message,
        )

    endpoint = _RPC_REGISTRY.get(request.rpc_name)
    if endpoint is None:
        return _error(f"Unknown RPC: {request.rpc_name}")
    try:
        auth_ctx = await _resolve_auth(
            endpoint.auth_level, sender_pubkey, request.wallet_id, request.key_type
        )
        result = await endpoint.handler(auth_ctx, request)
    except (PermissionError, ValueError) as e:
        # Expected, safe-to-expose failures
        return _error(str(e))
    except Exception as e:
        # Unexpected failure: log details, hide them from the caller
        logger.error(f"Nostr transport: RPC error in {request.rpc_name}: {e}")
        return _error("Internal error")
    return NostrRpcResponse(
        status="OK",
        request_id=request.request_id,
        data=result,
    )
async def _resolve_auth(
    auth_level: str,
    sender_pubkey: str,
    wallet_id: str | None,
    key_type_str: str | None,
) -> WalletTypeInfo | Account | None:
    """Resolve the sender's auth context and enforce the required level."""
    if auth_level == AUTH_NONE:
        return None
    auth_ctx = await resolve_nostr_auth(sender_pubkey, wallet_id, key_type_str)
    if auth_level == AUTH_ACCOUNT:
        if auth_ctx is None:
            raise PermissionError("Authentication required.")
        return auth_ctx
    if auth_level not in (AUTH_INVOICE_KEY, AUTH_ADMIN_KEY):
        raise PermissionError(f"Unknown auth level: {auth_level}")
    # Wallet-scoped levels require a resolved wallet context
    if not isinstance(auth_ctx, WalletTypeInfo):
        raise PermissionError("Wallet authentication required (provide wallet_id).")
    if auth_level == AUTH_ADMIN_KEY and auth_ctx.key_type != KeyType.admin:
        raise PermissionError("Admin key required for this operation.")
    return auth_ctx
# --- Default RPC handlers ---
def register_default_rpcs():
    """Register the core LNbits wallet RPCs."""
    endpoints = (
        ("create_invoice", _handle_create_invoice, AUTH_INVOICE_KEY),
        ("pay_invoice", _handle_pay_invoice, AUTH_ADMIN_KEY),
        ("get_payment", _handle_get_payment, AUTH_NONE),
        ("list_payments", _handle_list_payments, AUTH_INVOICE_KEY),
        ("get_wallet", _handle_get_wallet, AUTH_INVOICE_KEY),
        ("create_wallet", _handle_create_wallet, AUTH_ACCOUNT),
        ("decode_payment", _handle_decode_payment, AUTH_NONE),
    )
    for name, handler, level in endpoints:
        register_rpc(name, handler, level)
async def _handle_create_invoice(
    auth: WalletTypeInfo, request: NostrRpcRequest
) -> dict:
    """Create an incoming payment request on the authenticated wallet."""
    from lnbits.core.models import CreateInvoice
    from lnbits.core.services.payments import create_payment_request

    params = request.body or {}
    invoice = CreateInvoice(
        out=False,
        amount=params.get("amount", 0),
        memo=params.get("memo"),
        unit=params.get("unit", "sat"),
        expiry=params.get("expiry"),
        extra=params.get("extra"),
        webhook=params.get("webhook"),
    )
    created = await create_payment_request(auth.wallet.id, invoice)
    return created.dict()
async def _handle_pay_invoice(auth: WalletTypeInfo, request: NostrRpcRequest) -> dict:
    """Pay a bolt11 invoice from the authenticated wallet (admin level)."""
    from lnbits.core.services.payments import pay_invoice

    params = request.body or {}
    invoice = params.get("bolt11", "")
    if not invoice:
        raise ValueError("bolt11 is required")
    payment = await pay_invoice(
        wallet_id=auth.wallet.id,
        payment_request=invoice,
        max_sat=params.get("max_sat"),
        extra=params.get("extra"),
        description=params.get("description", ""),
    )
    return payment.dict()
async def _handle_get_payment(
    auth: None, request: NostrRpcRequest
) -> dict | None:
    """Look up a payment by hash (public endpoint, no auth context)."""
    from lnbits.core.crud.payments import get_standalone_payment

    body = request.body or {}
    query = request.query or {}
    # Accept the hash from either the body or the query section
    payment_hash = body.get("payment_hash") or query.get("payment_hash", "")
    if not payment_hash:
        raise ValueError("payment_hash is required")
    payment = await get_standalone_payment(payment_hash)
    return payment.dict() if payment else None
async def _handle_list_payments(
    auth: WalletTypeInfo, request: NostrRpcRequest
) -> list[dict]:
    """List payments of the authenticated wallet, honoring query filters."""
    from lnbits.core.crud.payments import get_payments

    filters = request.query or {}
    payments = await get_payments(
        wallet_id=auth.wallet.id,
        complete=filters.get("complete", False),
        pending=filters.get("pending", False),
        outgoing=filters.get("outgoing", False),
        incoming=filters.get("incoming", False),
        limit=filters.get("limit"),
        offset=filters.get("offset"),
    )
    return [payment.dict() for payment in payments]
async def _handle_get_wallet(auth: WalletTypeInfo, request: NostrRpcRequest) -> dict:
    """Return id, name and msat balance of the authenticated wallet."""
    wallet = auth.wallet
    return {"id": wallet.id, "name": wallet.name, "balance": wallet.balance_msat}
async def _handle_create_wallet(auth: Account, request: NostrRpcRequest) -> dict:
    """Create a wallet for the authenticated account and return its keys."""
    from lnbits.core.crud.wallets import create_wallet

    params = request.body or {}
    new_wallet = await create_wallet(
        user_id=auth.id,
        wallet_name=params.get("name", "Nostr Wallet"),
    )
    return {
        "id": new_wallet.id,
        "name": new_wallet.name,
        "adminkey": new_wallet.adminkey,
        "inkey": new_wallet.inkey,
    }
async def _handle_decode_payment(auth: None, request: NostrRpcRequest) -> dict:
    """Decode a bolt11 payment request (public endpoint, no auth context)."""
    from lnbits import bolt11

    params = request.body or {}
    encoded = params.get("payment_request", "")
    if not encoded:
        raise ValueError("payment_request is required")
    decoded = bolt11.decode(encoded)
    # Response keys mirror the decoded invoice attribute names one-to-one
    fields = (
        "payment_hash",
        "amount_msat",
        "description",
        "description_hash",
        "payee",
        "date",
        "expiry",
        "min_final_cltv_expiry",
    )
    return {name: getattr(decoded, name) for name in fields}

View file

@ -0,0 +1,29 @@
from typing import Any
from pydantic import BaseModel
class NostrRpcRequest(BaseModel):
    """
    Incoming RPC request over nostr transport.
    Mirrors Lightning.pub's NostrRequest type from nostr_transport.ts.
    """

    # Name of the registered RPC endpoint to invoke (dispatcher registry key).
    rpc_name: str
    # Target wallet for wallet-scoped calls; None for account/public calls.
    wallet_id: str | None = None
    key_type: str | None = None  # "admin" or "invoice"
    # Caller-supplied correlation id, echoed back in the response.
    request_id: str
    body: dict | None = None  # request payload (HTTP-body analogue)
    query: dict | None = None  # filter/paging parameters (query-string analogue)
class NostrRpcResponse(BaseModel):
    """
    Outgoing RPC response over nostr transport.
    Mirrors Lightning.pub's response format: {status, requestId, ...data}.
    """

    status: str  # "OK" or "ERROR"
    # Echoes the request's correlation id so callers can match responses.
    request_id: str
    data: Any | None = None  # handler result on success
    error: str | None = None  # human-readable message on failure

View file

@ -0,0 +1,241 @@
"""
Nostr relay connection pool for the transport layer.
Manages WebSocket connections to multiple nostr relays, subscribes for
incoming RPC events (kind 21000) tagged with the LNbits instance's
pubkey, validates and decrypts them, and publishes encrypted responses.
Follows the reconnection pattern from lnbits/wallets/nwc.py and the
relay pool design from Lightning.pub's nostrPool.ts.
"""
import asyncio
import json
import time
from collections.abc import Awaitable, Callable
import coincurve
from loguru import logger
from websockets import connect as ws_connect
from lnbits.settings import settings
from lnbits.utils.nostr import json_dumps, sign_event, verify_event
from .crypto import nip44_decrypt, nip44_encrypt
EVENT_KIND = 21000
# Events older than this are deduplication-expired (seconds)
_HANDLED_EVENT_TTL = 7200 # 2 hours
class NostrTransportPool:
    """
    Manages relay connections for the nostr transport layer.

    Subscribes for kind-21000 events tagged with our pubkey,
    decrypts them, and forwards to a callback. Publishes encrypted
    response events back to the sender.
    """

    def __init__(
        self,
        private_key_hex: str,
        public_key_hex: str,
        relay_urls: list[str],
        event_callback: Callable[..., Awaitable[None]],
    ):
        """
        Args:
            private_key_hex: instance secret key (hex) used for NIP-44
                encryption/decryption and event signing.
            public_key_hex: matching x-only public key (hex).
            relay_urls: websocket URLs of the relays to connect to.
            event_callback: awaited as callback(sender_pubkey, plaintext)
                for every valid, decrypted incoming event.
        """
        self.private_key_hex = private_key_hex
        self.public_key_hex = public_key_hex
        self.relay_urls = relay_urls
        self.event_callback = event_callback
        # coincurve key object for signing outgoing events
        self._privkey = coincurve.PrivateKey(bytes.fromhex(private_key_hex))
        # Deduplication: event_id -> timestamp first seen
        self._handled_events: dict[str, float] = {}
        # Active relay tasks and websockets
        self._relay_tasks: list[asyncio.Task] = []
        self._relay_ws: dict[str, object] = {}  # relay_url -> ws
        # Bugfix: initialize here so stop() cannot raise AttributeError when
        # called before (or without) start(), which only then assigns the task.
        self._cleanup_task: asyncio.Task | None = None
        self._shutdown = False

    def _is_shutting_down(self) -> bool:
        # Stop on explicit shutdown or when LNbits itself is going down
        return self._shutdown or not settings.lnbits_running

    async def start(self):
        """Connect to all configured relays and start the cleanup task."""
        logger.info(
            f"Nostr transport: connecting to {len(self.relay_urls)} relay(s) "
            f"as {self.public_key_hex[:16]}..."
        )
        for relay_url in self.relay_urls:
            task = asyncio.create_task(self._connect_to_relay(relay_url))
            self._relay_tasks.append(task)
        # Periodic cleanup of handled events cache
        self._cleanup_task = asyncio.create_task(self._cleanup_handled_events())

    async def stop(self):
        """Shut down all relay connections and background tasks (idempotent)."""
        self._shutdown = True
        for task in self._relay_tasks:
            task.cancel()
        self._relay_tasks.clear()
        if self._cleanup_task is not None:
            self._cleanup_task.cancel()
            self._cleanup_task = None
        # Close websockets, best effort
        for url, ws in self._relay_ws.items():
            try:
                await ws.close()
            except Exception:
                pass
        self._relay_ws.clear()
        logger.info("Nostr transport: all relays disconnected")

    async def send_response(self, recipient_pubkey: str, content: str):
        """Encrypt `content` to the recipient and publish it to all relays."""
        encrypted = nip44_encrypt(content, self.private_key_hex, recipient_pubkey)
        event = {
            "kind": EVENT_KIND,
            "created_at": int(time.time()),
            "tags": [["p", recipient_pubkey]],
            "content": encrypted,
        }
        # sign_event is expected to fill in pubkey/id/sig -- confirm against
        # lnbits.utils.nostr, since the event dict has no "pubkey" field here.
        event = sign_event(event, self.public_key_hex, self._privkey)
        event_json = json.dumps(["EVENT", event])
        # Publish to all connected relays; one success is enough
        sent = False
        for url, ws in list(self._relay_ws.items()):
            try:
                await ws.send(event_json)
                sent = True
            except Exception as e:
                logger.warning(f"Nostr transport: failed to send to {url}: {e}")
        if not sent:
            logger.error("Nostr transport: could not send response to any relay")

    # --- Internal ---
    async def _connect_to_relay(self, relay_url: str):
        """Connect to a single relay; reconnect after 5s until shutdown."""
        while not self._is_shutting_down():
            try:
                async with ws_connect(relay_url) as ws:
                    self._relay_ws[relay_url] = ws
                    logger.info(f"Nostr transport: connected to {relay_url}")
                    # Subscribe for kind-21000 events addressed to us, from now on
                    sub_id = f"lnbits_transport_{self.public_key_hex[:8]}"
                    sub_filter = {
                        "kinds": [EVENT_KIND],
                        "#p": [self.public_key_hex],
                        "since": int(time.time()),
                    }
                    await ws.send(json.dumps(["REQ", sub_id, sub_filter]))
                    # Receive loop
                    while not self._is_shutting_down():
                        try:
                            reply = await ws.recv()
                            if isinstance(reply, bytes):
                                reply = reply.decode("utf-8")
                            await self._on_message(relay_url, reply)
                        except Exception as e:
                            logger.debug(
                                f"Nostr transport: recv error on {relay_url}: {e}"
                            )
                            break
                    logger.debug(f"Nostr transport: connection to {relay_url} closed")
            except Exception as e:
                logger.error(f"Nostr transport: error connecting to {relay_url}: {e}")
            self._relay_ws.pop(relay_url, None)
            if not self._is_shutting_down():
                logger.debug(f"Nostr transport: reconnecting to {relay_url} in 5s...")
                await asyncio.sleep(5)

    async def _on_message(self, relay_url: str, message: str):
        """Dispatch one raw relay message by its NIP-01 message type."""
        try:
            msg = json.loads(message)
            if msg[0] == "EVENT":
                await self._on_event(relay_url, msg[2])
            elif msg[0] == "OK":
                ok = msg[2] if len(msg) > 2 else True
                if not ok:
                    reason = msg[3] if len(msg) > 3 else "unknown"
                    logger.warning(
                        f"Nostr transport: event rejected by {relay_url}: {reason}"
                    )
            elif msg[0] == "EOSE":
                logger.debug(f"Nostr transport: EOSE from {relay_url}")
            elif msg[0] == "NOTICE":
                logger.info(f"Nostr transport: notice from {relay_url}: {msg[1]}")
            elif msg[0] == "CLOSED":
                logger.warning(
                    f"Nostr transport: subscription closed by {relay_url}: "
                    f"{msg[2] if len(msg) > 2 else 'no reason'}"
                )
        except Exception as e:
            logger.error(f"Nostr transport: error parsing message from {relay_url}: {e}")

    async def _on_event(self, relay_url: str, event: dict):
        """Deduplicate, validate, decrypt and forward one incoming event."""
        event_id = event.get("id", "")
        # Bugfix: an event without an id cannot be deduplicated or verified;
        # previously all id-less events after the first were silently dropped
        # because the cache was keyed on "".
        if not event_id:
            return
        # Deduplicate (relays may deliver the same event more than once)
        if event_id in self._handled_events:
            return
        self._handled_events[event_id] = time.time()
        # Validate event kind
        if event.get("kind") != EVENT_KIND:
            return
        # Verify signature before trusting the sender pubkey
        if not verify_event(event):
            logger.warning(f"Nostr transport: invalid signature on event {event_id[:16]}")
            return
        sender_pubkey = event.get("pubkey", "")
        if not sender_pubkey:
            return
        # Decrypt content
        try:
            plaintext = nip44_decrypt(
                event["content"], self.private_key_hex, sender_pubkey
            )
        except Exception as e:
            logger.warning(
                f"Nostr transport: decryption failed for event {event_id[:16]}: {e}"
            )
            return
        # Forward to callback; callback errors must not kill the receive loop
        try:
            await self.event_callback(sender_pubkey, plaintext)
        except Exception as e:
            logger.error(
                f"Nostr transport: error in event callback for {event_id[:16]}: {e}"
            )

    async def _cleanup_handled_events(self):
        """Every 10 minutes, drop dedup entries older than _HANDLED_EVENT_TTL."""
        while not self._is_shutting_down():
            await asyncio.sleep(600)  # every 10 minutes
            cutoff = time.time() - _HANDLED_EVENT_TTL
            expired = [
                eid for eid, ts in self._handled_events.items() if ts < cutoff
            ]
            for eid in expired:
                del self._handled_events[eid]
            if expired:
                logger.debug(
                    f"Nostr transport: cleaned {len(expired)} expired event IDs"
                )

View file

@ -758,6 +758,15 @@ class NostrAuthSettings(LNbitsSettings):
)
class NostrTransportSettings(LNbitsSettings):
    """Settings for the RPC-over-nostr transport layer."""

    # Master switch; off by default so existing deployments are unaffected.
    nostr_transport_enabled: bool = Field(default=False)
    # Relays the instance connects out to (no inbound ports needed).
    nostr_transport_relays: list[str] = Field(
        default=["wss://relay.damus.io", "wss://relay.primal.net"]
    )
    # Hex secp256k1 keypair; auto-generated at startup when left empty.
    nostr_transport_private_key: str = Field(default="")
    nostr_transport_public_key: str = Field(default="")
class GoogleAuthSettings(LNbitsSettings):
google_client_id: str = Field(default="")
google_client_secret: str = Field(default="")
@ -879,6 +888,7 @@ class EditableSettings(
AuditSettings,
AuthSettings,
NostrAuthSettings,
NostrTransportSettings,
GoogleAuthSettings,
GitHubAuthSettings,
KeycloakAuthSettings,