diff --git a/nostr/bech32.py b/nostr/bech32.py new file mode 100644 index 0000000..b068de7 --- /dev/null +++ b/nostr/bech32.py @@ -0,0 +1,137 @@ +# Copyright (c) 2017, 2020 Pieter Wuille +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +"""Reference implementation for Bech32/Bech32m and segwit addresses.""" + + +from enum import Enum + +class Encoding(Enum): + """Enumeration type to list the various supported encodings.""" + BECH32 = 1 + BECH32M = 2 + +CHARSET = "qpzry9x8gf2tvdw0s3jn54khce6mua7l" +BECH32M_CONST = 0x2bc830a3 + +def bech32_polymod(values): + """Internal function that computes the Bech32 checksum.""" + generator = [0x3b6a57b2, 0x26508e6d, 0x1ea119fa, 0x3d4233dd, 0x2a1462b3] + chk = 1 + for value in values: + top = chk >> 25 + chk = (chk & 0x1ffffff) << 5 ^ value + for i in range(5): + chk ^= generator[i] if ((top >> i) & 1) else 0 + return chk + + +def bech32_hrp_expand(hrp): + """Expand the HRP into values for checksum computation.""" + return [ord(x) >> 5 for x in hrp] + [0] + [ord(x) & 31 for x in hrp] + + +def bech32_verify_checksum(hrp, data): + """Verify a checksum given HRP and converted data characters.""" + const = bech32_polymod(bech32_hrp_expand(hrp) + data) + if const == 1: + return Encoding.BECH32 + if const == BECH32M_CONST: + return Encoding.BECH32M + return None + +def bech32_create_checksum(hrp, data, spec): + """Compute the checksum values given HRP and data.""" + values = bech32_hrp_expand(hrp) + data + const = BECH32M_CONST if spec == Encoding.BECH32M else 1 + polymod = bech32_polymod(values + [0, 0, 0, 0, 0, 0]) ^ const + return [(polymod >> 5 * (5 - i)) & 31 for i in range(6)] + + +def bech32_encode(hrp, data, spec): + """Compute a Bech32 string given HRP and data values.""" + combined = data + bech32_create_checksum(hrp, data, spec) + return hrp + '1' + ''.join([CHARSET[d] for d in combined]) + +def bech32_decode(bech): + """Validate a Bech32/Bech32m string, and determine HRP and data.""" + if ((any(ord(x) < 33 or ord(x) > 126 for x in bech)) or + (bech.lower() != bech and bech.upper() != bech)): + return (None, None, None) + bech = bech.lower() + pos = bech.rfind('1') + if pos < 1 or pos + 7 > len(bech) or len(bech) > 90: + return (None, None, None) + if not all(x in CHARSET for x in bech[pos+1:]): + return (None, None, None) + hrp = bech[:pos] + data = [CHARSET.find(x) for x in bech[pos+1:]] + spec = bech32_verify_checksum(hrp, data) + if spec is None: + return (None, None, None) + return (hrp, data[:-6], spec) + +def convertbits(data, frombits, tobits, pad=True): + """General power-of-2 base conversion.""" + acc = 0 + bits = 0 + ret = [] + maxv = (1 << tobits) - 1 + max_acc = (1 << (frombits + tobits - 1)) - 1 + for value in data: + if value < 0 or (value >> frombits): + return None + acc = ((acc << frombits) | value) & max_acc + bits += frombits + while bits >= tobits: + bits -= tobits + ret.append((acc >> bits) & maxv) + if pad: + if bits: + ret.append((acc << (tobits - bits)) & maxv) + elif bits >= frombits or ((acc << (tobits - bits)) & maxv): + return None + return ret + + +def decode(hrp, addr): + """Decode a segwit address.""" + hrpgot, data, spec = bech32_decode(addr) + if hrpgot != hrp: + return (None, None) + decoded = convertbits(data[1:], 5, 8, False) + if decoded is None or len(decoded) < 2 or len(decoded) > 40: + return (None, None) + if data[0] > 16: + return (None, None) + if data[0] == 0 and len(decoded) != 20 and len(decoded) != 32: + return (None, None) + if data[0] == 0 and spec != Encoding.BECH32 or data[0] != 0 and spec != Encoding.BECH32M: + return (None, None) + return (data[0], decoded) + + +def encode(hrp, witver, witprog): + """Encode a segwit address.""" + spec = Encoding.BECH32 if witver == 0 else Encoding.BECH32M + ret = bech32_encode(hrp, [witver] + convertbits(witprog, 8, 5), spec) + if decode(hrp, ret) == (None, None): + return None + return ret diff --git a/nostr/event.py b/nostr/event.py new file mode 100644 index 0000000..b903e0e --- /dev/null +++ b/nostr/event.py @@ -0,0 +1,126 @@ +import time +import json +from dataclasses import dataclass, field +from enum import IntEnum +from typing import List +from secp256k1 import PublicKey +from hashlib import sha256 + +from .message_type import ClientMessageType + + +class EventKind(IntEnum): + SET_METADATA = 0 + TEXT_NOTE = 1 + RECOMMEND_RELAY = 2 + CONTACTS = 3 + ENCRYPTED_DIRECT_MESSAGE = 4 + DELETE = 5 + + +@dataclass +class Event: + content: str = None + public_key: str = None + created_at: int = None + kind: int = EventKind.TEXT_NOTE + tags: List[List[str]] = field( + default_factory=list + ) # Dataclasses require special handling when the default value is a mutable type + signature: str = None + + def __post_init__(self): + if self.content is not None and not isinstance(self.content, str): + # DMs initialize content to None but all other kinds should pass in a str + raise TypeError("Argument 'content' must be of type str") + + if self.created_at is None: + self.created_at = int(time.time()) + + @staticmethod + def serialize( + public_key: str, created_at: int, kind: int, tags: List[List[str]], content: str + ) -> bytes: + data = [0, public_key, created_at, kind, tags, content] + data_str = json.dumps(data, separators=(",", ":"), ensure_ascii=False) + return data_str.encode() + + @staticmethod + def compute_id( + public_key: str, created_at: int, kind: int, tags: List[List[str]], content: str + ): + return sha256( + Event.serialize(public_key, created_at, kind, tags, content) + ).hexdigest() + + @property + def id(self) -> str: + # Always recompute the id to reflect the up-to-date state of the Event + return Event.compute_id( + self.public_key, self.created_at, self.kind, self.tags, self.content + ) + + def add_pubkey_ref(self, pubkey: str): + """Adds a reference to a pubkey as a 'p' tag""" + self.tags.append(["p", pubkey]) + + def add_event_ref(self, event_id: str): + """Adds a reference to an event_id as an 'e' tag""" + self.tags.append(["e", event_id]) + + def verify(self) -> bool: + pub_key = PublicKey( + bytes.fromhex("02" + self.public_key), True + ) # add 02 for schnorr (bip340) + return pub_key.schnorr_verify( + bytes.fromhex(self.id), bytes.fromhex(self.signature), None, raw=True + ) + + def to_message(self) -> str: + return json.dumps( + [ + ClientMessageType.EVENT, + { + "id": self.id, + "pubkey": self.public_key, + "created_at": self.created_at, + "kind": self.kind, + "tags": self.tags, + "content": self.content, + "sig": self.signature, + }, + ] + ) + + +@dataclass +class EncryptedDirectMessage(Event): + recipient_pubkey: str = None + cleartext_content: str = None + reference_event_id: str = None + + def __post_init__(self): + if self.content is not None: + self.cleartext_content = self.content + self.content = None + + if self.recipient_pubkey is None: + raise Exception("Must specify a recipient_pubkey.") + + self.kind = EventKind.ENCRYPTED_DIRECT_MESSAGE + super().__post_init__() + + # Must specify the DM recipient's pubkey in a 'p' tag + self.add_pubkey_ref(self.recipient_pubkey) + + # Optionally specify a reference event (DM) this is a reply to + if self.reference_event_id is not None: + self.add_event_ref(self.reference_event_id) + + @property + def id(self) -> str: + if self.content is None: + raise Exception( + "EncryptedDirectMessage `id` is undefined until its message is encrypted and stored in the `content` field" + ) + return super().id diff --git a/nostr/key.py b/nostr/key.py new file mode 100644 index 0000000..4adb0b9 --- /dev/null +++ b/nostr/key.py @@ -0,0 +1,147 @@ +import secrets +import base64 +import secp256k1 +from cffi import FFI +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes +from cryptography.hazmat.primitives import padding +from hashlib import sha256 + +from .event import EncryptedDirectMessage, Event, EventKind +from . import bech32 + + +class PublicKey: + def __init__(self, raw_bytes: bytes) -> None: + self.raw_bytes = raw_bytes + + def bech32(self) -> str: + converted_bits = bech32.convertbits(self.raw_bytes, 8, 5) + return bech32.bech32_encode("npub", converted_bits, bech32.Encoding.BECH32) + + def hex(self) -> str: + return self.raw_bytes.hex() + + def verify_signed_message_hash(self, hash: str, sig: str) -> bool: + pk = secp256k1.PublicKey(b"\x02" + self.raw_bytes, True) + return pk.schnorr_verify(bytes.fromhex(hash), bytes.fromhex(sig), None, True) + + @classmethod + def from_npub(cls, npub: str): + """Load a PublicKey from its bech32/npub form""" + hrp, data, spec = bech32.bech32_decode(npub) + raw_public_key = bech32.convertbits(data, 5, 8)[:-1] + return cls(bytes(raw_public_key)) + + +class PrivateKey: + def __init__(self, raw_secret: bytes = None) -> None: + if not raw_secret is None: + self.raw_secret = raw_secret + else: + self.raw_secret = secrets.token_bytes(32) + + sk = secp256k1.PrivateKey(self.raw_secret) + self.public_key = PublicKey(sk.pubkey.serialize()[1:]) + + @classmethod + def from_nsec(cls, nsec: str): + """Load a PrivateKey from its bech32/nsec form""" + hrp, data, spec = bech32.bech32_decode(nsec) + raw_secret = bech32.convertbits(data, 5, 8)[:-1] + return cls(bytes(raw_secret)) + + def bech32(self) -> str: + converted_bits = bech32.convertbits(self.raw_secret, 8, 5) + return bech32.bech32_encode("nsec", converted_bits, bech32.Encoding.BECH32) + + def hex(self) -> str: + return self.raw_secret.hex() + + def tweak_add(self, scalar: bytes) -> bytes: + sk = secp256k1.PrivateKey(self.raw_secret) + return sk.tweak_add(scalar) + + def compute_shared_secret(self, public_key_hex: str) -> bytes: + pk = secp256k1.PublicKey(bytes.fromhex("02" + public_key_hex), True) + return pk.ecdh(self.raw_secret, hashfn=copy_x) + + def encrypt_message(self, message: str, public_key_hex: str) -> str: + padder = padding.PKCS7(128).padder() + padded_data = padder.update(message.encode()) + padder.finalize() + + iv = secrets.token_bytes(16) + cipher = Cipher( + algorithms.AES(self.compute_shared_secret(public_key_hex)), modes.CBC(iv) + ) + + encryptor = cipher.encryptor() + encrypted_message = encryptor.update(padded_data) + encryptor.finalize() + + return f"{base64.b64encode(encrypted_message).decode()}?iv={base64.b64encode(iv).decode()}" + + def encrypt_dm(self, dm: EncryptedDirectMessage) -> None: + dm.content = self.encrypt_message( + message=dm.cleartext_content, public_key_hex=dm.recipient_pubkey + ) + + def decrypt_message(self, encoded_message: str, public_key_hex: str) -> str: + encoded_data = encoded_message.split("?iv=") + encoded_content, encoded_iv = encoded_data[0], encoded_data[1] + + iv = base64.b64decode(encoded_iv) + cipher = Cipher( + algorithms.AES(self.compute_shared_secret(public_key_hex)), modes.CBC(iv) + ) + encrypted_content = base64.b64decode(encoded_content) + + decryptor = cipher.decryptor() + decrypted_message = decryptor.update(encrypted_content) + decryptor.finalize() + + unpadder = padding.PKCS7(128).unpadder() + unpadded_data = unpadder.update(decrypted_message) + unpadder.finalize() + + return unpadded_data.decode() + + def sign_message_hash(self, hash: bytes) -> str: + sk = secp256k1.PrivateKey(self.raw_secret) + sig = sk.schnorr_sign(hash, None, raw=True) + return sig.hex() + + def sign_event(self, event: Event) -> None: + if event.kind == EventKind.ENCRYPTED_DIRECT_MESSAGE and event.content is None: + self.encrypt_dm(event) + if event.public_key is None: + event.public_key = self.public_key.hex() + event.signature = self.sign_message_hash(bytes.fromhex(event.id)) + + def __eq__(self, other): + return self.raw_secret == other.raw_secret + + +def mine_vanity_key(prefix: str = None, suffix: str = None) -> PrivateKey: + if prefix is None and suffix is None: + raise ValueError("Expected at least one of 'prefix' or 'suffix' arguments") + + while True: + sk = PrivateKey() + if ( + prefix is not None + and not sk.public_key.bech32()[5 : 5 + len(prefix)] == prefix + ): + continue + if suffix is not None and not sk.public_key.bech32()[-len(suffix) :] == suffix: + continue + break + + return sk + + +ffi = FFI() + + +@ffi.callback( + "int (unsigned char *, const unsigned char *, const unsigned char *, void *)" +) +def copy_x(output, x32, y32, data): + ffi.memmove(output, x32, 32) + return 1 diff --git a/nostr/message_type.py b/nostr/message_type.py new file mode 100644 index 0000000..3f5206b --- /dev/null +++ b/nostr/message_type.py @@ -0,0 +1,15 @@ +class ClientMessageType: + EVENT = "EVENT" + REQUEST = "REQ" + CLOSE = "CLOSE" + +class RelayMessageType: + EVENT = "EVENT" + NOTICE = "NOTICE" + END_OF_STORED_EVENTS = "EOSE" + + @staticmethod + def is_valid(type: str) -> bool: + if type == RelayMessageType.EVENT or type == RelayMessageType.NOTICE or type == RelayMessageType.END_OF_STORED_EVENTS: + return True + return False \ No newline at end of file