From 39ca9da87027f026baf977f4abed3812e00b1f67 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?dni=20=E2=9A=A1?= Date: Thu, 30 Oct 2025 08:06:31 +0100 Subject: [PATCH] feat: NWC use coincurve instead of secp (#3455) --- lnbits/utils/nostr.py | 36 ++++++++++++++----------------- lnbits/wallets/nwc.py | 10 ++++----- tests/api/test_auth.py | 26 +++++++++++----------- tests/wallets/test_nwc_wallets.py | 14 ++++++------ 4 files changed, 41 insertions(+), 45 deletions(-) diff --git a/lnbits/utils/nostr.py b/lnbits/utils/nostr.py index f9bfde75..c9111179 100644 --- a/lnbits/utils/nostr.py +++ b/lnbits/utils/nostr.py @@ -4,7 +4,7 @@ import json import re from urllib.parse import urlparse -import secp256k1 +import coincurve from bech32 import bech32_decode, bech32_encode, convertbits from Cryptodome import Random from Cryptodome.Cipher import AES @@ -19,22 +19,22 @@ def generate_keypair() -> tuple[str, str]: def encrypt_content( - content: str, service_pubkey: secp256k1.PublicKey, account_private_key_hex: str + content: str, service_pubkey: coincurve.PublicKey, account_private_key_hex: str ) -> str: """ Encrypts the content to be sent to the service. Args: content (str): The content to be encrypted. - service_pubkey (secp256k1.PublicKey): The service provider's public key. + service_pubkey (coincurve.PublicKey): The service provider's public key. account_private_key_hex (str): The account private key in hex format. Returns: str: The encrypted content. """ - shared = service_pubkey.tweak_mul( - bytes.fromhex(account_private_key_hex) - ).serialize()[1:] + shared = service_pubkey.multiply(bytes.fromhex(account_private_key_hex)).format()[ + 1: + ] # random iv (16B) iv = Random.new().read(AES.block_size) aes = AES.new(shared, AES.MODE_CBC, iv) @@ -52,22 +52,22 @@ def encrypt_content( def decrypt_content( - content: str, service_pubkey: secp256k1.PublicKey, account_private_key_hex: str + content: str, service_pubkey: coincurve.PublicKey, account_private_key_hex: str ) -> str: """ Decrypts the content coming from the service. Args: content (str): The encrypted content. - service_pubkey (secp256k1.PublicKey): The service provider's public key. + service_pubkey (coincurve.PublicKey): The service provider's public key. account_private_key_hex (str): The account private key in hex format. Returns: str: The decrypted content. """ - shared = service_pubkey.tweak_mul( - bytes.fromhex(account_private_key_hex) - ).serialize()[1:] + shared = service_pubkey.multiply(bytes.fromhex(account_private_key_hex)).format()[ + 1: + ] # extract iv and content (encrypted_content_b64, iv_b64) = content.split("?iv=") encrypted_content = base64.b64decode(encrypted_content_b64.encode("ascii")) @@ -105,16 +105,14 @@ def verify_event(event: dict) -> bool: if event_id != event["id"]: return False pubkey_hex = event["pubkey"] - pubkey = secp256k1.PublicKey(bytes.fromhex("02" + pubkey_hex), True) - if not pubkey.schnorr_verify( - bytes.fromhex(event_id), bytes.fromhex(event["sig"]), None, raw=True - ): + pubkey = coincurve.PublicKeyXOnly(bytes.fromhex(pubkey_hex)) + if not pubkey.verify(bytes.fromhex(event["sig"]), bytes.fromhex(event_id)): return False return True def sign_event( - event: dict, account_public_key_hex: str, account_private_key: secp256k1.PrivateKey + event: dict, account_public_key_hex: str, account_private_key: coincurve.PrivateKey ) -> dict: """ Signs the event (in place) with the service secret @@ -122,7 +120,7 @@ def sign_event( Args: event (Dict): The event to be signed. account_public_key_hex (str): The account public key in hex format. - account_private_key (secp256k1.PrivateKey): The account private key. + account_private_key (coincurve.PrivateKey): The account private key. Returns: Dict: The input event with the signature added. @@ -141,9 +139,7 @@ def sign_event( event["id"] = event_id event["pubkey"] = account_public_key_hex - signature = ( - account_private_key.schnorr_sign(bytes.fromhex(event_id), None, raw=True) - ).hex() + signature = account_private_key.sign_schnorr(bytes.fromhex(event_id)).hex() event["sig"] = signature return event diff --git a/lnbits/wallets/nwc.py b/lnbits/wallets/nwc.py index 236f419a..c73ba668 100644 --- a/lnbits/wallets/nwc.py +++ b/lnbits/wallets/nwc.py @@ -7,8 +7,8 @@ from collections.abc import AsyncGenerator from typing import cast from urllib.parse import parse_qs, unquote, urlparse -import secp256k1 from bolt11 import decode as bolt11_decode +from coincurve import PrivateKey, PublicKey from loguru import logger from websockets import connect as ws_connect @@ -306,15 +306,15 @@ class NWCConnection: # Parse pairing url (if invalid an exception is raised) # Extract keys (used to sign nwc events+identify NWC user) - self.account_private_key = secp256k1.PrivateKey(bytes.fromhex(secret)) + self.account_private_key = PrivateKey(bytes.fromhex(secret)) self.account_private_key_hex = secret - self.account_public_key = self.account_private_key.pubkey + self.account_public_key = self.account_private_key.public_key if not self.account_public_key: raise ValueError("Missing account public key") - self.account_public_key_hex = self.account_public_key.serialize().hex()[2:] + self.account_public_key_hex = self.account_public_key.format().hex()[2:] # Extract service key (used for encryption to identify the nwc service provider) - self.service_pubkey = secp256k1.PublicKey(bytes.fromhex("02" + pubkey), True) + self.service_pubkey = PublicKey(bytes.fromhex("02" + pubkey)) self.service_pubkey_hex = pubkey # Extract relay url diff --git a/tests/api/test_auth.py b/tests/api/test_auth.py index 7edbd372..58f5e1a3 100644 --- a/tests/api/test_auth.py +++ b/tests/api/test_auth.py @@ -6,8 +6,8 @@ from uuid import uuid4 import jwt import pytest -import secp256k1 import shortuuid +from coincurve import PrivateKey from httpx import AsyncClient from lnbits.core.crud.users import ( @@ -42,11 +42,11 @@ nostr_event = { "sig": "fb7eb47fa8355747f6837e55620103d73ba47b2c3164ab8319d2f164022a9f25" "6e00ecda7d3c8945f07b7d6ecc18cfff34c07bc99677309e2b9310d9fc1bb138", } -private_key = secp256k1.PrivateKey( +private_key = PrivateKey( bytes.fromhex("6e00ecda7d3c8945f07b7d6ecc18cfff34c07bc99677309e2b9310d9fc1bb138") ) -assert private_key.pubkey, "Pubkey not created." -pubkey_hex = private_key.pubkey.serialize().hex()[2:] +assert private_key.public_key, "Pubkey not created." +pubkey_hex = private_key.public_key.format().hex()[2:] ################################ LOGIN ################################ @@ -552,9 +552,9 @@ async def test_register_nostr_ok(http_client: AsyncClient, settings: Settings): event = {**nostr_event} event["created_at"] = int(time.time()) - private_key = secp256k1.PrivateKey(bytes.fromhex(os.urandom(32).hex())) - assert private_key.pubkey, "Pubkey not created." - pubkey_hex = private_key.pubkey.serialize().hex()[2:] + private_key = PrivateKey(bytes.fromhex(os.urandom(32).hex())) + assert private_key.public_key, "Pubkey not created." + pubkey_hex = private_key.public_key.format().hex()[2:] event_signed = sign_event(event, pubkey_hex, private_key) base64_event = base64.b64encode(json.dumps(event_signed).encode()).decode("ascii") response = await http_client.post( @@ -759,9 +759,9 @@ async def test_change_pubkey_npub_ok(http_client: AsyncClient, settings: Setting payload: dict = jwt.decode(access_token, settings.auth_secret_key, ["HS256"]) access_token_payload = AccessTokenPayload(**payload) - private_key = secp256k1.PrivateKey(bytes.fromhex(os.urandom(32).hex())) - assert private_key.pubkey, "Pubkey not created." - pubkey_hex = private_key.pubkey.serialize().hex()[2:] + private_key = PrivateKey(bytes.fromhex(os.urandom(32).hex())) + assert private_key.public_key, "Pubkey not created." + pubkey_hex = private_key.public_key.format().hex()[2:] npub = hex_to_npub(pubkey_hex) response = await http_client.put( @@ -802,9 +802,9 @@ async def test_change_pubkey_ok( payload: dict = jwt.decode(access_token, settings.auth_secret_key, ["HS256"]) access_token_payload = AccessTokenPayload(**payload) - private_key = secp256k1.PrivateKey(bytes.fromhex(os.urandom(32).hex())) - assert private_key.pubkey, "Pubkey not created." - pubkey_hex = private_key.pubkey.serialize().hex()[2:] + private_key = PrivateKey(bytes.fromhex(os.urandom(32).hex())) + assert private_key.public_key, "Pubkey not created." + pubkey_hex = private_key.public_key.format().hex()[2:] response = await http_client.put( "/api/v1/auth/pubkey", diff --git a/tests/wallets/test_nwc_wallets.py b/tests/wallets/test_nwc_wallets.py index cea4ce7f..f4310540 100644 --- a/tests/wallets/test_nwc_wallets.py +++ b/tests/wallets/test_nwc_wallets.py @@ -5,7 +5,7 @@ import time from typing import cast import pytest -import secp256k1 +from coincurve import PrivateKey, PublicKey from Cryptodome import Random from Cryptodome.Cipher import AES from Cryptodome.Util.Padding import pad, unpad @@ -23,8 +23,8 @@ from tests.wallets.helpers import ( def encrypt_content(priv_key, dest_pub_key, content): - p = secp256k1.PublicKey(bytes.fromhex("02" + dest_pub_key), True) - shared = p.tweak_mul(bytes.fromhex(priv_key)).serialize()[1:] + p = PublicKey(bytes.fromhex("02" + dest_pub_key)) + shared = p.multiply(bytes.fromhex(priv_key)).format()[1:] iv = Random.new().read(AES.block_size) aes = AES.new(shared, AES.MODE_CBC, iv) @@ -38,8 +38,8 @@ def encrypt_content(priv_key, dest_pub_key, content): def decrypt_content(priv_key, source_pub_key, content): - p = secp256k1.PublicKey(bytes.fromhex("02" + source_pub_key), True) - shared = p.tweak_mul(bytes.fromhex(priv_key)).serialize()[1:] + p = PublicKey(bytes.fromhex("02" + source_pub_key)) + shared = p.multiply(bytes.fromhex(priv_key)).format()[1:] (encrypted_content_b64, iv_b64) = content.split("?iv=") encrypted_content = base64.b64decode(encrypted_content_b64.encode("ascii")) iv = base64.b64decode(iv_b64.encode("ascii")) @@ -69,8 +69,8 @@ def sign_event(pub_key, priv_key, event): event_id = hashlib.sha256(signature_data.encode()).hexdigest() event["id"] = event_id event["pubkey"] = pub_key - s = secp256k1.PrivateKey(bytes.fromhex(priv_key)) - signature = (s.schnorr_sign(bytes.fromhex(event_id), None, raw=True)).hex() + s = PrivateKey(bytes.fromhex(priv_key)) + signature = s.sign_schnorr(bytes.fromhex(event_id)).hex() event["sig"] = signature return event