[feat] access control lists (with access tokens) (#2864)

This commit is contained in:
Vlad Stan 2025-01-16 17:25:27 +02:00 committed by GitHub
parent f415a92914
commit b164317121
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
25 changed files with 2131 additions and 67 deletions

View file

@ -2,6 +2,7 @@ import base64
import json
import os
import time
from uuid import uuid4
import jwt
import pytest
@ -9,8 +10,22 @@ import secp256k1
import shortuuid
from httpx import AsyncClient
from lnbits.core.crud.users import (
get_user_access_control_lists,
update_user_access_control_list,
)
from lnbits.core.models import AccessTokenPayload, User
from lnbits.core.models.misc import SimpleItem
from lnbits.core.models.users import (
AccessControlList,
ApiTokenRequest,
DeleteTokenRequest,
EndpointAccess,
UpdateAccessControlList,
UserAcls,
)
from lnbits.core.views.user_api import api_users_reset_password
from lnbits.helpers import create_access_token
from lnbits.settings import AuthMethods, Settings
from lnbits.utils.nostr import hex_to_npub, sign_event
@ -1030,3 +1045,907 @@ async def test_reset_password_auth_threshold_expired(
" in the first 1 seconds."
" Please login again or ask a new reset key!"
)
################################ ACL ################################
@pytest.mark.anyio
async def test_api_update_user_acl_success(http_client: AsyncClient, user_alan: User):
# Login to get access token
response = await http_client.post(
"/api/v1/auth", json={"username": user_alan.username, "password": "secret1234"}
)
assert response.status_code == 200, "Alan logs in OK"
access_token = response.json().get("access_token")
assert access_token is not None
# Create a new ACL
data = UpdateAccessControlList(
id="", name="New ACL", password="secret1234", endpoints=[]
)
response = await http_client.put(
"/api/v1/auth/acl",
json=data.dict(),
headers={"Authorization": f"Bearer {access_token}"},
)
assert response.status_code == 200, "ACL should be created successfully."
user_acls = UserAcls(**response.json())
assert any(
acl.name == "New ACL" for acl in user_acls.access_control_list
), "ACL should be in the list."
@pytest.mark.anyio
async def test_api_update_user_acl_invalid_password(
http_client: AsyncClient, user_alan: User
):
# Login to get access token
response = await http_client.post(
"/api/v1/auth", json={"username": user_alan.username, "password": "secret1234"}
)
assert response.status_code == 200, "Alan logs in OK"
access_token = response.json().get("access_token")
assert access_token is not None
# Attempt to create a new ACL with an invalid password
data = UpdateAccessControlList(
id="", name="New ACL", password="wrong_password", endpoints=[]
)
response = await http_client.put(
"/api/v1/auth/acl",
json=data.dict(),
headers={"Authorization": f"Bearer {access_token}"},
)
assert (
response.status_code == 401
), "Invalid password should result in unauthorized error."
assert response.json().get("detail") == "Invalid credentials."
@pytest.mark.anyio
async def test_api_update_user_acl_update_existing(
http_client: AsyncClient, user_alan: User
):
# Login to get access token
response = await http_client.post(
"/api/v1/auth", json={"username": user_alan.username, "password": "secret1234"}
)
assert response.status_code == 200, "Alan logs in OK"
access_token = response.json().get("access_token")
assert access_token is not None
# Create a new ACL
data = UpdateAccessControlList(
id="", name="New ACL", password="secret1234", endpoints=[]
)
response = await http_client.put(
"/api/v1/auth/acl",
json=data.dict(),
headers={"Authorization": f"Bearer {access_token}"},
)
assert response.status_code == 200, "ACL should be created successfully."
user_acls = UserAcls(**response.json())
acl = next(acl for acl in user_acls.access_control_list if acl.name == "New ACL")
# Update the existing ACL
data = UpdateAccessControlList(
id=acl.id, name="Updated ACL", password="secret1234", endpoints=[]
)
response = await http_client.put(
"/api/v1/auth/acl",
json=data.dict(),
headers={"Authorization": f"Bearer {access_token}"},
)
assert response.status_code == 200, "ACL should be updated successfully."
user_acls = UserAcls(**response.json())
assert any(
acl.name == "Updated ACL" for acl in user_acls.access_control_list
), "ACL should be updated in the list."
@pytest.mark.anyio
async def test_api_update_user_acl_missing_password(
http_client: AsyncClient, user_alan: User
):
# Login to get access token
response = await http_client.post(
"/api/v1/auth", json={"username": user_alan.username, "password": "secret1234"}
)
assert response.status_code == 200, "Alan logs in OK"
access_token = response.json().get("access_token")
assert access_token is not None
# Attempt to create a new ACL with a missing password
data = UpdateAccessControlList(id="", name="New ACL", password="", endpoints=[])
response = await http_client.put(
"/api/v1/auth/acl",
json=data.dict(),
headers={"Authorization": f"Bearer {access_token}"},
)
assert (
response.status_code == 401
), "Missing password should result in unauthorized error."
assert response.json().get("detail") == "Invalid credentials."
@pytest.mark.anyio
async def test_api_get_user_acls_success(http_client: AsyncClient):
# Register a new user to obtain the access token
tiny_id = shortuuid.uuid()[:8]
response = await http_client.post(
"/api/v1/auth/register",
json={
"username": f"u21.{tiny_id}",
"password": "secret1234",
"password_repeat": "secret1234",
"email": f"u21.{tiny_id}@lnbits.com",
},
)
assert response.status_code == 200, "User created."
access_token = response.json().get("access_token")
assert access_token is not None
# Get user ACLs
response = await http_client.get(
"/api/v1/auth/acl", headers={"Authorization": f"Bearer {access_token}"}
)
assert response.status_code == 200, "ACLs fetched successfully."
user_acls = UserAcls(**response.json())
assert user_acls.id is not None, "User ID should be set."
assert isinstance(user_acls.access_control_list, list), "ACL should be a list."
@pytest.mark.anyio
async def test_api_get_user_acls_no_auth(http_client: AsyncClient):
# Attempt to get user ACLs without authentication
response = await http_client.get("/api/v1/auth/acl")
assert response.status_code == 401, "Unauthorized access."
@pytest.mark.anyio
async def test_api_get_user_acls_invalid_token(http_client: AsyncClient):
# Attempt to get user ACLs with an invalid token
response = await http_client.get(
"/api/v1/auth/acl", headers={"Authorization": "Bearer invalid_token"}
)
assert response.status_code == 401, "Unauthorized access."
@pytest.mark.anyio
async def test_api_get_user_acls_empty_acl(http_client: AsyncClient):
# Register a new user to obtain the access token
tiny_id = shortuuid.uuid()[:8]
response = await http_client.post(
"/api/v1/auth/register",
json={
"username": f"u21.{tiny_id}",
"password": "secret1234",
"password_repeat": "secret1234",
"email": f"u21.{tiny_id}@lnbits.com",
},
)
assert response.status_code == 200, "User created."
access_token = response.json().get("access_token")
assert access_token is not None
# Get user ACLs
response = await http_client.get(
"/api/v1/auth/acl", headers={"Authorization": f"Bearer {access_token}"}
)
assert response.status_code == 200, "ACLs fetched successfully."
user_acls = UserAcls(**response.json())
assert user_acls.id is not None, "User ID should be set."
assert len(user_acls.access_control_list) == 0, "ACL should be empty."
@pytest.mark.anyio
async def test_api_get_user_acls_with_acl(http_client: AsyncClient):
# Register a new user to obtain the access token
tiny_id = shortuuid.uuid()[:8]
response = await http_client.post(
"/api/v1/auth/register",
json={
"username": f"u21.{tiny_id}",
"password": "secret1234",
"password_repeat": "secret1234",
"email": f"u21.{tiny_id}@lnbits.com",
},
)
assert response.status_code == 200, "User created."
access_token = response.json().get("access_token")
assert access_token is not None
# Create a new ACL for the user
acl_data = UpdateAccessControlList(
id="",
name="Test ACL",
endpoints=[],
password="secret1234",
)
response = await http_client.put(
"/api/v1/auth/acl",
headers={"Authorization": f"Bearer {access_token}"},
json=acl_data.dict(),
)
assert response.status_code == 200, "ACL created successfully."
# Get user ACLs
response = await http_client.get(
"/api/v1/auth/acl", headers={"Authorization": f"Bearer {access_token}"}
)
assert response.status_code == 200, "ACLs fetched successfully."
user_acls = UserAcls(**response.json())
assert user_acls.id is not None, "User ID should be set."
assert len(user_acls.access_control_list) == 1, "ACL should contain one item."
assert user_acls.access_control_list[0].name == "Test ACL", "ACL name should match."
@pytest.mark.anyio
async def test_api_get_user_acls_sorted(http_client: AsyncClient):
# Register a new user to obtain the access token
tiny_id = shortuuid.uuid()[:8]
response = await http_client.post(
"/api/v1/auth/register",
json={
"username": f"u21.{tiny_id}",
"password": "secret1234",
"password_repeat": "secret1234",
"email": f"u21.{tiny_id}@lnbits.com",
},
)
assert response.status_code == 200, "User created."
access_token = response.json().get("access_token")
assert access_token is not None
# Create some ACLs for the user
acl_names = ["zeta", "alpha", "gamma"]
for name in acl_names:
response = await http_client.put(
"/api/v1/auth/acl",
headers={"Authorization": f"Bearer {access_token}"},
json={"id": name, "name": name, "password": "secret1234"},
)
assert (
response.status_code == 200
), f"ACL '{name}' should be created successfully."
# Get the user's ACLs
response = await http_client.get(
"/api/v1/auth/acl",
headers={"Authorization": f"Bearer {access_token}"},
)
assert response.status_code == 200, "ACLs retrieved."
user_acls = UserAcls(**response.json())
# Check that the ACLs are sorted alphabetically by name
acl_names_sorted = sorted(acl_names)
retrieved_acl_names = [acl.name for acl in user_acls.access_control_list]
assert (
retrieved_acl_names == acl_names_sorted
), "ACLs are not sorted alphabetically by name."
@pytest.mark.anyio
async def test_api_delete_user_acl_success(http_client: AsyncClient):
# Register a new user to obtain the access token
tiny_id = shortuuid.uuid()[:8]
response = await http_client.post(
"/api/v1/auth/register",
json={
"username": f"u21.{tiny_id}",
"password": "secret1234",
"password_repeat": "secret1234",
"email": f"u21.{tiny_id}@lnbits.com",
},
)
assert response.status_code == 200, "User created."
access_token = response.json().get("access_token")
assert access_token is not None
# Create an ACL for the user
response = await http_client.put(
"/api/v1/auth/acl",
headers={"Authorization": f"Bearer {access_token}"},
json={
"id": "Test ACL",
"name": "Test ACL",
"password": "secret1234",
},
)
assert response.status_code == 200, "ACL created."
acl_id = response.json()["access_control_list"][0]["id"]
# Delete the ACL
response = await http_client.request(
"DELETE",
"/api/v1/auth/acl",
headers={"Authorization": f"Bearer {access_token}"},
json={
"id": acl_id,
"password": "secret1234",
},
)
assert response.status_code == 200, "ACL deleted."
@pytest.mark.anyio
async def test_api_delete_user_acl_invalid_password(http_client: AsyncClient):
# Register a new user to obtain the access token
tiny_id = shortuuid.uuid()[:8]
response = await http_client.post(
"/api/v1/auth/register",
json={
"username": f"u21.{tiny_id}",
"password": "secret1234",
"password_repeat": "secret1234",
"email": f"u21.{tiny_id}@lnbits.com",
},
)
assert response.status_code == 200, "User created."
access_token = response.json().get("access_token")
assert access_token is not None
# Create an ACL for the user
response = await http_client.put(
"/api/v1/auth/acl",
headers={"Authorization": f"Bearer {access_token}"},
json={
"id": "Test ACL",
"name": "Test ACL",
"password": "secret1234",
},
)
assert response.status_code == 200, "ACL created."
acl_id = response.json()["access_control_list"][0]["id"]
# Attempt to delete the ACL with an invalid password
response = await http_client.request(
"DELETE",
"/api/v1/auth/acl",
headers={"Authorization": f"Bearer {access_token}"},
json={
"id": acl_id,
"password": "wrongpassword",
},
)
assert response.status_code == 401, "Invalid credentials."
@pytest.mark.anyio
async def test_api_delete_user_acl_nonexistent_acl(http_client: AsyncClient):
# Register a new user to obtain the access token
tiny_id = shortuuid.uuid()[:8]
response = await http_client.post(
"/api/v1/auth/register",
json={
"username": f"u21.{tiny_id}",
"password": "secret1234",
"password_repeat": "secret1234",
"email": f"u21.{tiny_id}@lnbits.com",
},
)
assert response.status_code == 200, "User created."
access_token = response.json().get("access_token")
assert access_token is not None
# Attempt to delete a nonexistent ACL
response = await http_client.request(
"DELETE",
"/api/v1/auth/acl",
headers={"Authorization": f"Bearer {access_token}"},
json={
"id": "nonexistent_acl_id",
"password": "secret1234",
},
)
assert response.status_code == 200, "ACL deleted."
@pytest.mark.anyio
async def test_api_delete_user_acl_missing_password(http_client: AsyncClient):
# Register a new user to obtain the access token
tiny_id = shortuuid.uuid()[:8]
response = await http_client.post(
"/api/v1/auth/register",
json={
"username": f"u21.{tiny_id}",
"password": "secret1234",
"password_repeat": "secret1234",
"email": f"u21.{tiny_id}@lnbits.com",
},
)
assert response.status_code == 200, "User created."
access_token = response.json().get("access_token")
assert access_token is not None
# Create an ACL for the user
response = await http_client.put(
"/api/v1/auth/acl",
headers={"Authorization": f"Bearer {access_token}"},
json={
"id": "Test ACL",
"name": "Test ACL",
"password": "secret1234",
},
)
assert response.status_code == 200, "ACL created."
acl_id = response.json()["access_control_list"][0]["id"]
# Attempt to delete the ACL without providing a password
response = await http_client.request(
"DELETE",
"/api/v1/auth/acl",
headers={"Authorization": f"Bearer {access_token}"},
json={
"id": acl_id,
},
)
assert response.status_code == 400, "Missing password."
################################ TOKEN ################################
@pytest.mark.anyio
async def test_api_create_user_api_token_success(
http_client: AsyncClient, settings: Settings
):
# Register a new user
tiny_id = shortuuid.uuid()[:8]
response = await http_client.post(
"/api/v1/auth/register",
json={
"username": f"u21.{tiny_id}",
"password": "secret1234",
"password_repeat": "secret1234",
"email": f"u21.{tiny_id}@lnbits.com",
},
)
assert response.status_code == 200, "User created."
access_token = response.json().get("access_token")
assert access_token is not None
# Create a new ACL
acl_data = UpdateAccessControlList(
id="", password="secret1234", name="Test ACL", endpoints=[]
)
response = await http_client.put(
"/api/v1/auth/acl",
headers={"Authorization": f"Bearer {access_token}"},
json=acl_data.dict(),
)
assert response.status_code == 200, "ACL created."
acl_id = response.json()["access_control_list"][0]["id"]
# Create API token
token_request = ApiTokenRequest(
acl_id=acl_id,
token_name="Test Token",
expiration_time_minutes=60,
password="secret1234",
)
response = await http_client.post(
"/api/v1/auth/acl/token",
headers={"Authorization": f"Bearer {access_token}"},
json=token_request.dict(),
)
assert response.status_code == 200, "API token created."
api_token = response.json().get("api_token")
assert api_token is not None
# Verify the token exists
response = await http_client.get(
"/api/v1/auth/acl",
headers={"Authorization": f"Bearer {access_token}"},
)
assert response.status_code == 200, "ACLs fetched successfully."
acls = UserAcls(**response.json())
# Decode the access token to get the user ID
payload: dict = jwt.decode(api_token, settings.auth_secret_key, ["HS256"])
# Check the expiration time
expiration_time = payload.get("exp")
assert expiration_time is not None, "Expiration time should be set."
assert (
0 <= 3600 - (expiration_time - time.time()) <= 5
), "Expiration time should be 60 minutes from now."
token_id = payload["api_token_id"]
assert any(
token_id in [token.id for token in acl.token_id_list]
for acl in acls.access_control_list
), "API token should be part of at least one ACL."
@pytest.mark.anyio
async def test_acl_api_token_access(user_alan: User, http_client: AsyncClient):
user_acls = await get_user_access_control_lists(user_alan.id)
acl = AccessControlList(id=uuid4().hex, name="Test ACL", endpoints=[])
user_acls.access_control_list = [acl]
api_token_id = uuid4().hex
payload = AccessTokenPayload(
sub=user_alan.username or user_alan.id,
api_token_id=api_token_id,
auth_time=int(time.time()),
)
api_token = create_access_token(data=payload.dict(), token_expire_minutes=10)
acl.token_id_list.append(SimpleItem(id=api_token_id, name="Test Token"))
await update_user_access_control_list(user_acls)
headers = {"Authorization": f"Bearer {api_token}"}
response = await http_client.get("/api/v1/auth/acl", headers=headers)
assert response.status_code == 403, "Path not allowed."
assert response.json()["detail"] == "Path not allowed."
# Grant read access
endpoint = EndpointAccess(path="/api/v1/auth", name="Get User ACLs", read=True)
acl.endpoints.append(endpoint)
await update_user_access_control_list(user_acls)
response = await http_client.get("/api/v1/auth/acl", headers=headers)
assert response.status_code == 200, "Access granted."
response = await http_client.put("/api/v1/auth/acl", headers=headers)
assert response.status_code == 403, "Method not allowed."
response = await http_client.post(
"/api/v1/auth/acl/token", headers=headers, json={}
)
assert response.status_code == 403, "Method not allowed."
response = await http_client.patch("/api/v1/auth/acl", headers=headers)
assert response.status_code == 403, "Method not allowed."
response = await http_client.delete("/api/v1/auth/acl", headers=headers)
assert response.status_code == 403, "Method not allowed."
# Grant write access
endpoint.write = True
await update_user_access_control_list(user_acls)
response = await http_client.get("/api/v1/auth/acl", headers=headers)
assert response.status_code == 200, "Access granted."
response = await http_client.put("/api/v1/auth/acl", headers=headers)
assert response.status_code == 400, "Access granted, validation error expected."
response = await http_client.post(
"/api/v1/auth/acl/token", headers=headers, json={}
)
assert response.status_code == 400, "Access granted, validation error expected."
response = await http_client.patch("/api/v1/auth/acl", headers=headers)
assert response.status_code == 400, "Access granted, validation error expected."
response = await http_client.delete("/api/v1/auth/acl", headers=headers)
assert response.status_code == 400, "Access granted, validation error expected."
# Revoke read access
endpoint.read = False
await update_user_access_control_list(user_acls)
response = await http_client.get("/api/v1/auth/acl", headers=headers)
assert response.status_code == 403, "Method not allowed."
response = await http_client.put("/api/v1/auth/acl", headers=headers)
assert (
response.status_code == 400
), "Access still granted, validation error expected."
@pytest.mark.anyio
async def test_api_create_user_api_token_invalid_password(http_client: AsyncClient):
# Register a new user
tiny_id = shortuuid.uuid()[:8]
response = await http_client.post(
"/api/v1/auth/register",
json={
"username": f"u21.{tiny_id}",
"password": "secret1234",
"password_repeat": "secret1234",
"email": f"u21.{tiny_id}@lnbits.com",
},
)
assert response.status_code == 200, "User created."
access_token = response.json().get("access_token")
assert access_token is not None
# Create a new ACL
acl_data = UpdateAccessControlList(
password="secret1234", id="", name="Test ACL", endpoints=[]
)
response = await http_client.put(
"/api/v1/auth/acl",
headers={"Authorization": f"Bearer {access_token}"},
json=acl_data.dict(),
)
assert response.status_code == 200, "ACL created."
acl_id = response.json()["access_control_list"][0]["id"]
# Create API token with invalid password
token_request = ApiTokenRequest(
acl_id=acl_id,
token_name="Test Token",
expiration_time_minutes=60,
password="wrongpassword",
)
response = await http_client.post(
"/api/v1/auth/acl/token",
headers={"Authorization": f"Bearer {access_token}"},
json=token_request.dict(),
)
assert response.status_code == 401, "Invalid credentials."
@pytest.mark.anyio
async def test_api_create_user_api_token_invalid_acl_id(http_client: AsyncClient):
# Register a new user
tiny_id = shortuuid.uuid()[:8]
response = await http_client.post(
"/api/v1/auth/register",
json={
"username": f"u21.{tiny_id}",
"password": "secret1234",
"password_repeat": "secret1234",
"email": f"u21.{tiny_id}@lnbits.com",
},
)
assert response.status_code == 200, "User created."
access_token = response.json().get("access_token")
assert access_token is not None
# Create API token with invalid ACL ID
token_request = ApiTokenRequest(
acl_id="invalid_acl_id",
token_name="Test Token",
expiration_time_minutes=60,
password="secret1234",
)
response = await http_client.post(
"/api/v1/auth/acl/token",
headers={"Authorization": f"Bearer {access_token}"},
json=token_request.dict(),
)
assert response.status_code == 401, "Invalid ACL id."
@pytest.mark.anyio
async def test_api_create_user_api_token_expiration_time_invalid(
http_client: AsyncClient,
):
# Register a new user
tiny_id = shortuuid.uuid()[:8]
response = await http_client.post(
"/api/v1/auth/register",
json={
"username": f"u21.{tiny_id}",
"password": "secret1234",
"password_repeat": "secret1234",
"email": f"u21.{tiny_id}@lnbits.com",
},
)
assert response.status_code == 200, "User created."
access_token = response.json().get("access_token")
assert access_token is not None
# Create a new ACL
acl_data = UpdateAccessControlList(
id="", password="secret1234", name="Test ACL", endpoints=[]
)
response = await http_client.put(
"/api/v1/auth/acl",
headers={"Authorization": f"Bearer {access_token}"},
json=acl_data.dict(),
)
assert response.status_code == 200, "ACL created."
acl_id = response.json()["access_control_list"][0]["id"]
# Create API token with invalid expiration time
token_request = ApiTokenRequest(
acl_id=acl_id,
token_name="Test Token",
expiration_time_minutes=-1,
password="secret1234",
)
response = await http_client.post(
"/api/v1/auth/acl/token",
headers={"Authorization": f"Bearer {access_token}"},
json=token_request.dict(),
)
assert response.status_code == 400, "Expiration time must be in the future."
@pytest.mark.anyio
async def test_api_delete_user_api_token_success(
http_client: AsyncClient, settings: Settings
):
# Register a new user
tiny_id = shortuuid.uuid()[:8]
response = await http_client.post(
"/api/v1/auth/register",
json={
"username": f"u21.{tiny_id}",
"password": "secret1234",
"password_repeat": "secret1234",
"email": f"u21.{tiny_id}@lnbits.com",
},
)
assert response.status_code == 200, "User created."
access_token = response.json().get("access_token")
assert access_token is not None
# Decode the access token to get the user ID
payload: dict = jwt.decode(access_token, settings.auth_secret_key, ["HS256"])
user_id = payload["usr"]
# Create a new ACL
acl_data = UpdateAccessControlList(
id="", name="Test ACL", endpoints=[], password="secret1234"
)
user_acls = await get_user_access_control_lists(user_id)
user_acls.access_control_list.append(acl_data)
await update_user_access_control_list(user_acls)
# Create a new API token
api_token_request = ApiTokenRequest(
acl_id=acl_data.id,
token_name="Test Token",
expiration_time_minutes=60,
password="secret1234",
)
response = await http_client.post(
"/api/v1/auth/acl/token",
headers={"Authorization": f"Bearer {access_token}"},
json=api_token_request.dict(),
)
assert response.status_code == 200, "API token created."
api_token_id = response.json().get("id")
assert api_token_id is not None
# Delete the API token
delete_token_request = DeleteTokenRequest(
acl_id=acl_data.id, id=api_token_id, password="secret1234"
)
response = await http_client.request(
"DELETE",
"/api/v1/auth/acl/token",
headers={"Authorization": f"Bearer {access_token}"},
json=delete_token_request.dict(),
)
assert response.status_code == 200, "API token deleted."
@pytest.mark.anyio
async def test_api_delete_user_api_token_invalid_password(
http_client: AsyncClient, settings: Settings
):
# Register a new user
tiny_id = shortuuid.uuid()[:8]
response = await http_client.post(
"/api/v1/auth/register",
json={
"username": f"u21.{tiny_id}",
"password": "secret1234",
"password_repeat": "secret1234",
"email": f"u21.{tiny_id}@lnbits.com",
},
)
assert response.status_code == 200, "User created."
access_token = response.json().get("access_token")
assert access_token is not None
# Decode the access token to get the user ID
payload: dict = jwt.decode(access_token, settings.auth_secret_key, ["HS256"])
user_id = payload["usr"]
# Create a new ACL
acl_data = UpdateAccessControlList(
id="", name="Test ACL", endpoints=[], password="secret1234"
)
user_acls = await get_user_access_control_lists(user_id)
user_acls.access_control_list.append(acl_data)
await update_user_access_control_list(user_acls)
# Create a new API token
api_token_request = ApiTokenRequest(
acl_id=acl_data.id,
token_name="Test Token",
expiration_time_minutes=60,
password="secret1234",
)
response = await http_client.post(
"/api/v1/auth/acl/token",
headers={"Authorization": f"Bearer {access_token}"},
json=api_token_request.dict(),
)
assert response.status_code == 200, "API token created."
api_token_id = response.json().get("id")
assert api_token_id is not None
# Attempt to delete the API token with an invalid password
delete_token_request = DeleteTokenRequest(
acl_id=acl_data.id, id=api_token_id, password="wrong_password"
)
response = await http_client.request(
"DELETE",
"/api/v1/auth/acl/token",
headers={"Authorization": f"Bearer {access_token}"},
json=delete_token_request.dict(),
)
assert response.status_code == 401, "Invalid credentials."
@pytest.mark.anyio
async def test_api_delete_user_api_token_invalid_acl_id(
http_client: AsyncClient,
):
# Register a new user
tiny_id = shortuuid.uuid()[:8]
response = await http_client.post(
"/api/v1/auth/register",
json={
"username": f"u21.{tiny_id}",
"password": "secret1234",
"password_repeat": "secret1234",
"email": f"u21.{tiny_id}@lnbits.com",
},
)
assert response.status_code == 200, "User created."
access_token = response.json().get("access_token")
assert access_token is not None
# Attempt to delete an API token with an invalid ACL ID
delete_token_request = DeleteTokenRequest(
acl_id="invalid_acl_id", id="invalid_token_id", password="secret1234"
)
response = await http_client.request(
"DELETE",
"/api/v1/auth/acl/token",
headers={"Authorization": f"Bearer {access_token}"},
json=delete_token_request.dict(),
)
assert response.status_code == 401, "Invalid ACL id."
@pytest.mark.anyio
async def test_api_delete_user_api_token_missing_token_id(
http_client: AsyncClient, settings: Settings
):
# Register a new user
tiny_id = shortuuid.uuid()[:8]
response = await http_client.post(
"/api/v1/auth/register",
json={
"username": f"u21.{tiny_id}",
"password": "secret1234",
"password_repeat": "secret1234",
"email": f"u21.{tiny_id}@lnbits.com",
},
)
assert response.status_code == 200, "User created."
access_token = response.json().get("access_token")
assert access_token is not None
# Decode the access token to get the user ID
payload: dict = jwt.decode(access_token, settings.auth_secret_key, ["HS256"])
user_id = payload["usr"]
# Create a new ACL
acl_data = UpdateAccessControlList(
id="", name="Test ACL", endpoints=[], password="secret1234"
)
user_acls = await get_user_access_control_lists(user_id)
user_acls.access_control_list.append(acl_data)
await update_user_access_control_list(user_acls)
# Attempt to delete an API token with a missing token ID
delete_token_request = DeleteTokenRequest(
acl_id=acl_data.id, id="", password="secret1234"
)
response = await http_client.request(
"DELETE",
"/api/v1/auth/acl/token",
headers={"Authorization": f"Bearer {access_token}"},
json=delete_token_request.dict(),
)
assert response.status_code == 200, "Does noting if token not found."

View file

@ -4,20 +4,20 @@ import pytest
@pytest.mark.anyio
async def test_create___bad_body(client, adminkey_headers_from):
async def test_create___bad_body(client, user_headers_from):
response = await client.post(
"/api/v1/webpush",
headers=adminkey_headers_from,
headers=user_headers_from,
json={"subscription": "bad_json"},
)
assert response.status_code == HTTPStatus.INTERNAL_SERVER_ERROR
@pytest.mark.anyio
async def test_create___missing_fields(client, adminkey_headers_from):
async def test_create___missing_fields(client, user_headers_from):
response = await client.post(
"/api/v1/webpush",
headers=adminkey_headers_from,
headers=user_headers_from,
json={"subscription": """{"a": "x"}"""},
)
assert response.status_code == HTTPStatus.INTERNAL_SERVER_ERROR
@ -34,30 +34,30 @@ async def test_create___bad_access_key(client, inkey_headers_from):
@pytest.mark.anyio
async def test_delete__bad_endpoint_format(client, adminkey_headers_from):
async def test_delete__bad_endpoint_format(client, user_headers_from):
response = await client.delete(
"/api/v1/webpush",
params={"endpoint": "https://this.should.be.base64.com"},
headers=adminkey_headers_from,
headers=user_headers_from,
)
assert response.status_code == HTTPStatus.INTERNAL_SERVER_ERROR
@pytest.mark.anyio
async def test_delete__no_endpoint_param(client, adminkey_headers_from):
async def test_delete__no_endpoint_param(client, user_headers_from):
response = await client.delete(
"/api/v1/webpush",
headers=adminkey_headers_from,
headers=user_headers_from,
)
assert response.status_code == HTTPStatus.INTERNAL_SERVER_ERROR
@pytest.mark.anyio
async def test_delete__no_endpoint_found(client, adminkey_headers_from):
async def test_delete__no_endpoint_found(client, user_headers_from):
response = await client.delete(
"/api/v1/webpush",
params={"endpoint": "aHR0cHM6Ly9kZW1vLmxuYml0cy5jb20="},
headers=adminkey_headers_from,
headers=user_headers_from,
)
assert response.status_code == HTTPStatus.OK
assert response.json()["count"] == 0
@ -73,17 +73,17 @@ async def test_delete__bad_access_key(client, inkey_headers_from):
@pytest.mark.anyio
async def test_create_and_delete(client, adminkey_headers_from):
async def test_create_and_delete(client, user_headers_from):
response = await client.post(
"/api/v1/webpush",
headers=adminkey_headers_from,
headers=user_headers_from,
json={"subscription": """{"endpoint": "https://demo.lnbits.com"}"""},
)
assert response.status_code == HTTPStatus.CREATED
response = await client.delete(
"/api/v1/webpush",
params={"endpoint": "aHR0cHM6Ly9kZW1vLmxuYml0cy5jb20="},
headers=adminkey_headers_from,
headers=user_headers_from,
)
assert response.status_code == HTTPStatus.OK
assert response.json()["count"] == 1

View file

@ -219,6 +219,18 @@ async def adminkey_headers_from(from_wallet):
}
@pytest.fixture(scope="session")
async def user_headers_from(client: AsyncClient, from_user: User):
response = await client.post("/api/v1/auth/usr", json={"usr": from_user.id})
client.cookies.clear()
access_token = response.json().get("access_token")
yield {
"Authorization": "Bearer " + access_token,
"Content-type": "application/json",
}
@pytest.fixture(scope="session")
async def inkey_headers_to(to_wallet):
wallet = to_wallet

48
tests/copilot_prompt.md Normal file
View file

@ -0,0 +1,48 @@
# GitHub Copilot Prompts
Make sure to:
- select the code that you want to test. The prompt specifies the name of the file and the function to be tested (this redundancy is needed)
- open tabs with relevant files for the tests, for example: `conftest.py`, `test_auth.py`. This helps Copilot with context.
## Examples
### Create Comprehensive suite of unit tests
_Sample 1_
@workspace /tests Develop a comprehensive suite of unit tests for the selected code (only the function (only the function api_create_user_api_token in auth_api.py file) in auth_api.py file).
Requirements:
- use register endpoint to obtain the access token (see example in test_register_ok)
- write multiple test functions that cover a wide range of scenarios, including the succes flow, edge cases, exception handling, and data validation
- for the success case create a new ACL before creating the token
_Sample 2_
@workspace /tests Develop a comprehensive suite of unit tests for the selected code (only the function check_user_exists in decorators.py file) .
Requirements:
- write multiple test functions that cover a wide range of scenarios, including the succes flow, edge cases, security vulnerabilities, exception handling, and data validation
- use the login endpoint to obtain a valid access token. Use the `user_alan: User` fixture for the login params. Check the `test_login_alan_username_password_ok` function in the `test_auth.py` file as an example for login.
- do not use mocks. For the request parameter initialize the fastapi.Request class.
- make sure to cover all if-then-else branches
### Create tests for a particular usecase
_Sample 1_
@workspace /tests Develop a test for the selected code (only the function api_get_user_acls in auth_api.py file).
Requirements:
- use register endpoint to obtain the access token (see example in test_register_ok)
- the test should only check that the ACLs are sorted alphabeticaly by name
_Sample 1_
@workspace /tests Develop a test for the selected code (only the function check_user_exists in decorators.py file).
Requirements:
- use register endpoint to obtain the access token (see example in the file test_auth.py the function test_register_ok())
- the test should register a new user, obtain the access token then delete the user. Then check that check_user_exists() fails as expected
@workspace /tests Develop a test for the selected code (only the function check_user_exists in decorators.py file).
Requirements:
- check only the branch where user_id_only login is allowed

View file

@ -0,0 +1,138 @@
from uuid import uuid4
import jwt
import pytest
import shortuuid
from fastapi import Request
from fastapi.exceptions import HTTPException
from httpx import AsyncClient
from pydantic.types import UUID4
from lnbits.core.crud.users import delete_account
from lnbits.core.models import User
from lnbits.core.models.users import AccessTokenPayload
from lnbits.decorators import check_user_exists
from lnbits.settings import AuthMethods, Settings, settings
@pytest.mark.anyio
async def test_check_user_exists_with_valid_access_token(
http_client: AsyncClient, user_alan: User
):
# Login to get a valid access token
response = await http_client.post(
"/api/v1/auth", json={"username": user_alan.username, "password": "secret1234"}
)
assert response.status_code == 200, "Alan logs in OK"
access_token = response.json()["access_token"]
assert access_token is not None
request = Request({"type": "http", "path": "/some/path", "method": "GET"})
user = await check_user_exists(request, access_token=access_token)
assert user.id == user_alan.id
assert request.scope["user_id"] == user.id
@pytest.mark.anyio
async def test_check_user_exists_with_invalid_access_token():
request = Request({"type": "http", "path": "/some/path", "method": "GET"})
with pytest.raises(HTTPException) as exc_info:
await check_user_exists(request, access_token="invalid_token")
assert exc_info.value.status_code == 401
assert exc_info.value.detail == "Invalid access token."
@pytest.mark.anyio
async def test_check_user_exists_with_missing_access_token():
request = Request({"type": "http", "path": "/some/path", "method": "GET"})
with pytest.raises(HTTPException) as exc_info:
await check_user_exists(request, access_token=None)
assert exc_info.value.status_code == 401
assert exc_info.value.detail == "Missing user ID or access token."
@pytest.mark.anyio
async def test_check_user_exists_with_valid_user_id(user_alan: User):
request = Request({"type": "http", "path": "/some/path", "method": "GET"})
user = await check_user_exists(request, access_token=None, usr=UUID4(user_alan.id))
assert user.id == user_alan.id
@pytest.mark.anyio
async def test_check_user_exists_with_invalid_user_id():
request = Request({"type": "http", "path": "/some/path", "method": "GET"})
with pytest.raises(HTTPException) as exc_info:
await check_user_exists(request, access_token=None, usr=uuid4())
assert exc_info.value.status_code == 401
assert exc_info.value.detail == "User not found."
@pytest.mark.anyio
async def test_check_user_exists_with_user_not_allowed(user_alan: User):
settings.lnbits_admin_users = []
request = Request({"type": "http", "path": "/some/path", "method": "GET"})
settings.lnbits_allowed_users = ["only_this_user_id"]
with pytest.raises(HTTPException) as exc_info:
await check_user_exists(request, access_token=None, usr=UUID4(user_alan.id))
assert exc_info.value.status_code == 401
assert exc_info.value.detail == "User not allowed."
@pytest.mark.anyio
async def test_check_user_exists_after_user_deletion(http_client: AsyncClient):
# Register a new user
tiny_id = shortuuid.uuid()[:8]
register_response = await http_client.post(
"/api/v1/auth/register",
json={
"username": f"u21.{tiny_id}",
"password": "secret1234",
"password_repeat": "secret1234",
"email": f"u21.{tiny_id}@lnbits.com",
},
)
assert register_response.status_code == 200, "User registers OK"
access_token = register_response.json()["access_token"]
assert access_token is not None
payload: dict = jwt.decode(access_token, settings.auth_secret_key, ["HS256"])
access_token_payload = AccessTokenPayload(**payload)
# Get the user ID
user_id = access_token_payload.usr
assert user_id, "User ID is not None"
# Delete the user
await delete_account(user_id)
# Attempt to check user existence with the deleted user's access token
request = Request({"type": "http", "path": "/some/path", "method": "GET"})
with pytest.raises(HTTPException) as exc_info:
await check_user_exists(request, access_token=access_token)
assert exc_info.value.status_code == 401
assert exc_info.value.detail == "User not found."
@pytest.mark.anyio
async def test_check_user_exists_with_user_id_only_allowed(
user_alan: User, settings: Settings
):
settings.auth_allowed_methods = [AuthMethods.user_id_only.value]
request = Request({"type": "http", "path": "/some/path", "method": "GET"})
user = await check_user_exists(request, access_token=None, usr=UUID4(user_alan.id))
assert user.id == user_alan.id
assert request.scope["user_id"] == user.id
@pytest.mark.anyio
async def test_check_user_exists_with_user_id_only_not_allowed(user_alan: User):
settings.auth_allowed_methods = []
request = Request({"type": "http", "path": "/some/path", "method": "GET"})
with pytest.raises(HTTPException) as exc_info:
await check_user_exists(request, access_token=None, usr=UUID4(user_alan.id))
assert exc_info.value.status_code == 401
assert exc_info.value.detail == "Missing user ID or access token."