feat: do not allow user_id_only login for admins (#2904)
This commit is contained in:
parent
b6bdf50ed7
commit
f845bfe651
6 changed files with 115 additions and 29 deletions
|
|
@ -110,6 +110,10 @@ async def login_usr(data: LoginUsr) -> JSONResponse:
|
||||||
account = await get_account(data.usr)
|
account = await get_account(data.usr)
|
||||||
if not account:
|
if not account:
|
||||||
raise HTTPException(HTTPStatus.UNAUTHORIZED, "User ID does not exist.")
|
raise HTTPException(HTTPStatus.UNAUTHORIZED, "User ID does not exist.")
|
||||||
|
if account.is_admin:
|
||||||
|
raise HTTPException(
|
||||||
|
HTTPStatus.UNAUTHORIZED, "Admin users cannot login with user id only."
|
||||||
|
)
|
||||||
return _auth_success_response(account.username, account.id, account.email)
|
return _auth_success_response(account.username, account.id, account.email)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -144,6 +144,10 @@ async def check_user_exists(
|
||||||
account = await _get_account_from_token(access_token, r["path"], r["method"])
|
account = await _get_account_from_token(access_token, r["path"], r["method"])
|
||||||
elif usr and settings.is_auth_method_allowed(AuthMethods.user_id_only):
|
elif usr and settings.is_auth_method_allowed(AuthMethods.user_id_only):
|
||||||
account = await get_account(usr.hex)
|
account = await get_account(usr.hex)
|
||||||
|
if account and account.is_admin:
|
||||||
|
raise HTTPException(
|
||||||
|
HTTPStatus.FORBIDDEN, "User id only access for admins is forbidden."
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
raise HTTPException(HTTPStatus.UNAUTHORIZED, "Missing user ID or access token.")
|
raise HTTPException(HTTPStatus.UNAUTHORIZED, "Missing user ID or access token.")
|
||||||
|
|
||||||
|
|
@ -190,6 +194,10 @@ async def check_admin(user: Annotated[User, Depends(check_user_exists)]) -> User
|
||||||
raise HTTPException(
|
raise HTTPException(
|
||||||
HTTPStatus.UNAUTHORIZED, "User not authorized. No admin privileges."
|
HTTPStatus.UNAUTHORIZED, "User not authorized. No admin privileges."
|
||||||
)
|
)
|
||||||
|
if not user.has_password:
|
||||||
|
raise HTTPException(
|
||||||
|
HTTPStatus.FORBIDDEN, "Admin users must have credentials configured."
|
||||||
|
)
|
||||||
|
|
||||||
return user
|
return user
|
||||||
|
|
||||||
|
|
@ -199,6 +207,10 @@ async def check_super_user(user: Annotated[User, Depends(check_user_exists)]) ->
|
||||||
raise HTTPException(
|
raise HTTPException(
|
||||||
HTTPStatus.UNAUTHORIZED, "User not authorized. No super user privileges."
|
HTTPStatus.UNAUTHORIZED, "User not authorized. No super user privileges."
|
||||||
)
|
)
|
||||||
|
if not user.has_password:
|
||||||
|
raise HTTPException(
|
||||||
|
HTTPStatus.FORBIDDEN, "Super user must have credentials configured."
|
||||||
|
)
|
||||||
return user
|
return user
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,6 @@
|
||||||
import pytest
|
import pytest
|
||||||
|
from httpx import AsyncClient
|
||||||
|
|
||||||
from lnbits.core.models import User
|
|
||||||
from lnbits.settings import Settings
|
from lnbits.settings import Settings
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -11,19 +11,25 @@ async def test_admin_get_settings_permission_denied(client, from_user):
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_admin_get_settings(client, superuser):
|
async def test_admin_get_settings(client: AsyncClient, superuser_token: str):
|
||||||
response = await client.get(f"/admin/api/v1/settings?usr={superuser.id}")
|
response = await client.get(
|
||||||
|
"/admin/api/v1/settings",
|
||||||
|
headers={"Authorization": f"Bearer {superuser_token}"},
|
||||||
|
)
|
||||||
assert response.status_code == 200
|
assert response.status_code == 200
|
||||||
result = response.json()
|
result = response.json()
|
||||||
assert "super_user" not in result
|
assert "super_user" not in result
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_admin_update_settings(client, superuser: User, settings: Settings):
|
async def test_admin_update_settings(
|
||||||
|
client: AsyncClient, superuser_token: str, settings: Settings
|
||||||
|
):
|
||||||
new_site_title = "UPDATED SITETITLE"
|
new_site_title = "UPDATED SITETITLE"
|
||||||
response = await client.put(
|
response = await client.put(
|
||||||
f"/admin/api/v1/settings?usr={superuser.id}",
|
"/admin/api/v1/settings",
|
||||||
json={"lnbits_site_title": new_site_title},
|
json={"lnbits_site_title": new_site_title},
|
||||||
|
headers={"Authorization": f"Bearer {superuser_token}"},
|
||||||
)
|
)
|
||||||
assert response.status_code == 200
|
assert response.status_code == 200
|
||||||
result = response.json()
|
result = response.json()
|
||||||
|
|
@ -33,9 +39,13 @@ async def test_admin_update_settings(client, superuser: User, settings: Settings
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_admin_update_noneditable_settings(client, superuser):
|
async def test_admin_update_noneditable_settings(
|
||||||
|
client: AsyncClient,
|
||||||
|
superuser_token: str,
|
||||||
|
):
|
||||||
response = await client.put(
|
response = await client.put(
|
||||||
f"/admin/api/v1/settings?usr={superuser.id}",
|
"/admin/api/v1/settings",
|
||||||
json={"super_user": "UPDATED"},
|
json={"super_user": "UPDATED"},
|
||||||
|
headers={"Authorization": f"Bearer {superuser_token}"},
|
||||||
)
|
)
|
||||||
assert response.status_code == 400
|
assert response.status_code == 400
|
||||||
|
|
|
||||||
|
|
@ -18,12 +18,15 @@ from lnbits.core.models import AccessTokenPayload, User
|
||||||
from lnbits.core.models.misc import SimpleItem
|
from lnbits.core.models.misc import SimpleItem
|
||||||
from lnbits.core.models.users import (
|
from lnbits.core.models.users import (
|
||||||
AccessControlList,
|
AccessControlList,
|
||||||
|
Account,
|
||||||
ApiTokenRequest,
|
ApiTokenRequest,
|
||||||
DeleteTokenRequest,
|
DeleteTokenRequest,
|
||||||
EndpointAccess,
|
EndpointAccess,
|
||||||
|
LoginUsr,
|
||||||
UpdateAccessControlList,
|
UpdateAccessControlList,
|
||||||
UserAcls,
|
UserAcls,
|
||||||
)
|
)
|
||||||
|
from lnbits.core.services.users import create_user_account
|
||||||
from lnbits.core.views.user_api import api_users_reset_password
|
from lnbits.core.views.user_api import api_users_reset_password
|
||||||
from lnbits.helpers import create_access_token
|
from lnbits.helpers import create_access_token
|
||||||
from lnbits.settings import AuthMethods, Settings
|
from lnbits.settings import AuthMethods, Settings
|
||||||
|
|
@ -76,6 +79,54 @@ async def test_login_alan_usr(user_alan: User, http_client: AsyncClient):
|
||||||
assert alan["email"] == user_alan.email
|
assert alan["email"] == user_alan.email
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_login_usr_not_allowed_for_admin_without_credentials(
|
||||||
|
http_client: AsyncClient, settings: Settings
|
||||||
|
):
|
||||||
|
# Register a new user
|
||||||
|
account = Account(id=uuid4().hex)
|
||||||
|
await create_user_account(account)
|
||||||
|
|
||||||
|
# Login with user ID
|
||||||
|
login_data = LoginUsr(usr=account.id)
|
||||||
|
response = await http_client.post("/api/v1/auth/usr", json=login_data.dict())
|
||||||
|
http_client.cookies.clear()
|
||||||
|
assert response.status_code == 200, "User logs in OK."
|
||||||
|
access_token = response.json().get("access_token")
|
||||||
|
assert access_token is not None, "Expected access token after login."
|
||||||
|
headers = {"Authorization": f"Bearer {access_token}"}
|
||||||
|
|
||||||
|
# Simulate the user being an admin without credentials
|
||||||
|
settings.lnbits_admin_users = [account.id]
|
||||||
|
|
||||||
|
# Attempt to login with user ID for admin
|
||||||
|
response = await http_client.post("/api/v1/auth/usr", json=login_data.dict())
|
||||||
|
|
||||||
|
assert response.status_code == 401
|
||||||
|
assert (
|
||||||
|
response.json().get("detail") == "Admin users cannot login with user id only."
|
||||||
|
)
|
||||||
|
|
||||||
|
response = await http_client.get("/admin/api/v1/settings", headers=headers)
|
||||||
|
assert response.status_code == 403
|
||||||
|
assert (
|
||||||
|
response.json().get("detail") == "Admin users must have credentials configured."
|
||||||
|
)
|
||||||
|
|
||||||
|
# User only access should not be allowed
|
||||||
|
response = await http_client.get(
|
||||||
|
f"/admin/api/v1/settings?usr={settings.super_user}"
|
||||||
|
)
|
||||||
|
print("### response", response.text)
|
||||||
|
assert response.status_code == 403
|
||||||
|
assert (
|
||||||
|
response.json().get("detail") == "User id only access for admins is forbidden."
|
||||||
|
)
|
||||||
|
|
||||||
|
response = await http_client.get("/api/v1/status", headers=headers)
|
||||||
|
assert response.status_code == 200, "Admin user can access regular endpoints."
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_login_usr_not_allowed(
|
async def test_login_usr_not_allowed(
|
||||||
user_alan: User, http_client: AsyncClient, settings: Settings
|
user_alan: User, http_client: AsyncClient, settings: Settings
|
||||||
|
|
|
||||||
|
|
@ -12,14 +12,14 @@ from lnbits.app import create_app
|
||||||
from lnbits.core.crud import (
|
from lnbits.core.crud import (
|
||||||
create_wallet,
|
create_wallet,
|
||||||
delete_account,
|
delete_account,
|
||||||
get_account,
|
|
||||||
get_account_by_username,
|
get_account_by_username,
|
||||||
get_payment,
|
get_payment,
|
||||||
get_user_from_account,
|
|
||||||
update_payment,
|
update_payment,
|
||||||
)
|
)
|
||||||
from lnbits.core.models import Account, CreateInvoice, PaymentState, User
|
from lnbits.core.models import Account, CreateInvoice, PaymentState, User
|
||||||
|
from lnbits.core.models.users import UpdateSuperuserPassword
|
||||||
from lnbits.core.services import create_user_account, update_wallet_balance
|
from lnbits.core.services import create_user_account, update_wallet_balance
|
||||||
|
from lnbits.core.views.auth_api import first_install
|
||||||
from lnbits.core.views.payment_api import _api_payments_create_invoice
|
from lnbits.core.views.payment_api import _api_payments_create_invoice
|
||||||
from lnbits.db import DB_TYPE, SQLITE, Database
|
from lnbits.db import DB_TYPE, SQLITE, Database
|
||||||
from lnbits.settings import AuthMethods, Settings
|
from lnbits.settings import AuthMethods, Settings
|
||||||
|
|
@ -62,7 +62,15 @@ def run_before_and_after_tests(settings: Settings):
|
||||||
async def app(settings: Settings):
|
async def app(settings: Settings):
|
||||||
app = create_app()
|
app = create_app()
|
||||||
async with LifespanManager(app) as manager:
|
async with LifespanManager(app) as manager:
|
||||||
settings.first_install = False
|
settings.first_install = True
|
||||||
|
await first_install(
|
||||||
|
UpdateSuperuserPassword(
|
||||||
|
username="superadmin",
|
||||||
|
password="secret1234",
|
||||||
|
password_repeat="secret1234",
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
yield manager.app
|
yield manager.app
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -150,20 +158,14 @@ async def to_user():
|
||||||
yield user
|
yield user
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture()
|
|
||||||
def from_super_user(from_user: User, settings: Settings):
|
|
||||||
prev = settings.super_user
|
|
||||||
settings.super_user = from_user.id
|
|
||||||
yield from_user
|
|
||||||
settings.super_user = prev
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(scope="session")
|
@pytest.fixture(scope="session")
|
||||||
async def superuser(settings: Settings):
|
async def superuser_token(client: AsyncClient):
|
||||||
account = await get_account(settings.super_user)
|
response = await client.post(
|
||||||
assert account, "Superuser not found"
|
"/api/v1/auth", json={"username": "superadmin", "password": "secret1234"}
|
||||||
user = await get_user_from_account(account)
|
)
|
||||||
yield user
|
client.cookies.clear()
|
||||||
|
|
||||||
|
yield response.json().get("access_token")
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(scope="session")
|
@pytest.fixture(scope="session")
|
||||||
|
|
@ -307,3 +309,4 @@ def _settings_cleanup(settings: Settings):
|
||||||
settings.lnbits_service_fee = 0
|
settings.lnbits_service_fee = 0
|
||||||
settings.lnbits_wallet_limit_daily_max_withdraw = 0
|
settings.lnbits_wallet_limit_daily_max_withdraw = 0
|
||||||
settings.lnbits_admin_extensions = []
|
settings.lnbits_admin_extensions = []
|
||||||
|
settings.lnbits_admin_users = []
|
||||||
|
|
|
||||||
|
|
@ -3,10 +3,12 @@ import random
|
||||||
from http import HTTPStatus
|
from http import HTTPStatus
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
from httpx import AsyncClient, Headers
|
||||||
from pydantic import parse_obj_as
|
from pydantic import parse_obj_as
|
||||||
|
|
||||||
from lnbits import bolt11
|
from lnbits import bolt11
|
||||||
from lnbits.nodes.base import ChannelPoint, ChannelState, NodeChannel
|
from lnbits.nodes.base import ChannelPoint, ChannelState, NodeChannel
|
||||||
|
from lnbits.settings import Settings
|
||||||
|
|
||||||
from ..helpers import (
|
from ..helpers import (
|
||||||
funding_source,
|
funding_source,
|
||||||
|
|
@ -24,14 +26,14 @@ pytestmark = pytest.mark.skipif(
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture()
|
@pytest.fixture()
|
||||||
async def node_client(client, from_super_user, settings):
|
async def node_client(client: AsyncClient, settings: Settings, superuser_token: str):
|
||||||
settings.lnbits_node_ui = True
|
settings.lnbits_node_ui = True
|
||||||
settings.lnbits_public_node_ui = False
|
settings.lnbits_public_node_ui = False
|
||||||
settings.lnbits_node_ui_transactions = True
|
settings.lnbits_node_ui_transactions = True
|
||||||
params = client.params
|
headers = client.headers
|
||||||
client.params = {"usr": from_super_user.id}
|
client.headers = Headers({"Authorization": f"Bearer {superuser_token}"})
|
||||||
yield client
|
yield client
|
||||||
client.params = params
|
client.headers = headers
|
||||||
settings.lnbits_node_ui = False
|
settings.lnbits_node_ui = False
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -43,9 +45,13 @@ async def public_node_client(node_client, settings):
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_node_info_not_found(client, from_super_user, settings):
|
async def test_node_info_not_found(
|
||||||
|
client: AsyncClient, settings: Settings, superuser_token: str
|
||||||
|
):
|
||||||
settings.lnbits_node_ui = False
|
settings.lnbits_node_ui = False
|
||||||
response = await client.get("/node/api/v1/info", params={"usr": from_super_user.id})
|
response = await client.get(
|
||||||
|
"/node/api/v1/info", headers={"Authorization": f"Bearer {superuser_token}"}
|
||||||
|
)
|
||||||
assert response.status_code == HTTPStatus.SERVICE_UNAVAILABLE
|
assert response.status_code == HTTPStatus.SERVICE_UNAVAILABLE
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue