refactor: pynostr instead of custom nostr lib (#112)

This commit is contained in:
dni ⚡ 2025-11-20 10:12:18 +01:00 committed by GitHub
commit c4d923e9af
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 6 additions and 464 deletions

View file

@ -2,9 +2,9 @@ from datetime import datetime, timezone
from lnbits.db import Database from lnbits.db import Database
from lnbits.helpers import urlsafe_short_hash from lnbits.helpers import urlsafe_short_hash
from pynostr.key import PrivateKey
from .models import CreatePayLinkData, LnurlpSettings, PayLink from .models import CreatePayLinkData, LnurlpSettings, PayLink
from .nostr.key import PrivateKey
db = Database("ext_lnurlp") db = Database("ext_lnurlp")

View file

@ -1,7 +1,6 @@
from fastapi import Request from fastapi import Request
from lnurl import encode as lnurl_encode from lnurl import encode as lnurl_encode
from pynostr.key import PrivateKey
from .nostr.key import PrivateKey
def parse_nostr_private_key(key: str) -> PrivateKey: def parse_nostr_private_key(key: str) -> PrivateKey:

View file

@ -2,9 +2,9 @@ from datetime import datetime, timezone
from fastapi import Query from fastapi import Query
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
from pynostr.key import PrivateKey
from .helpers import parse_nostr_private_key from .helpers import parse_nostr_private_key
from .nostr.key import PrivateKey
class LnurlpSettings(BaseModel): class LnurlpSettings(BaseModel):

View file

@ -1,150 +0,0 @@
# Copyright (c) 2017, 2020 Pieter Wuille
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
"""Reference implementation for Bech32/Bech32m and segwit addresses."""
from enum import Enum
class Encoding(Enum):
"""Enumeration type to list the various supported encodings."""
BECH32 = 1
BECH32M = 2
CHARSET = "qpzry9x8gf2tvdw0s3jn54khce6mua7l"
BECH32M_CONST = 0x2BC830A3
def bech32_polymod(values):
"""Internal function that computes the Bech32 checksum."""
generator = [0x3B6A57B2, 0x26508E6D, 0x1EA119FA, 0x3D4233DD, 0x2A1462B3]
chk = 1
for value in values:
top = chk >> 25
chk = (chk & 0x1FFFFFF) << 5 ^ value
for i in range(5):
chk ^= generator[i] if ((top >> i) & 1) else 0
return chk
def bech32_hrp_expand(hrp):
"""Expand the HRP into values for checksum computation."""
return [ord(x) >> 5 for x in hrp] + [0] + [ord(x) & 31 for x in hrp]
def bech32_verify_checksum(hrp, data):
"""Verify a checksum given HRP and converted data characters."""
const = bech32_polymod(bech32_hrp_expand(hrp) + data)
if const == 1:
return Encoding.BECH32
if const == BECH32M_CONST:
return Encoding.BECH32M
return None
def bech32_create_checksum(hrp, data, spec):
"""Compute the checksum values given HRP and data."""
values = bech32_hrp_expand(hrp) + data
const = BECH32M_CONST if spec == Encoding.BECH32M else 1
polymod = bech32_polymod([*values, 0, 0, 0, 0, 0, 0]) ^ const
return [(polymod >> 5 * (5 - i)) & 31 for i in range(6)]
def bech32_encode(hrp, data, spec):
"""Compute a Bech32 string given HRP and data values."""
combined = data + bech32_create_checksum(hrp, data, spec)
return hrp + "1" + "".join([CHARSET[d] for d in combined])
def bech32_decode(bech):
"""Validate a Bech32/Bech32m string, and determine HRP and data."""
if (any(ord(x) < 33 or ord(x) > 126 for x in bech)) or (
bech.lower() != bech and bech.upper() != bech
):
return (None, None, None)
bech = bech.lower()
pos = bech.rfind("1")
if pos < 1 or pos + 7 > len(bech) or len(bech) > 90:
return (None, None, None)
if not all(x in CHARSET for x in bech[pos + 1 :]):
return (None, None, None)
hrp = bech[:pos]
data = [CHARSET.find(x) for x in bech[pos + 1 :]]
spec = bech32_verify_checksum(hrp, data)
if spec is None:
return (None, None, None)
return (hrp, data[:-6], spec)
def convertbits(data, frombits, tobits, pad=True):
"""General power-of-2 base conversion."""
acc = 0
bits = 0
ret = []
maxv = (1 << tobits) - 1
max_acc = (1 << (frombits + tobits - 1)) - 1
for value in data:
if value < 0 or (value >> frombits):
return None
acc = ((acc << frombits) | value) & max_acc
bits += frombits
while bits >= tobits:
bits -= tobits
ret.append((acc >> bits) & maxv)
if pad:
if bits:
ret.append((acc << (tobits - bits)) & maxv)
elif bits >= frombits or ((acc << (tobits - bits)) & maxv):
return None
return ret
def decode(hrp, addr):
"""Decode a segwit address."""
hrpgot, data, spec = bech32_decode(addr)
assert data, "Invalid bech32 string"
if hrpgot != hrp:
return (None, None)
decoded = convertbits(data[1:], 5, 8, False)
if decoded is None or len(decoded) < 2 or len(decoded) > 40:
return (None, None)
if data[0] > 16:
return (None, None)
if data[0] == 0 and len(decoded) != 20 and len(decoded) != 32:
return (None, None)
if (data[0] == 0 and spec != Encoding.BECH32) or (
data[0] != 0 and spec != Encoding.BECH32M
):
return (None, None)
return (data[0], decoded)
def encode(hrp, witver, witprog):
"""Encode a segwit address."""
spec = Encoding.BECH32 if witver == 0 else Encoding.BECH32M
bits = convertbits(witprog, 8, 5)
assert bits, "Invalid witness program"
ret = bech32_encode(hrp, [witver, *bits], spec)
if decode(hrp, ret) == (None, None):
return None
return ret

View file

@ -1,132 +0,0 @@
import json
import time
from dataclasses import dataclass, field
from enum import IntEnum
from hashlib import sha256
from secp256k1 import PublicKey
from .message_type import ClientMessageType
class EventKind(IntEnum):
SET_METADATA = 0
TEXT_NOTE = 1
RECOMMEND_RELAY = 2
CONTACTS = 3
ENCRYPTED_DIRECT_MESSAGE = 4
DELETE = 5
@dataclass
class Event:
content: str | None = None
public_key: str | None = None
created_at: int | None = None
kind: int = EventKind.TEXT_NOTE
tags: list[list[str]] = field(
default_factory=list
) # Dataclasses require special handling when the default value is a mutable type
signature: str | None = None
def __post_init__(self):
if self.content is not None and not isinstance(self.content, str):
# DMs initialize content to None but all other kinds should pass in a str
raise TypeError("Argument 'content' must be of type str")
if self.created_at is None:
self.created_at = int(time.time())
@staticmethod
def serialize(
public_key: str, created_at: int, kind: int, tags: list[list[str]], content: str
) -> bytes:
data = [0, public_key, created_at, kind, tags, content]
data_str = json.dumps(data, separators=(",", ":"), ensure_ascii=False)
return data_str.encode()
@staticmethod
def compute_id(
public_key: str, created_at: int, kind: int, tags: list[list[str]], content: str
):
return sha256(
Event.serialize(public_key, created_at, kind, tags, content)
).hexdigest()
@property
def id(self) -> str:
# Always recompute the id to reflect the up-to-date state of the Event
assert self.public_key, "Event public key is missing"
assert self.created_at, "Event created_at is missing"
assert self.content is not None, "Event content is missing"
return Event.compute_id(
self.public_key, self.created_at, self.kind, self.tags, self.content
)
def add_pubkey_ref(self, pubkey: str):
"""Adds a reference to a pubkey as a 'p' tag"""
self.tags.append(["p", pubkey])
def add_event_ref(self, event_id: str):
"""Adds a reference to an event_id as an 'e' tag"""
self.tags.append(["e", event_id])
def verify(self) -> bool:
assert self.public_key, "Event public key is missing"
pub_key = PublicKey(
bytes.fromhex("02" + self.public_key), True
) # add 02 for schnorr (bip340)
assert self.signature, "Event signature is missing"
return pub_key.schnorr_verify(
bytes.fromhex(self.id), bytes.fromhex(self.signature), None, raw=True
)
def to_message(self) -> str:
return json.dumps(
[
ClientMessageType.EVENT,
{
"id": self.id,
"pubkey": self.public_key,
"created_at": self.created_at,
"kind": self.kind,
"tags": self.tags,
"content": self.content,
"sig": self.signature,
},
]
)
@dataclass
class EncryptedDirectMessage(Event):
recipient_pubkey: str | None = None
cleartext_content: str | None = None
reference_event_id: str | None = None
def __post_init__(self):
if self.content is not None:
self.cleartext_content = self.content
self.content = None
if self.recipient_pubkey is None:
raise Exception("Must specify a recipient_pubkey.")
self.kind = EventKind.ENCRYPTED_DIRECT_MESSAGE
super().__post_init__()
# Must specify the DM recipient's pubkey in a 'p' tag
self.add_pubkey_ref(self.recipient_pubkey)
# Optionally specify a reference event (DM) this is a reply to
if self.reference_event_id is not None:
self.add_event_ref(self.reference_event_id)
@property
def id(self) -> str:
if self.content is None:
raise Exception(
"EncryptedDirectMessage `id` is undefined until its "
"message is encrypted and stored in the `content` field"
)
return super().id

View file

@ -1,155 +0,0 @@
import base64
import secrets
import secp256k1
from cffi import FFI
from cryptography.hazmat.primitives import padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from . import bech32
from .event import EncryptedDirectMessage, Event
class PublicKey:
def __init__(self, raw_bytes: bytes) -> None:
self.raw_bytes = raw_bytes
def bech32(self) -> str:
converted_bits = bech32.convertbits(self.raw_bytes, 8, 5)
return bech32.bech32_encode("npub", converted_bits, bech32.Encoding.BECH32)
def hex(self) -> str:
return self.raw_bytes.hex()
def verify_signed_message_hash(self, message_hash: str, sig: str) -> bool:
pk = secp256k1.PublicKey(b"\x02" + self.raw_bytes, True)
return pk.schnorr_verify(
bytes.fromhex(message_hash), bytes.fromhex(sig), None, True
)
@classmethod
def from_npub(cls, npub: str):
"""Load a PublicKey from its bech32/npub form"""
hrp, data, spec = bech32.bech32_decode(npub)
assert data, "Invalid npub"
bits = bech32.convertbits(data, 5, 8)
assert bits, "Invalid npub"
raw_public_key = bits[:-1]
return cls(bytes(raw_public_key))
class PrivateKey:
def __init__(self, raw_secret: bytes | None = None) -> None:
if raw_secret is not None:
self.raw_secret = raw_secret
else:
self.raw_secret = secrets.token_bytes(32)
sk = secp256k1.PrivateKey(self.raw_secret)
assert sk.pubkey, "Invalid public"
self.public_key = PublicKey(sk.pubkey.serialize()[1:])
@classmethod
def from_nsec(cls, nsec: str):
"""Load a PrivateKey from its bech32/nsec form"""
hrp, data, spec = bech32.bech32_decode(nsec)
bits = bech32.convertbits(data, 5, 8)
assert bits, "Invalid nsec"
raw_secret = bits[:-1]
return cls(bytes(raw_secret))
def bech32(self) -> str:
converted_bits = bech32.convertbits(self.raw_secret, 8, 5)
return bech32.bech32_encode("nsec", converted_bits, bech32.Encoding.BECH32)
def hex(self) -> str:
return self.raw_secret.hex()
def tweak_add(self, scalar: bytes) -> bytes:
sk = secp256k1.PrivateKey(self.raw_secret)
return sk.tweak_add(scalar)
def compute_shared_secret(self, public_key_hex: str) -> bytes:
pk = secp256k1.PublicKey(bytes.fromhex("02" + public_key_hex), True)
return pk.ecdh(self.raw_secret, hashfn=copy_x)
def encrypt_message(self, message: str, public_key_hex: str) -> str:
padder = padding.PKCS7(128).padder()
padded_data = padder.update(message.encode()) + padder.finalize()
iv = secrets.token_bytes(16)
cipher = Cipher(
algorithms.AES(self.compute_shared_secret(public_key_hex)), modes.CBC(iv)
)
encryptor = cipher.encryptor()
encrypted_message = encryptor.update(padded_data) + encryptor.finalize()
msg = base64.b64encode(encrypted_message).decode()
return f"{msg}?iv={base64.b64encode(iv).decode()}"
def encrypt_dm(self, dm: EncryptedDirectMessage) -> None:
assert dm.recipient_pubkey, "Recipient public key must be set"
dm.content = self.encrypt_message(
message=dm.cleartext_content or "", public_key_hex=dm.recipient_pubkey
)
def decrypt_message(self, encoded_message: str, public_key_hex: str) -> str:
encoded_data = encoded_message.split("?iv=")
encoded_content, encoded_iv = encoded_data[0], encoded_data[1]
iv = base64.b64decode(encoded_iv)
cipher = Cipher(
algorithms.AES(self.compute_shared_secret(public_key_hex)), modes.CBC(iv)
)
encrypted_content = base64.b64decode(encoded_content)
decryptor = cipher.decryptor()
decrypted_message = decryptor.update(encrypted_content) + decryptor.finalize()
unpadder = padding.PKCS7(128).unpadder()
unpadded_data = unpadder.update(decrypted_message) + unpadder.finalize()
return unpadded_data.decode()
def sign_message_hash(self, message_hash: bytes) -> str:
sk = secp256k1.PrivateKey(self.raw_secret)
sig = sk.schnorr_sign(message_hash, None, raw=True)
return sig.hex()
def sign_event(self, event: Event) -> None:
if event.public_key is None:
event.public_key = self.public_key.hex()
event.signature = self.sign_message_hash(bytes.fromhex(event.id))
def __eq__(self, other):
return self.raw_secret == other.raw_secret
def mine_vanity_key(prefix: str | None = None, suffix: str | None = None) -> PrivateKey:
if prefix is None and suffix is None:
raise ValueError("Expected at least one of 'prefix' or 'suffix' arguments")
while True:
sk = PrivateKey()
if (
prefix is not None
and not sk.public_key.bech32()[5 : 5 + len(prefix)] == prefix
):
continue
if suffix is not None and not sk.public_key.bech32()[-len(suffix) :] == suffix:
continue
break
return sk
ffi = FFI()
@ffi.callback(
"int (unsigned char *, const unsigned char *, const unsigned char *, void *)"
)
def copy_x(output, x32, y32, data):
ffi.memmove(output, x32, 32)
return 1

View file

@ -1,20 +0,0 @@
class ClientMessageType:
EVENT = "EVENT"
REQUEST = "REQ"
CLOSE = "CLOSE"
class RelayMessageType:
EVENT = "EVENT"
NOTICE = "NOTICE"
END_OF_STORED_EVENTS = "EOSE"
@staticmethod
def is_valid(relay_type: str) -> bool:
if (
relay_type == RelayMessageType.EVENT
or relay_type == RelayMessageType.NOTICE
or relay_type == RelayMessageType.END_OF_STORED_EVENTS
):
return True
return False

View file

@ -42,7 +42,7 @@ module = [
"shortuuid.*", "shortuuid.*",
"httpx.*", "httpx.*",
"websocket.*", "websocket.*",
"secp256k1.*", "pynostr.*",
] ]
ignore_missing_imports = "True" ignore_missing_imports = "True"

View file

@ -7,10 +7,10 @@ from lnbits.core.crud import get_payment, update_payment
from lnbits.core.models import Payment from lnbits.core.models import Payment
from lnbits.tasks import register_invoice_listener from lnbits.tasks import register_invoice_listener
from loguru import logger from loguru import logger
from pynostr.event import Event
from .crud import get_or_create_lnurlp_settings, get_pay_link from .crud import get_or_create_lnurlp_settings, get_pay_link
from .models import PayLink from .models import PayLink
from .nostr.event import Event
async def wait_for_paid_invoices(): async def wait_for_paid_invoices():
@ -136,7 +136,7 @@ async def send_zap(payment: Payment):
) )
settings = await get_or_create_lnurlp_settings() settings = await get_or_create_lnurlp_settings()
settings.private_key.sign_event(zap_receipt) zap_receipt.sign(settings.private_key.hex())
async def send_to_relay(relay_url: str, event_message: str): async def send_to_relay(relay_url: str, event_message: str):
"""Helper function to send an event to a single relay.""" """Helper function to send an event to a single relay."""