[feat] Nostr Login (#2703)
---------
Co-authored-by: dni ⚡ <office@dnilabs.com>
This commit is contained in:
parent
f062b3d5e5
commit
0b8da2b524
31 changed files with 8281 additions and 315 deletions
858
tests/api/test_auth.py
Normal file
858
tests/api/test_auth.py
Normal file
|
|
@ -0,0 +1,858 @@
|
|||
import base64
|
||||
import json
|
||||
import os
|
||||
import time
|
||||
|
||||
import jwt
|
||||
import pytest
|
||||
import secp256k1
|
||||
import shortuuid
|
||||
from httpx import AsyncClient
|
||||
|
||||
from lnbits.core.models import AccessTokenPayload, User
|
||||
from lnbits.settings import AuthMethods, settings
|
||||
from lnbits.utils.nostr import hex_to_npub, sign_event
|
||||
|
||||
nostr_event = {
|
||||
"kind": 27235,
|
||||
"tags": [["u", "http://localhost:5000/nostr"], ["method", "POST"]],
|
||||
"created_at": 1727681048,
|
||||
"content": "",
|
||||
"pubkey": "f6e80df16fa27f1f2774af0ac61b096f8f63ce9116f0a954fca1e25baee84ba9",
|
||||
"id": "0fd22355fe63043116fdfceb77be6bf22686aacd16b9e99a10fea6e55ae3f589",
|
||||
"sig": "fb7eb47fa8355747f6837e55620103d73ba47b2c3164ab8319d2f164022a9f25"
|
||||
"6e00ecda7d3c8945f07b7d6ecc18cfff34c07bc99677309e2b9310d9fc1bb138",
|
||||
}
|
||||
private_key = secp256k1.PrivateKey(
|
||||
bytes.fromhex("6e00ecda7d3c8945f07b7d6ecc18cfff34c07bc99677309e2b9310d9fc1bb138")
|
||||
)
|
||||
pubkey_hex = private_key.pubkey.serialize().hex()[2:]
|
||||
|
||||
settings.auth_allowed_methods = AuthMethods.all()
|
||||
|
||||
|
||||
################################ LOGIN ################################
|
||||
@pytest.mark.asyncio
|
||||
async def test_login_bad_user(http_client: AsyncClient):
|
||||
response = await http_client.post(
|
||||
"/api/v1/auth", json={"username": "non_existing_user", "password": "secret1234"}
|
||||
)
|
||||
|
||||
assert response.status_code == 401, "User does not exist"
|
||||
assert response.json().get("detail") == "Invalid credentials."
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_login_alan_usr(user_alan: User, http_client: AsyncClient):
|
||||
response = await http_client.post("/api/v1/auth/usr", json={"usr": user_alan.id})
|
||||
|
||||
assert response.status_code == 200, "Alan logs in OK."
|
||||
access_token = response.json().get("access_token")
|
||||
assert access_token is not None, "Expected access token after login."
|
||||
|
||||
response = await http_client.get(
|
||||
"/api/v1/auth", headers={"Authorization": f"Bearer {access_token}"}
|
||||
)
|
||||
|
||||
assert response.status_code == 200, "Alan logs in OK."
|
||||
alan = response.json()
|
||||
assert alan["id"] == user_alan.id
|
||||
assert alan["username"] == user_alan.username
|
||||
assert alan["email"] == user_alan.email
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_login_usr_not_allowed(user_alan: User, http_client: AsyncClient):
|
||||
# exclude 'user_id_only'
|
||||
settings.auth_allowed_methods = [AuthMethods.username_and_password.value]
|
||||
|
||||
response = await http_client.post("/api/v1/auth/usr", json={"usr": user_alan.id})
|
||||
|
||||
assert response.status_code == 401, "Login method not allowed."
|
||||
assert response.json().get("detail") == "Login by 'User ID' not allowed."
|
||||
|
||||
settings.auth_allowed_methods = AuthMethods.all()
|
||||
|
||||
response = await http_client.post("/api/v1/auth/usr", json={"usr": user_alan.id})
|
||||
assert response.status_code == 200, "Login with 'usr' allowed."
|
||||
assert (
|
||||
response.json().get("access_token") is not None
|
||||
), "Expected access token after login."
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_login_alan_username_password_ok(
|
||||
user_alan: User, http_client: AsyncClient
|
||||
):
|
||||
response = await http_client.post(
|
||||
"/api/v1/auth", json={"username": user_alan.username, "password": "secret1234"}
|
||||
)
|
||||
|
||||
assert response.status_code == 200, "Alan logs in OK"
|
||||
access_token = response.json().get("access_token")
|
||||
assert access_token is not None
|
||||
|
||||
payload: dict = jwt.decode(access_token, settings.auth_secret_key, ["HS256"])
|
||||
access_token_payload = AccessTokenPayload(**payload)
|
||||
assert access_token_payload.sub == "alan", "Subject is Alan."
|
||||
assert access_token_payload.email == "alan@lnbits.com"
|
||||
assert access_token_payload.auth_time, "Auth time should be set by server."
|
||||
assert (
|
||||
0 <= time.time() - access_token_payload.auth_time <= 5
|
||||
), "Auth time should be very close to now()."
|
||||
|
||||
response = await http_client.get(
|
||||
"/api/v1/auth", headers={"Authorization": f"Bearer {access_token}"}
|
||||
)
|
||||
assert response.status_code == 200, "User exits."
|
||||
user = User(**response.json())
|
||||
assert user.username == "alan", "Username check."
|
||||
assert user.email == "alan@lnbits.com", "Email check."
|
||||
assert not user.pubkey, "No pubkey."
|
||||
assert not user.admin, "Not admin."
|
||||
assert not user.super_user, "Not superuser."
|
||||
assert user.has_password, "Password configured."
|
||||
assert len(user.wallets) == 1, "One default wallet."
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_login_alan_email_password_ok(user_alan: User, http_client: AsyncClient):
|
||||
response = await http_client.post(
|
||||
"/api/v1/auth", json={"username": user_alan.email, "password": "secret1234"}
|
||||
)
|
||||
|
||||
assert response.status_code == 200, "Alan logs in OK"
|
||||
access_token = response.json().get("access_token")
|
||||
assert access_token is not None
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_login_alan_password_nok(user_alan: User, http_client: AsyncClient):
|
||||
response = await http_client.post(
|
||||
"/api/v1/auth", json={"username": user_alan.username, "password": "bad_pasword"}
|
||||
)
|
||||
|
||||
assert response.status_code == 401, "User does not exist"
|
||||
assert response.json().get("detail") == "Invalid credentials."
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_login_username_password_not_allowed(
|
||||
user_alan: User, http_client: AsyncClient
|
||||
):
|
||||
# exclude 'username_password'
|
||||
settings.auth_allowed_methods = [AuthMethods.user_id_only.value]
|
||||
|
||||
response = await http_client.post(
|
||||
"/api/v1/auth", json={"username": user_alan.username, "password": "secret1234"}
|
||||
)
|
||||
|
||||
assert response.status_code == 401, "Login method not allowed."
|
||||
assert (
|
||||
response.json().get("detail") == "Login by 'Username and Password' not allowed."
|
||||
)
|
||||
|
||||
settings.auth_allowed_methods = AuthMethods.all()
|
||||
|
||||
response = await http_client.post(
|
||||
"/api/v1/auth", json={"username": user_alan.username, "password": "secret1234"}
|
||||
)
|
||||
assert response.status_code == 200, "Username and password is allowed."
|
||||
assert response.json().get("access_token") is not None
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_login_alan_change_auth_secret_key(
|
||||
user_alan: User, http_client: AsyncClient
|
||||
):
|
||||
response = await http_client.post(
|
||||
"/api/v1/auth", json={"username": user_alan.username, "password": "secret1234"}
|
||||
)
|
||||
|
||||
assert response.status_code == 200, "Alan logs in OK"
|
||||
access_token = response.json().get("access_token")
|
||||
assert access_token is not None
|
||||
|
||||
initial_auth_secret_key = settings.auth_secret_key
|
||||
|
||||
settings.auth_secret_key = shortuuid.uuid()
|
||||
|
||||
response = await http_client.get(
|
||||
"/api/v1/auth", headers={"Authorization": f"Bearer {access_token}"}
|
||||
)
|
||||
assert response.status_code == 401, "Access token not valid anymore."
|
||||
assert response.json().get("detail") == "Invalid access token."
|
||||
|
||||
settings.auth_secret_key = initial_auth_secret_key
|
||||
|
||||
response = await http_client.get(
|
||||
"/api/v1/auth", headers={"Authorization": f"Bearer {access_token}"}
|
||||
)
|
||||
assert response.status_code == 200, "Access token valid again."
|
||||
|
||||
|
||||
################################ REGISTER WITH PASSWORD ################################
|
||||
@pytest.mark.asyncio
|
||||
async def test_register_ok(http_client: AsyncClient):
|
||||
tiny_id = shortuuid.uuid()[:8]
|
||||
response = await http_client.post(
|
||||
"/api/v1/auth/register",
|
||||
json={
|
||||
"username": f"u21.{tiny_id}",
|
||||
"password": "secret1234",
|
||||
"password_repeat": "secret1234",
|
||||
"email": f"u21.{tiny_id}@lnbits.com",
|
||||
},
|
||||
)
|
||||
|
||||
access_token = response.json().get("access_token")
|
||||
assert response.status_code == 200, "User created."
|
||||
assert response.json().get("access_token") is not None
|
||||
|
||||
response = await http_client.get(
|
||||
"/api/v1/auth", headers={"Authorization": f"Bearer {access_token}"}
|
||||
)
|
||||
assert response.status_code == 200, "User exits."
|
||||
user = User(**response.json())
|
||||
assert user.username == f"u21.{tiny_id}", "Username check."
|
||||
assert user.email == f"u21.{tiny_id}@lnbits.com", "Email check."
|
||||
assert not user.pubkey, "No pubkey check."
|
||||
assert not user.admin, "Not admin."
|
||||
assert not user.super_user, "Not superuser."
|
||||
assert user.has_password, "Password configured."
|
||||
assert len(user.wallets) == 1, "One default wallet."
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_register_email_twice(http_client: AsyncClient):
|
||||
tiny_id = shortuuid.uuid()[:8]
|
||||
response = await http_client.post(
|
||||
"/api/v1/auth/register",
|
||||
json={
|
||||
"username": f"u21.{tiny_id}",
|
||||
"password": "secret1234",
|
||||
"password_repeat": "secret1234",
|
||||
"email": f"u21.{tiny_id}@lnbits.com",
|
||||
},
|
||||
)
|
||||
|
||||
assert response.status_code == 200, "User created."
|
||||
assert response.json().get("access_token") is not None
|
||||
|
||||
tiny_id_2 = shortuuid.uuid()[:8]
|
||||
response = await http_client.post(
|
||||
"/api/v1/auth/register",
|
||||
json={
|
||||
"username": f"u21.{tiny_id_2}",
|
||||
"password": "secret1234",
|
||||
"password_repeat": "secret1234",
|
||||
"email": f"u21.{tiny_id}@lnbits.com",
|
||||
},
|
||||
)
|
||||
assert response.status_code == 403, "Not allowed."
|
||||
assert response.json().get("detail") == "Email already exists."
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_register_username_twice(http_client: AsyncClient):
|
||||
tiny_id = shortuuid.uuid()[:8]
|
||||
response = await http_client.post(
|
||||
"/api/v1/auth/register",
|
||||
json={
|
||||
"username": f"u21.{tiny_id}",
|
||||
"password": "secret1234",
|
||||
"password_repeat": "secret1234",
|
||||
"email": f"u21.{tiny_id}@lnbits.com",
|
||||
},
|
||||
)
|
||||
|
||||
assert response.status_code == 200, "User created."
|
||||
assert response.json().get("access_token") is not None
|
||||
|
||||
tiny_id_2 = shortuuid.uuid()[:8]
|
||||
response = await http_client.post(
|
||||
"/api/v1/auth/register",
|
||||
json={
|
||||
"username": f"u21.{tiny_id}",
|
||||
"password": "secret1234",
|
||||
"password_repeat": "secret1234",
|
||||
"email": f"u21.{tiny_id_2}@lnbits.com",
|
||||
},
|
||||
)
|
||||
assert response.status_code == 403, "Not allowed."
|
||||
assert response.json().get("detail") == "Username already exists."
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_register_passwords_do_not_match(http_client: AsyncClient):
|
||||
tiny_id = shortuuid.uuid()[:8]
|
||||
response = await http_client.post(
|
||||
"/api/v1/auth/register",
|
||||
json={
|
||||
"username": f"u21.{tiny_id}",
|
||||
"password": "secret1234",
|
||||
"password_repeat": "secret0000",
|
||||
"email": f"u21.{tiny_id}@lnbits.com",
|
||||
},
|
||||
)
|
||||
|
||||
assert response.status_code == 400, "Bad passwords."
|
||||
assert response.json().get("detail") == "Passwords do not match."
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_register_bad_email(http_client: AsyncClient):
|
||||
tiny_id = shortuuid.uuid()[:8]
|
||||
response = await http_client.post(
|
||||
"/api/v1/auth/register",
|
||||
json={
|
||||
"username": f"u21.{tiny_id}",
|
||||
"password": "secret1234",
|
||||
"password_repeat": "secret1234",
|
||||
"email": "not_an_email_lnbits.com",
|
||||
},
|
||||
)
|
||||
|
||||
assert response.status_code == 400, "Bad email."
|
||||
assert response.json().get("detail") == "Invalid email."
|
||||
|
||||
|
||||
################################ CHANGE PASSWORD ################################
|
||||
@pytest.mark.asyncio
|
||||
async def test_change_password_ok(http_client: AsyncClient):
|
||||
tiny_id = shortuuid.uuid()[:8]
|
||||
response = await http_client.post(
|
||||
"/api/v1/auth/register",
|
||||
json={
|
||||
"username": f"u21.{tiny_id}",
|
||||
"password": "secret1234",
|
||||
"password_repeat": "secret1234",
|
||||
"email": f"u21.{tiny_id}@lnbits.com",
|
||||
},
|
||||
)
|
||||
|
||||
assert response.status_code == 200, "User created."
|
||||
access_token = response.json().get("access_token")
|
||||
assert access_token is not None
|
||||
|
||||
payload: dict = jwt.decode(access_token, settings.auth_secret_key, ["HS256"])
|
||||
access_token_payload = AccessTokenPayload(**payload)
|
||||
|
||||
response = await http_client.put(
|
||||
"/api/v1/auth/password",
|
||||
headers={"Authorization": f"Bearer {access_token}"},
|
||||
json={
|
||||
"username": f"u21.{tiny_id}",
|
||||
"user_id": access_token_payload.usr,
|
||||
"password_old": "secret1234",
|
||||
"password": "secret0000",
|
||||
"password_repeat": "secret0000",
|
||||
},
|
||||
)
|
||||
|
||||
assert response.status_code == 200, "Password changed."
|
||||
user = User(**response.json())
|
||||
assert user.username == f"u21.{tiny_id}", "Username check."
|
||||
assert user.email == f"u21.{tiny_id}@lnbits.com", "Email check."
|
||||
|
||||
response = await http_client.post(
|
||||
"/api/v1/auth", json={"username": f"u21.{tiny_id}", "password": "secret1234"}
|
||||
)
|
||||
|
||||
assert response.status_code == 401, "Old password does not work"
|
||||
assert response.json().get("detail") == "Invalid credentials."
|
||||
|
||||
response = await http_client.post(
|
||||
"/api/v1/auth", json={"username": f"u21.{tiny_id}", "password": "secret0000"}
|
||||
)
|
||||
|
||||
assert response.status_code == 200, "New password works."
|
||||
assert response.json().get("access_token") is not None, "Access token created."
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_change_password_not_authenticated(http_client: AsyncClient):
|
||||
tiny_id = shortuuid.uuid()[:8]
|
||||
response = await http_client.put(
|
||||
"/api/v1/auth/password",
|
||||
json={
|
||||
"username": f"u21.{tiny_id}",
|
||||
"user_id": "0000",
|
||||
"password_old": "secret1234",
|
||||
"password": "secret0000",
|
||||
"password_repeat": "secret0000",
|
||||
},
|
||||
)
|
||||
|
||||
assert response.status_code == 401, "User not authenticated."
|
||||
assert response.json().get("detail") == "Missing user ID or access token."
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_alan_change_password_old_nok(user_alan: User, http_client: AsyncClient):
|
||||
response = await http_client.post("/api/v1/auth/usr", json={"usr": user_alan.id})
|
||||
|
||||
assert response.status_code == 200, "Alan logs in OK."
|
||||
access_token = response.json().get("access_token")
|
||||
assert access_token is not None
|
||||
|
||||
response = await http_client.put(
|
||||
"/api/v1/auth/password",
|
||||
headers={"Authorization": f"Bearer {access_token}"},
|
||||
json={
|
||||
"username": user_alan.username,
|
||||
"user_id": user_alan.id,
|
||||
"password_old": "secret0000",
|
||||
"password": "secret0001",
|
||||
"password_repeat": "secret0001",
|
||||
},
|
||||
)
|
||||
|
||||
assert response.status_code == 403, "Old password bad."
|
||||
assert response.json().get("detail") == "Invalid credentials."
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_alan_change_password_different_user(
|
||||
user_alan: User, http_client: AsyncClient
|
||||
):
|
||||
response = await http_client.post("/api/v1/auth/usr", json={"usr": user_alan.id})
|
||||
|
||||
assert response.status_code == 200, "Alan logs in OK."
|
||||
access_token = response.json().get("access_token")
|
||||
assert access_token is not None
|
||||
|
||||
response = await http_client.put(
|
||||
"/api/v1/auth/password",
|
||||
headers={"Authorization": f"Bearer {access_token}"},
|
||||
json={
|
||||
"username": user_alan.username,
|
||||
"user_id": user_alan.id[::-1],
|
||||
"password_old": "secret1234",
|
||||
"password": "secret0001",
|
||||
"password_repeat": "secret0001",
|
||||
},
|
||||
)
|
||||
|
||||
assert response.status_code == 400, "Different user id."
|
||||
assert response.json().get("detail") == "Invalid user ID."
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_alan_change_password_auth_threshold_expired(
|
||||
user_alan: User, http_client: AsyncClient
|
||||
):
|
||||
|
||||
response = await http_client.post("/api/v1/auth/usr", json={"usr": user_alan.id})
|
||||
|
||||
assert response.status_code == 200, "Alan logs in OK."
|
||||
access_token = response.json().get("access_token")
|
||||
assert access_token is not None
|
||||
|
||||
initial_update_threshold = settings.auth_credetials_update_threshold
|
||||
settings.auth_credetials_update_threshold = 1
|
||||
time.sleep(1.1)
|
||||
response = await http_client.put(
|
||||
"/api/v1/auth/password",
|
||||
headers={"Authorization": f"Bearer {access_token}"},
|
||||
json={
|
||||
"username": user_alan.username,
|
||||
"user_id": user_alan.id,
|
||||
"password_old": "secret1234",
|
||||
"password": "secret1234",
|
||||
"password_repeat": "secret1234",
|
||||
},
|
||||
)
|
||||
|
||||
settings.auth_credetials_update_threshold = initial_update_threshold
|
||||
|
||||
assert response.status_code == 403, "Treshold expired."
|
||||
assert (
|
||||
response.json().get("detail") == "You can only update your credentials"
|
||||
" in the first 1 seconds after login."
|
||||
" Please login again!"
|
||||
)
|
||||
|
||||
|
||||
################################ REGISTER PUBLIC KEY ################################
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_register_nostr_ok(http_client: AsyncClient):
|
||||
event = {**nostr_event}
|
||||
event["created_at"] = int(time.time())
|
||||
|
||||
private_key = secp256k1.PrivateKey(bytes.fromhex(os.urandom(32).hex()))
|
||||
pubkey_hex = private_key.pubkey.serialize().hex()[2:]
|
||||
event_signed = sign_event(event, pubkey_hex, private_key)
|
||||
base64_event = base64.b64encode(json.dumps(event_signed).encode()).decode("ascii")
|
||||
response = await http_client.post(
|
||||
"/api/v1/auth/nostr",
|
||||
headers={"Authorization": f"nostr {base64_event}"},
|
||||
)
|
||||
assert response.status_code == 200, "User created."
|
||||
access_token = response.json().get("access_token")
|
||||
assert access_token is not None
|
||||
|
||||
payload: dict = jwt.decode(access_token, settings.auth_secret_key, ["HS256"])
|
||||
access_token_payload = AccessTokenPayload(**payload)
|
||||
assert access_token_payload.auth_time, "Auth time should be set by server."
|
||||
assert (
|
||||
0 <= time.time() - access_token_payload.auth_time <= 5
|
||||
), "Auth time should be very close to now()."
|
||||
|
||||
response = await http_client.get(
|
||||
"/api/v1/auth", headers={"Authorization": f"Bearer {access_token}"}
|
||||
)
|
||||
user = User(**response.json())
|
||||
assert user.username is None, "No username."
|
||||
assert user.email is None, "No email."
|
||||
assert user.pubkey == pubkey_hex, "Pubkey check."
|
||||
assert not user.admin, "Not admin."
|
||||
assert not user.super_user, "Not superuser."
|
||||
assert not user.has_password, "Password configured."
|
||||
assert len(user.wallets) == 1, "One default wallet."
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_register_nostr_not_allowed(http_client: AsyncClient):
|
||||
# exclude 'nostr_auth_nip98'
|
||||
settings.auth_allowed_methods = [AuthMethods.username_and_password.value]
|
||||
response = await http_client.post(
|
||||
"/api/v1/auth/nostr",
|
||||
json={},
|
||||
)
|
||||
|
||||
assert response.status_code == 401, "User not authenticated."
|
||||
assert response.json().get("detail") == "Login with Nostr Auth not allowed."
|
||||
|
||||
settings.auth_allowed_methods = AuthMethods.all()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_register_nostr_bad_header(http_client: AsyncClient):
|
||||
response = await http_client.post("/api/v1/auth/nostr")
|
||||
|
||||
assert response.status_code == 401, "Missing header."
|
||||
assert response.json().get("detail") == "Nostr Auth header missing."
|
||||
|
||||
response = await http_client.post(
|
||||
"/api/v1/auth/nostr",
|
||||
headers={"Authorization": "Bearer xyz"},
|
||||
)
|
||||
|
||||
assert response.status_code == 401, "Non nostr header."
|
||||
assert response.json().get("detail") == "Authorization header is not nostr."
|
||||
|
||||
response = await http_client.post(
|
||||
"/api/v1/auth/nostr",
|
||||
headers={"Authorization": "nostr xyz"},
|
||||
)
|
||||
assert response.status_code == 401, "Nostr not base64."
|
||||
assert response.json().get("detail") == "Nostr login event cannot be parsed."
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_register_nostr_bad_event(http_client: AsyncClient):
|
||||
settings.auth_allowed_methods = AuthMethods.all()
|
||||
base64_event = base64.b64encode(json.dumps(nostr_event).encode()).decode("ascii")
|
||||
response = await http_client.post(
|
||||
"/api/v1/auth/nostr",
|
||||
headers={"Authorization": f"nostr {base64_event}"},
|
||||
)
|
||||
assert response.status_code == 401, "Nostr event expired."
|
||||
assert (
|
||||
response.json().get("detail")
|
||||
== f"More than {settings.auth_credetials_update_threshold}"
|
||||
" seconds have passed since the event was signed."
|
||||
)
|
||||
|
||||
corrupted_event = {**nostr_event}
|
||||
corrupted_event["content"] = "xyz"
|
||||
base64_event = base64.b64encode(json.dumps(corrupted_event).encode()).decode(
|
||||
"ascii"
|
||||
)
|
||||
response = await http_client.post(
|
||||
"/api/v1/auth/nostr",
|
||||
headers={"Authorization": f"nostr {base64_event}"},
|
||||
)
|
||||
assert response.status_code == 401, "Nostr event signature invalid."
|
||||
assert response.json().get("detail") == "Nostr login event is not valid."
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_register_nostr_bad_event_kind(http_client: AsyncClient):
|
||||
event_bad_kind = {**nostr_event}
|
||||
event_bad_kind["kind"] = "12345"
|
||||
|
||||
event_bad_kind_signed = sign_event(event_bad_kind, pubkey_hex, private_key)
|
||||
base64_event_bad_kind = base64.b64encode(
|
||||
json.dumps(event_bad_kind_signed).encode()
|
||||
).decode("ascii")
|
||||
response = await http_client.post(
|
||||
"/api/v1/auth/nostr",
|
||||
headers={"Authorization": f"nostr {base64_event_bad_kind}"},
|
||||
)
|
||||
assert response.status_code == 401, "Nostr event kind invalid."
|
||||
assert response.json().get("detail") == "Invalid event kind."
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_register_nostr_bad_event_tag_u(http_client: AsyncClient):
|
||||
event_bad_kind = {**nostr_event}
|
||||
event_bad_kind["created_at"] = int(time.time())
|
||||
|
||||
event_bad_kind["tags"] = [["u", "http://localhost:5000/nostr"]]
|
||||
|
||||
event_bad_tag_signed = sign_event(event_bad_kind, pubkey_hex, private_key)
|
||||
base64_event_tag_kind = base64.b64encode(
|
||||
json.dumps(event_bad_tag_signed).encode()
|
||||
).decode("ascii")
|
||||
response = await http_client.post(
|
||||
"/api/v1/auth/nostr",
|
||||
headers={"Authorization": f"nostr {base64_event_tag_kind}"},
|
||||
)
|
||||
assert response.status_code == 401, "Nostr event tag missing."
|
||||
assert response.json().get("detail") == "Tag 'method' is missing."
|
||||
|
||||
event_bad_kind["tags"] = [["u", "http://localhost:5000/nostr"], ["method", "XYZ"]]
|
||||
|
||||
event_bad_tag_signed = sign_event(event_bad_kind, pubkey_hex, private_key)
|
||||
base64_event_tag_kind = base64.b64encode(
|
||||
json.dumps(event_bad_tag_signed).encode()
|
||||
).decode("ascii")
|
||||
response = await http_client.post(
|
||||
"/api/v1/auth/nostr",
|
||||
headers={"Authorization": f"nostr {base64_event_tag_kind}"},
|
||||
)
|
||||
assert response.status_code == 401, "Nostr event tag invalid."
|
||||
assert response.json().get("detail") == "Incorrect value for tag 'method'."
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_register_nostr_bad_event_tag_menthod(http_client: AsyncClient):
|
||||
event_bad_kind = {**nostr_event}
|
||||
event_bad_kind["created_at"] = int(time.time())
|
||||
|
||||
event_bad_kind["tags"] = [["method", "POST"]]
|
||||
|
||||
event_bad_tag_signed = sign_event(event_bad_kind, pubkey_hex, private_key)
|
||||
base64_event = base64.b64encode(json.dumps(event_bad_tag_signed).encode()).decode(
|
||||
"ascii"
|
||||
)
|
||||
response = await http_client.post(
|
||||
"/api/v1/auth/nostr",
|
||||
headers={"Authorization": f"nostr {base64_event}"},
|
||||
)
|
||||
assert response.status_code == 401, "Nostr event tag missing."
|
||||
assert response.json().get("detail") == "Tag 'u' for URL is missing."
|
||||
|
||||
event_bad_kind["tags"] = [["u", "http://demo.lnbits.com/nostr"], ["method", "POST"]]
|
||||
|
||||
event_bad_tag_signed = sign_event(event_bad_kind, pubkey_hex, private_key)
|
||||
base64_event = base64.b64encode(json.dumps(event_bad_tag_signed).encode()).decode(
|
||||
"ascii"
|
||||
)
|
||||
response = await http_client.post(
|
||||
"/api/v1/auth/nostr",
|
||||
headers={"Authorization": f"nostr {base64_event}"},
|
||||
)
|
||||
assert response.status_code == 401, "Nostr event tag invalid."
|
||||
assert (
|
||||
response.json().get("detail") == "Incorrect value for tag 'u':"
|
||||
" 'http://demo.lnbits.com/nostr'."
|
||||
)
|
||||
|
||||
|
||||
################################ CHANGE PUBLIC KEY ################################
|
||||
async def test_change_pubkey_npub_ok(http_client: AsyncClient, user_alan: User):
|
||||
tiny_id = shortuuid.uuid()[:8]
|
||||
response = await http_client.post(
|
||||
"/api/v1/auth/register",
|
||||
json={
|
||||
"username": f"u21.{tiny_id}",
|
||||
"password": "secret1234",
|
||||
"password_repeat": "secret1234",
|
||||
"email": f"u21.{tiny_id}@lnbits.com",
|
||||
},
|
||||
)
|
||||
|
||||
assert response.status_code == 200, "User created."
|
||||
access_token = response.json().get("access_token")
|
||||
assert access_token is not None
|
||||
|
||||
payload: dict = jwt.decode(access_token, settings.auth_secret_key, ["HS256"])
|
||||
access_token_payload = AccessTokenPayload(**payload)
|
||||
|
||||
private_key = secp256k1.PrivateKey(bytes.fromhex(os.urandom(32).hex()))
|
||||
pubkey_hex = private_key.pubkey.serialize().hex()[2:]
|
||||
npub = hex_to_npub(pubkey_hex)
|
||||
|
||||
response = await http_client.put(
|
||||
"/api/v1/auth/pubkey",
|
||||
headers={"Authorization": f"Bearer {access_token}"},
|
||||
json={
|
||||
"user_id": access_token_payload.usr,
|
||||
"pubkey": npub,
|
||||
},
|
||||
)
|
||||
|
||||
assert response.status_code == 200, "Pubkey changed."
|
||||
user = User(**response.json())
|
||||
assert user.username == f"u21.{tiny_id}", "Username check."
|
||||
assert user.email == f"u21.{tiny_id}@lnbits.com", "Email check."
|
||||
assert user.pubkey == pubkey_hex
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_change_pubkey_ok(http_client: AsyncClient, user_alan: User):
|
||||
tiny_id = shortuuid.uuid()[:8]
|
||||
response = await http_client.post(
|
||||
"/api/v1/auth/register",
|
||||
json={
|
||||
"username": f"u21.{tiny_id}",
|
||||
"password": "secret1234",
|
||||
"password_repeat": "secret1234",
|
||||
"email": f"u21.{tiny_id}@lnbits.com",
|
||||
},
|
||||
)
|
||||
|
||||
assert response.status_code == 200, "User created."
|
||||
access_token = response.json().get("access_token")
|
||||
assert access_token is not None
|
||||
|
||||
payload: dict = jwt.decode(access_token, settings.auth_secret_key, ["HS256"])
|
||||
access_token_payload = AccessTokenPayload(**payload)
|
||||
|
||||
private_key = secp256k1.PrivateKey(bytes.fromhex(os.urandom(32).hex()))
|
||||
pubkey_hex = private_key.pubkey.serialize().hex()[2:]
|
||||
|
||||
response = await http_client.put(
|
||||
"/api/v1/auth/pubkey",
|
||||
headers={"Authorization": f"Bearer {access_token}"},
|
||||
json={
|
||||
"user_id": access_token_payload.usr,
|
||||
"pubkey": pubkey_hex,
|
||||
},
|
||||
)
|
||||
|
||||
assert response.status_code == 200, "Pubkey changed."
|
||||
user = User(**response.json())
|
||||
assert user.username == f"u21.{tiny_id}", "Username check."
|
||||
assert user.email == f"u21.{tiny_id}@lnbits.com", "Email check."
|
||||
assert user.pubkey == pubkey_hex
|
||||
|
||||
# Login with nostr
|
||||
event = {**nostr_event}
|
||||
event["created_at"] = int(time.time())
|
||||
event["pubkey"] = pubkey_hex
|
||||
event_signed = sign_event(event, pubkey_hex, private_key)
|
||||
base64_event = base64.b64encode(json.dumps(event_signed).encode()).decode("ascii")
|
||||
response = await http_client.post(
|
||||
"/api/v1/auth/nostr",
|
||||
headers={"Authorization": f"nostr {base64_event}"},
|
||||
)
|
||||
assert response.status_code == 200, "User logged in."
|
||||
access_token = response.json().get("access_token")
|
||||
assert access_token is not None
|
||||
|
||||
response = await http_client.get(
|
||||
"/api/v1/auth", headers={"Authorization": f"Bearer {access_token}"}
|
||||
)
|
||||
user = User(**response.json())
|
||||
assert user.username == f"u21.{tiny_id}", "Username check."
|
||||
assert user.email == f"u21.{tiny_id}@lnbits.com", "Email check."
|
||||
assert user.pubkey == pubkey_hex, "No pubkey."
|
||||
assert not user.admin, "Not admin."
|
||||
assert not user.super_user, "Not superuser."
|
||||
assert user.has_password, "Password configured."
|
||||
assert len(user.wallets) == 1, "One default wallet."
|
||||
|
||||
response = await http_client.post(
|
||||
"/api/v1/auth", json={"username": user_alan.username, "password": "secret1234"}
|
||||
)
|
||||
|
||||
assert response.status_code == 200, "Alan logs in OK"
|
||||
access_token = response.json().get("access_token")
|
||||
assert access_token is not None
|
||||
|
||||
response = await http_client.put(
|
||||
"/api/v1/auth/pubkey",
|
||||
headers={"Authorization": f"Bearer {access_token}"},
|
||||
json={
|
||||
"user_id": user_alan.id,
|
||||
"pubkey": pubkey_hex,
|
||||
},
|
||||
)
|
||||
|
||||
assert response.status_code == 403, "Pubkey already used."
|
||||
assert response.json().get("detail") == "Public key already in use."
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_change_pubkey_not_authenticated(
|
||||
http_client: AsyncClient, user_alan: User
|
||||
):
|
||||
response = await http_client.put(
|
||||
"/api/v1/auth/pubkey",
|
||||
json={
|
||||
"user_id": user_alan.id,
|
||||
"pubkey": pubkey_hex,
|
||||
},
|
||||
)
|
||||
|
||||
assert response.status_code == 401, "Must be authenticated to change pubkey."
|
||||
assert response.json().get("detail") == "Missing user ID or access token."
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_change_pubkey_other_user(http_client: AsyncClient, user_alan: User):
|
||||
response = await http_client.post("/api/v1/auth/usr", json={"usr": user_alan.id})
|
||||
|
||||
assert response.status_code == 200, "Alan logs in OK."
|
||||
access_token = response.json().get("access_token")
|
||||
assert access_token is not None
|
||||
|
||||
response = await http_client.put(
|
||||
"/api/v1/auth/pubkey",
|
||||
headers={"Authorization": f"Bearer {access_token}"},
|
||||
json={
|
||||
"user_id": user_alan.id[::-1],
|
||||
"pubkey": pubkey_hex,
|
||||
},
|
||||
)
|
||||
assert response.status_code == 400, "Not your user."
|
||||
assert response.json().get("detail") == "Invalid user ID."
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_alan_change_pubkey_auth_threshold_expired(
|
||||
user_alan: User, http_client: AsyncClient
|
||||
):
|
||||
|
||||
response = await http_client.post("/api/v1/auth/usr", json={"usr": user_alan.id})
|
||||
|
||||
assert response.status_code == 200, "Alan logs in OK."
|
||||
access_token = response.json().get("access_token")
|
||||
assert access_token is not None
|
||||
|
||||
initial_update_threshold = settings.auth_credetials_update_threshold
|
||||
settings.auth_credetials_update_threshold = 1
|
||||
time.sleep(1.1)
|
||||
response = await http_client.put(
|
||||
"/api/v1/auth/pubkey",
|
||||
headers={"Authorization": f"Bearer {access_token}"},
|
||||
json={
|
||||
"user_id": user_alan.id,
|
||||
"pubkey": pubkey_hex,
|
||||
},
|
||||
)
|
||||
|
||||
settings.auth_credetials_update_threshold = initial_update_threshold
|
||||
|
||||
assert response.status_code == 403, "Treshold expired."
|
||||
assert (
|
||||
response.json().get("detail") == "You can only update your credentials"
|
||||
" in the first 1 seconds after login."
|
||||
" Please login again!"
|
||||
)
|
||||
|
|
@ -16,11 +16,12 @@ from lnbits.app import create_app
|
|||
from lnbits.core.crud import (
|
||||
create_account,
|
||||
create_wallet,
|
||||
get_account_by_username,
|
||||
get_user,
|
||||
update_payment_status,
|
||||
)
|
||||
from lnbits.core.models import CreateInvoice, PaymentState
|
||||
from lnbits.core.services import update_wallet_balance
|
||||
from lnbits.core.services import create_user_account, update_wallet_balance
|
||||
from lnbits.core.views.payment_api import api_payments_create_invoice
|
||||
from lnbits.db import DB_TYPE, SQLITE, Database
|
||||
from lnbits.settings import settings
|
||||
|
|
@ -59,6 +60,13 @@ async def client(app):
|
|||
yield client
|
||||
|
||||
|
||||
@pytest_asyncio.fixture(scope="function")
|
||||
async def http_client(app):
|
||||
url = f"http://{settings.host}:{settings.port}"
|
||||
async with AsyncClient(app=app, base_url=url) as client:
|
||||
yield client
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def test_client(app):
|
||||
return TestClient(app)
|
||||
|
|
@ -69,6 +77,16 @@ async def db():
|
|||
yield Database("database")
|
||||
|
||||
|
||||
@pytest_asyncio.fixture(scope="package")
|
||||
async def user_alan():
|
||||
user = await get_account_by_username("alan")
|
||||
if not user:
|
||||
user = await create_user_account(
|
||||
email="alan@lnbits.com", username="alan", password="secret1234"
|
||||
)
|
||||
yield user
|
||||
|
||||
|
||||
@pytest_asyncio.fixture(scope="session")
|
||||
async def from_user():
|
||||
user = await create_account()
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue