feat: NWC use coincurve instead of secp (#3455)

This commit is contained in:
dni ⚡ 2025-10-30 08:06:31 +01:00 committed by GitHub
parent 2b603bdc48
commit 39ca9da870
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 41 additions and 45 deletions

View file

@ -4,7 +4,7 @@ import json
import re import re
from urllib.parse import urlparse from urllib.parse import urlparse
import secp256k1 import coincurve
from bech32 import bech32_decode, bech32_encode, convertbits from bech32 import bech32_decode, bech32_encode, convertbits
from Cryptodome import Random from Cryptodome import Random
from Cryptodome.Cipher import AES from Cryptodome.Cipher import AES
@ -19,22 +19,22 @@ def generate_keypair() -> tuple[str, str]:
def encrypt_content( def encrypt_content(
content: str, service_pubkey: secp256k1.PublicKey, account_private_key_hex: str content: str, service_pubkey: coincurve.PublicKey, account_private_key_hex: str
) -> str: ) -> str:
""" """
Encrypts the content to be sent to the service. Encrypts the content to be sent to the service.
Args: Args:
content (str): The content to be encrypted. content (str): The content to be encrypted.
service_pubkey (secp256k1.PublicKey): The service provider's public key. service_pubkey (coincurve.PublicKey): The service provider's public key.
account_private_key_hex (str): The account private key in hex format. account_private_key_hex (str): The account private key in hex format.
Returns: Returns:
str: The encrypted content. str: The encrypted content.
""" """
shared = service_pubkey.tweak_mul( shared = service_pubkey.multiply(bytes.fromhex(account_private_key_hex)).format()[
bytes.fromhex(account_private_key_hex) 1:
).serialize()[1:] ]
# random iv (16B) # random iv (16B)
iv = Random.new().read(AES.block_size) iv = Random.new().read(AES.block_size)
aes = AES.new(shared, AES.MODE_CBC, iv) aes = AES.new(shared, AES.MODE_CBC, iv)
@ -52,22 +52,22 @@ def encrypt_content(
def decrypt_content( def decrypt_content(
content: str, service_pubkey: secp256k1.PublicKey, account_private_key_hex: str content: str, service_pubkey: coincurve.PublicKey, account_private_key_hex: str
) -> str: ) -> str:
""" """
Decrypts the content coming from the service. Decrypts the content coming from the service.
Args: Args:
content (str): The encrypted content. content (str): The encrypted content.
service_pubkey (secp256k1.PublicKey): The service provider's public key. service_pubkey (coincurve.PublicKey): The service provider's public key.
account_private_key_hex (str): The account private key in hex format. account_private_key_hex (str): The account private key in hex format.
Returns: Returns:
str: The decrypted content. str: The decrypted content.
""" """
shared = service_pubkey.tweak_mul( shared = service_pubkey.multiply(bytes.fromhex(account_private_key_hex)).format()[
bytes.fromhex(account_private_key_hex) 1:
).serialize()[1:] ]
# extract iv and content # extract iv and content
(encrypted_content_b64, iv_b64) = content.split("?iv=") (encrypted_content_b64, iv_b64) = content.split("?iv=")
encrypted_content = base64.b64decode(encrypted_content_b64.encode("ascii")) encrypted_content = base64.b64decode(encrypted_content_b64.encode("ascii"))
@ -105,16 +105,14 @@ def verify_event(event: dict) -> bool:
if event_id != event["id"]: if event_id != event["id"]:
return False return False
pubkey_hex = event["pubkey"] pubkey_hex = event["pubkey"]
pubkey = secp256k1.PublicKey(bytes.fromhex("02" + pubkey_hex), True) pubkey = coincurve.PublicKeyXOnly(bytes.fromhex(pubkey_hex))
if not pubkey.schnorr_verify( if not pubkey.verify(bytes.fromhex(event["sig"]), bytes.fromhex(event_id)):
bytes.fromhex(event_id), bytes.fromhex(event["sig"]), None, raw=True
):
return False return False
return True return True
def sign_event( def sign_event(
event: dict, account_public_key_hex: str, account_private_key: secp256k1.PrivateKey event: dict, account_public_key_hex: str, account_private_key: coincurve.PrivateKey
) -> dict: ) -> dict:
""" """
Signs the event (in place) with the service secret Signs the event (in place) with the service secret
@ -122,7 +120,7 @@ def sign_event(
Args: Args:
event (Dict): The event to be signed. event (Dict): The event to be signed.
account_public_key_hex (str): The account public key in hex format. account_public_key_hex (str): The account public key in hex format.
account_private_key (secp256k1.PrivateKey): The account private key. account_private_key (coincurve.PrivateKey): The account private key.
Returns: Returns:
Dict: The input event with the signature added. Dict: The input event with the signature added.
@ -141,9 +139,7 @@ def sign_event(
event["id"] = event_id event["id"] = event_id
event["pubkey"] = account_public_key_hex event["pubkey"] = account_public_key_hex
signature = ( signature = account_private_key.sign_schnorr(bytes.fromhex(event_id)).hex()
account_private_key.schnorr_sign(bytes.fromhex(event_id), None, raw=True)
).hex()
event["sig"] = signature event["sig"] = signature
return event return event

View file

@ -7,8 +7,8 @@ from collections.abc import AsyncGenerator
from typing import cast from typing import cast
from urllib.parse import parse_qs, unquote, urlparse from urllib.parse import parse_qs, unquote, urlparse
import secp256k1
from bolt11 import decode as bolt11_decode from bolt11 import decode as bolt11_decode
from coincurve import PrivateKey, PublicKey
from loguru import logger from loguru import logger
from websockets import connect as ws_connect from websockets import connect as ws_connect
@ -306,15 +306,15 @@ class NWCConnection:
# Parse pairing url (if invalid an exception is raised) # Parse pairing url (if invalid an exception is raised)
# Extract keys (used to sign nwc events+identify NWC user) # Extract keys (used to sign nwc events+identify NWC user)
self.account_private_key = secp256k1.PrivateKey(bytes.fromhex(secret)) self.account_private_key = PrivateKey(bytes.fromhex(secret))
self.account_private_key_hex = secret self.account_private_key_hex = secret
self.account_public_key = self.account_private_key.pubkey self.account_public_key = self.account_private_key.public_key
if not self.account_public_key: if not self.account_public_key:
raise ValueError("Missing account public key") raise ValueError("Missing account public key")
self.account_public_key_hex = self.account_public_key.serialize().hex()[2:] self.account_public_key_hex = self.account_public_key.format().hex()[2:]
# Extract service key (used for encryption to identify the nwc service provider) # Extract service key (used for encryption to identify the nwc service provider)
self.service_pubkey = secp256k1.PublicKey(bytes.fromhex("02" + pubkey), True) self.service_pubkey = PublicKey(bytes.fromhex("02" + pubkey))
self.service_pubkey_hex = pubkey self.service_pubkey_hex = pubkey
# Extract relay url # Extract relay url

View file

@ -6,8 +6,8 @@ from uuid import uuid4
import jwt import jwt
import pytest import pytest
import secp256k1
import shortuuid import shortuuid
from coincurve import PrivateKey
from httpx import AsyncClient from httpx import AsyncClient
from lnbits.core.crud.users import ( from lnbits.core.crud.users import (
@ -42,11 +42,11 @@ nostr_event = {
"sig": "fb7eb47fa8355747f6837e55620103d73ba47b2c3164ab8319d2f164022a9f25" "sig": "fb7eb47fa8355747f6837e55620103d73ba47b2c3164ab8319d2f164022a9f25"
"6e00ecda7d3c8945f07b7d6ecc18cfff34c07bc99677309e2b9310d9fc1bb138", "6e00ecda7d3c8945f07b7d6ecc18cfff34c07bc99677309e2b9310d9fc1bb138",
} }
private_key = secp256k1.PrivateKey( private_key = PrivateKey(
bytes.fromhex("6e00ecda7d3c8945f07b7d6ecc18cfff34c07bc99677309e2b9310d9fc1bb138") bytes.fromhex("6e00ecda7d3c8945f07b7d6ecc18cfff34c07bc99677309e2b9310d9fc1bb138")
) )
assert private_key.pubkey, "Pubkey not created." assert private_key.public_key, "Pubkey not created."
pubkey_hex = private_key.pubkey.serialize().hex()[2:] pubkey_hex = private_key.public_key.format().hex()[2:]
################################ LOGIN ################################ ################################ LOGIN ################################
@ -552,9 +552,9 @@ async def test_register_nostr_ok(http_client: AsyncClient, settings: Settings):
event = {**nostr_event} event = {**nostr_event}
event["created_at"] = int(time.time()) event["created_at"] = int(time.time())
private_key = secp256k1.PrivateKey(bytes.fromhex(os.urandom(32).hex())) private_key = PrivateKey(bytes.fromhex(os.urandom(32).hex()))
assert private_key.pubkey, "Pubkey not created." assert private_key.public_key, "Pubkey not created."
pubkey_hex = private_key.pubkey.serialize().hex()[2:] pubkey_hex = private_key.public_key.format().hex()[2:]
event_signed = sign_event(event, pubkey_hex, private_key) event_signed = sign_event(event, pubkey_hex, private_key)
base64_event = base64.b64encode(json.dumps(event_signed).encode()).decode("ascii") base64_event = base64.b64encode(json.dumps(event_signed).encode()).decode("ascii")
response = await http_client.post( response = await http_client.post(
@ -759,9 +759,9 @@ async def test_change_pubkey_npub_ok(http_client: AsyncClient, settings: Setting
payload: dict = jwt.decode(access_token, settings.auth_secret_key, ["HS256"]) payload: dict = jwt.decode(access_token, settings.auth_secret_key, ["HS256"])
access_token_payload = AccessTokenPayload(**payload) access_token_payload = AccessTokenPayload(**payload)
private_key = secp256k1.PrivateKey(bytes.fromhex(os.urandom(32).hex())) private_key = PrivateKey(bytes.fromhex(os.urandom(32).hex()))
assert private_key.pubkey, "Pubkey not created." assert private_key.public_key, "Pubkey not created."
pubkey_hex = private_key.pubkey.serialize().hex()[2:] pubkey_hex = private_key.public_key.format().hex()[2:]
npub = hex_to_npub(pubkey_hex) npub = hex_to_npub(pubkey_hex)
response = await http_client.put( response = await http_client.put(
@ -802,9 +802,9 @@ async def test_change_pubkey_ok(
payload: dict = jwt.decode(access_token, settings.auth_secret_key, ["HS256"]) payload: dict = jwt.decode(access_token, settings.auth_secret_key, ["HS256"])
access_token_payload = AccessTokenPayload(**payload) access_token_payload = AccessTokenPayload(**payload)
private_key = secp256k1.PrivateKey(bytes.fromhex(os.urandom(32).hex())) private_key = PrivateKey(bytes.fromhex(os.urandom(32).hex()))
assert private_key.pubkey, "Pubkey not created." assert private_key.public_key, "Pubkey not created."
pubkey_hex = private_key.pubkey.serialize().hex()[2:] pubkey_hex = private_key.public_key.format().hex()[2:]
response = await http_client.put( response = await http_client.put(
"/api/v1/auth/pubkey", "/api/v1/auth/pubkey",

View file

@ -5,7 +5,7 @@ import time
from typing import cast from typing import cast
import pytest import pytest
import secp256k1 from coincurve import PrivateKey, PublicKey
from Cryptodome import Random from Cryptodome import Random
from Cryptodome.Cipher import AES from Cryptodome.Cipher import AES
from Cryptodome.Util.Padding import pad, unpad from Cryptodome.Util.Padding import pad, unpad
@ -23,8 +23,8 @@ from tests.wallets.helpers import (
def encrypt_content(priv_key, dest_pub_key, content): def encrypt_content(priv_key, dest_pub_key, content):
p = secp256k1.PublicKey(bytes.fromhex("02" + dest_pub_key), True) p = PublicKey(bytes.fromhex("02" + dest_pub_key))
shared = p.tweak_mul(bytes.fromhex(priv_key)).serialize()[1:] shared = p.multiply(bytes.fromhex(priv_key)).format()[1:]
iv = Random.new().read(AES.block_size) iv = Random.new().read(AES.block_size)
aes = AES.new(shared, AES.MODE_CBC, iv) aes = AES.new(shared, AES.MODE_CBC, iv)
@ -38,8 +38,8 @@ def encrypt_content(priv_key, dest_pub_key, content):
def decrypt_content(priv_key, source_pub_key, content): def decrypt_content(priv_key, source_pub_key, content):
p = secp256k1.PublicKey(bytes.fromhex("02" + source_pub_key), True) p = PublicKey(bytes.fromhex("02" + source_pub_key))
shared = p.tweak_mul(bytes.fromhex(priv_key)).serialize()[1:] shared = p.multiply(bytes.fromhex(priv_key)).format()[1:]
(encrypted_content_b64, iv_b64) = content.split("?iv=") (encrypted_content_b64, iv_b64) = content.split("?iv=")
encrypted_content = base64.b64decode(encrypted_content_b64.encode("ascii")) encrypted_content = base64.b64decode(encrypted_content_b64.encode("ascii"))
iv = base64.b64decode(iv_b64.encode("ascii")) iv = base64.b64decode(iv_b64.encode("ascii"))
@ -69,8 +69,8 @@ def sign_event(pub_key, priv_key, event):
event_id = hashlib.sha256(signature_data.encode()).hexdigest() event_id = hashlib.sha256(signature_data.encode()).hexdigest()
event["id"] = event_id event["id"] = event_id
event["pubkey"] = pub_key event["pubkey"] = pub_key
s = secp256k1.PrivateKey(bytes.fromhex(priv_key)) s = PrivateKey(bytes.fromhex(priv_key))
signature = (s.schnorr_sign(bytes.fromhex(event_id), None, raw=True)).hex() signature = s.sign_schnorr(bytes.fromhex(event_id)).hex()
event["sig"] = signature event["sig"] = signature
return event return event