From f845bfe6517018cceab714e0d6f0ef3cc62228e3 Mon Sep 17 00:00:00 2001 From: Vlad Stan Date: Thu, 23 Jan 2025 15:01:54 +0200 Subject: [PATCH] feat: do not allow `user_id_only` login for admins (#2904) --- lnbits/core/views/auth_api.py | 4 +++ lnbits/decorators.py | 12 ++++++++ tests/api/test_admin_api.py | 24 ++++++++++----- tests/api/test_auth.py | 51 ++++++++++++++++++++++++++++++++ tests/conftest.py | 35 ++++++++++++---------- tests/regtest/test_x_node_api.py | 18 +++++++---- 6 files changed, 115 insertions(+), 29 deletions(-) diff --git a/lnbits/core/views/auth_api.py b/lnbits/core/views/auth_api.py index d6414927..787acf53 100644 --- a/lnbits/core/views/auth_api.py +++ b/lnbits/core/views/auth_api.py @@ -110,6 +110,10 @@ async def login_usr(data: LoginUsr) -> JSONResponse: account = await get_account(data.usr) if not account: raise HTTPException(HTTPStatus.UNAUTHORIZED, "User ID does not exist.") + if account.is_admin: + raise HTTPException( + HTTPStatus.UNAUTHORIZED, "Admin users cannot login with user id only." + ) return _auth_success_response(account.username, account.id, account.email) diff --git a/lnbits/decorators.py b/lnbits/decorators.py index f76ec3e8..5f401ef7 100644 --- a/lnbits/decorators.py +++ b/lnbits/decorators.py @@ -144,6 +144,10 @@ async def check_user_exists( account = await _get_account_from_token(access_token, r["path"], r["method"]) elif usr and settings.is_auth_method_allowed(AuthMethods.user_id_only): account = await get_account(usr.hex) + if account and account.is_admin: + raise HTTPException( + HTTPStatus.FORBIDDEN, "User id only access for admins is forbidden." + ) else: raise HTTPException(HTTPStatus.UNAUTHORIZED, "Missing user ID or access token.") @@ -190,6 +194,10 @@ async def check_admin(user: Annotated[User, Depends(check_user_exists)]) -> User raise HTTPException( HTTPStatus.UNAUTHORIZED, "User not authorized. No admin privileges." ) + if not user.has_password: + raise HTTPException( + HTTPStatus.FORBIDDEN, "Admin users must have credentials configured." + ) return user @@ -199,6 +207,10 @@ async def check_super_user(user: Annotated[User, Depends(check_user_exists)]) -> raise HTTPException( HTTPStatus.UNAUTHORIZED, "User not authorized. No super user privileges." ) + if not user.has_password: + raise HTTPException( + HTTPStatus.FORBIDDEN, "Super user must have credentials configured." + ) return user diff --git a/tests/api/test_admin_api.py b/tests/api/test_admin_api.py index c1514144..7a1d4f04 100644 --- a/tests/api/test_admin_api.py +++ b/tests/api/test_admin_api.py @@ -1,6 +1,6 @@ import pytest +from httpx import AsyncClient -from lnbits.core.models import User from lnbits.settings import Settings @@ -11,19 +11,25 @@ async def test_admin_get_settings_permission_denied(client, from_user): @pytest.mark.anyio -async def test_admin_get_settings(client, superuser): - response = await client.get(f"/admin/api/v1/settings?usr={superuser.id}") +async def test_admin_get_settings(client: AsyncClient, superuser_token: str): + response = await client.get( + "/admin/api/v1/settings", + headers={"Authorization": f"Bearer {superuser_token}"}, + ) assert response.status_code == 200 result = response.json() assert "super_user" not in result @pytest.mark.anyio -async def test_admin_update_settings(client, superuser: User, settings: Settings): +async def test_admin_update_settings( + client: AsyncClient, superuser_token: str, settings: Settings +): new_site_title = "UPDATED SITETITLE" response = await client.put( - f"/admin/api/v1/settings?usr={superuser.id}", + "/admin/api/v1/settings", json={"lnbits_site_title": new_site_title}, + headers={"Authorization": f"Bearer {superuser_token}"}, ) assert response.status_code == 200 result = response.json() @@ -33,9 +39,13 @@ async def test_admin_update_settings(client, superuser: User, settings: Settings @pytest.mark.anyio -async def test_admin_update_noneditable_settings(client, superuser): +async def test_admin_update_noneditable_settings( + client: AsyncClient, + superuser_token: str, +): response = await client.put( - f"/admin/api/v1/settings?usr={superuser.id}", + "/admin/api/v1/settings", json={"super_user": "UPDATED"}, + headers={"Authorization": f"Bearer {superuser_token}"}, ) assert response.status_code == 400 diff --git a/tests/api/test_auth.py b/tests/api/test_auth.py index 6898ff9d..ddacebd1 100644 --- a/tests/api/test_auth.py +++ b/tests/api/test_auth.py @@ -18,12 +18,15 @@ from lnbits.core.models import AccessTokenPayload, User from lnbits.core.models.misc import SimpleItem from lnbits.core.models.users import ( AccessControlList, + Account, ApiTokenRequest, DeleteTokenRequest, EndpointAccess, + LoginUsr, UpdateAccessControlList, UserAcls, ) +from lnbits.core.services.users import create_user_account from lnbits.core.views.user_api import api_users_reset_password from lnbits.helpers import create_access_token from lnbits.settings import AuthMethods, Settings @@ -76,6 +79,54 @@ async def test_login_alan_usr(user_alan: User, http_client: AsyncClient): assert alan["email"] == user_alan.email +@pytest.mark.anyio +async def test_login_usr_not_allowed_for_admin_without_credentials( + http_client: AsyncClient, settings: Settings +): + # Register a new user + account = Account(id=uuid4().hex) + await create_user_account(account) + + # Login with user ID + login_data = LoginUsr(usr=account.id) + response = await http_client.post("/api/v1/auth/usr", json=login_data.dict()) + http_client.cookies.clear() + assert response.status_code == 200, "User logs in OK." + access_token = response.json().get("access_token") + assert access_token is not None, "Expected access token after login." + headers = {"Authorization": f"Bearer {access_token}"} + + # Simulate the user being an admin without credentials + settings.lnbits_admin_users = [account.id] + + # Attempt to login with user ID for admin + response = await http_client.post("/api/v1/auth/usr", json=login_data.dict()) + + assert response.status_code == 401 + assert ( + response.json().get("detail") == "Admin users cannot login with user id only." + ) + + response = await http_client.get("/admin/api/v1/settings", headers=headers) + assert response.status_code == 403 + assert ( + response.json().get("detail") == "Admin users must have credentials configured." + ) + + # User only access should not be allowed + response = await http_client.get( + f"/admin/api/v1/settings?usr={settings.super_user}" + ) + print("### response", response.text) + assert response.status_code == 403 + assert ( + response.json().get("detail") == "User id only access for admins is forbidden." + ) + + response = await http_client.get("/api/v1/status", headers=headers) + assert response.status_code == 200, "Admin user can access regular endpoints." + + @pytest.mark.anyio async def test_login_usr_not_allowed( user_alan: User, http_client: AsyncClient, settings: Settings diff --git a/tests/conftest.py b/tests/conftest.py index 723f47a8..3761ca69 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -12,14 +12,14 @@ from lnbits.app import create_app from lnbits.core.crud import ( create_wallet, delete_account, - get_account, get_account_by_username, get_payment, - get_user_from_account, update_payment, ) from lnbits.core.models import Account, CreateInvoice, PaymentState, User +from lnbits.core.models.users import UpdateSuperuserPassword from lnbits.core.services import create_user_account, update_wallet_balance +from lnbits.core.views.auth_api import first_install from lnbits.core.views.payment_api import _api_payments_create_invoice from lnbits.db import DB_TYPE, SQLITE, Database from lnbits.settings import AuthMethods, Settings @@ -62,7 +62,15 @@ def run_before_and_after_tests(settings: Settings): async def app(settings: Settings): app = create_app() async with LifespanManager(app) as manager: - settings.first_install = False + settings.first_install = True + await first_install( + UpdateSuperuserPassword( + username="superadmin", + password="secret1234", + password_repeat="secret1234", + ) + ) + yield manager.app @@ -150,20 +158,14 @@ async def to_user(): yield user -@pytest.fixture() -def from_super_user(from_user: User, settings: Settings): - prev = settings.super_user - settings.super_user = from_user.id - yield from_user - settings.super_user = prev - - @pytest.fixture(scope="session") -async def superuser(settings: Settings): - account = await get_account(settings.super_user) - assert account, "Superuser not found" - user = await get_user_from_account(account) - yield user +async def superuser_token(client: AsyncClient): + response = await client.post( + "/api/v1/auth", json={"username": "superadmin", "password": "secret1234"} + ) + client.cookies.clear() + + yield response.json().get("access_token") @pytest.fixture(scope="session") @@ -307,3 +309,4 @@ def _settings_cleanup(settings: Settings): settings.lnbits_service_fee = 0 settings.lnbits_wallet_limit_daily_max_withdraw = 0 settings.lnbits_admin_extensions = [] + settings.lnbits_admin_users = [] diff --git a/tests/regtest/test_x_node_api.py b/tests/regtest/test_x_node_api.py index ff759baa..48a7ee93 100644 --- a/tests/regtest/test_x_node_api.py +++ b/tests/regtest/test_x_node_api.py @@ -3,10 +3,12 @@ import random from http import HTTPStatus import pytest +from httpx import AsyncClient, Headers from pydantic import parse_obj_as from lnbits import bolt11 from lnbits.nodes.base import ChannelPoint, ChannelState, NodeChannel +from lnbits.settings import Settings from ..helpers import ( funding_source, @@ -24,14 +26,14 @@ pytestmark = pytest.mark.skipif( @pytest.fixture() -async def node_client(client, from_super_user, settings): +async def node_client(client: AsyncClient, settings: Settings, superuser_token: str): settings.lnbits_node_ui = True settings.lnbits_public_node_ui = False settings.lnbits_node_ui_transactions = True - params = client.params - client.params = {"usr": from_super_user.id} + headers = client.headers + client.headers = Headers({"Authorization": f"Bearer {superuser_token}"}) yield client - client.params = params + client.headers = headers settings.lnbits_node_ui = False @@ -43,9 +45,13 @@ async def public_node_client(node_client, settings): @pytest.mark.anyio -async def test_node_info_not_found(client, from_super_user, settings): +async def test_node_info_not_found( + client: AsyncClient, settings: Settings, superuser_token: str +): settings.lnbits_node_ui = False - response = await client.get("/node/api/v1/info", params={"usr": from_super_user.id}) + response = await client.get( + "/node/api/v1/info", headers={"Authorization": f"Bearer {superuser_token}"} + ) assert response.status_code == HTTPStatus.SERVICE_UNAVAILABLE