Add centralized authorization module and fix security vulnerabilities

- Create auth.py with AuthContext, require_super_user, require_authenticated
- Fix 6 CRITICAL unprotected endpoints exposing sensitive data
- Consolidate 16+ admin endpoints with duplicated super_user checks
- Standardize on user_id (wallet.wallet.user) instead of wallet_id

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
padreug 2026-01-07 13:35:07 +01:00
parent e403ec223d
commit ca0cee7312
2 changed files with 412 additions and 191 deletions

310
auth.py Normal file
View file

@ -0,0 +1,310 @@
"""
Centralized Authorization Module for Castle Extension.
Provides consistent, secure authorization patterns across all endpoints.
Key concepts:
- AuthContext: Captures all authorization state for a request
- Dependencies: FastAPI dependencies for endpoint protection
- Permission checks: Consistent resource-level access control
Usage:
from .auth import require_super_user, require_authenticated, AuthContext
@router.get("/api/v1/admin-endpoint")
async def admin_endpoint(auth: AuthContext = Depends(require_super_user)):
# Only super users can access
pass
@router.get("/api/v1/user-data")
async def user_data(auth: AuthContext = Depends(require_authenticated)):
# Any authenticated user
user_id = auth.user_id
pass
"""
from dataclasses import dataclass
from functools import wraps
from http import HTTPStatus
from typing import Optional
from fastapi import Depends, HTTPException
from lnbits.core.models import WalletTypeInfo
from lnbits.decorators import require_admin_key, require_invoice_key
from lnbits.settings import settings as lnbits_settings
from loguru import logger
from .crud import get_account, get_user_permissions
from .models import PermissionType
@dataclass
class AuthContext:
"""
Authorization context for a request.
Contains all information needed to make authorization decisions.
Use this instead of directly accessing wallet/user properties scattered
throughout endpoint code.
"""
user_id: str
wallet_id: str
is_super_user: bool
wallet: WalletTypeInfo
@property
def is_admin(self) -> bool:
"""
Check if user is a Castle admin (super user).
Note: In Castle, admin = super_user. There's no separate admin concept.
"""
return self.is_super_user
def require_super_user(self) -> None:
"""Raise HTTPException if not super user."""
if not self.is_super_user:
raise HTTPException(
status_code=HTTPStatus.FORBIDDEN,
detail="Super user access required"
)
def require_self_or_super_user(self, target_user_id: str) -> None:
"""
Require that user is accessing their own data or is super user.
Args:
target_user_id: The user ID being accessed
Raises:
HTTPException: If user is neither the target nor super user
"""
if not self.is_super_user and self.user_id != target_user_id:
raise HTTPException(
status_code=HTTPStatus.FORBIDDEN,
detail="Access denied: you can only access your own data"
)
def _build_auth_context(wallet: WalletTypeInfo) -> AuthContext:
"""Build AuthContext from wallet info."""
user_id = wallet.wallet.user
return AuthContext(
user_id=user_id,
wallet_id=wallet.wallet.id,
is_super_user=user_id == lnbits_settings.super_user,
wallet=wallet,
)
# ===== FastAPI Dependencies =====
async def require_authenticated(
wallet: WalletTypeInfo = Depends(require_invoice_key),
) -> AuthContext:
"""
Require authentication (invoice key minimum).
Returns AuthContext with user information.
Use for read-only access to user's own data.
"""
return _build_auth_context(wallet)
async def require_authenticated_write(
wallet: WalletTypeInfo = Depends(require_admin_key),
) -> AuthContext:
"""
Require authentication with write permissions (admin key).
Returns AuthContext with user information.
Use for write operations on user's own data.
"""
return _build_auth_context(wallet)
async def require_super_user(
wallet: WalletTypeInfo = Depends(require_admin_key),
) -> AuthContext:
"""
Require super user access.
Raises HTTPException 403 if not super user.
Use for Castle admin operations.
"""
auth = _build_auth_context(wallet)
if not auth.is_super_user:
logger.warning(
f"Super user access denied for user {auth.user_id[:8]} "
f"attempting admin operation"
)
raise HTTPException(
status_code=HTTPStatus.FORBIDDEN,
detail="Super user access required"
)
return auth
# ===== Resource Access Checks =====
async def can_access_account(
auth: AuthContext,
account_id: str,
permission_type: PermissionType = PermissionType.READ,
) -> bool:
"""
Check if user can access an account.
Access is granted if:
1. User is super user (full access)
2. User owns the account (user-specific accounts like Assets:Receivable:User-abc123)
3. User has explicit permission for the account
Args:
auth: The authorization context
account_id: The account ID to check
permission_type: The type of access needed (READ, SUBMIT_EXPENSE, MANAGE)
Returns:
True if access is allowed, False otherwise
"""
# Super users have full access
if auth.is_super_user:
return True
# Check if this is the user's own account
account = await get_account(account_id)
if account:
user_short = auth.user_id[:8]
if f"User-{user_short}" in account.name:
return True
# Check explicit permissions
permissions = await get_user_permissions(auth.user_id)
for perm in permissions:
if perm.account_id == account_id:
# Check if permission type is sufficient
if perm.permission_type == PermissionType.MANAGE:
return True # MANAGE grants all access
if perm.permission_type == permission_type:
return True
if (
permission_type == PermissionType.READ
and perm.permission_type in [PermissionType.SUBMIT_EXPENSE, PermissionType.MANAGE]
):
return True # Higher permissions include READ
return False
async def require_account_access(
auth: AuthContext,
account_id: str,
permission_type: PermissionType = PermissionType.READ,
) -> None:
"""
Require access to an account, raising HTTPException if denied.
Args:
auth: The authorization context
account_id: The account ID to check
permission_type: The type of access needed
Raises:
HTTPException: If access is denied
"""
if not await can_access_account(auth, account_id, permission_type):
logger.warning(
f"Account access denied: user {auth.user_id[:8]} "
f"attempted {permission_type.value} on account {account_id}"
)
raise HTTPException(
status_code=HTTPStatus.FORBIDDEN,
detail=f"Access denied to account {account_id}"
)
async def can_access_user_data(auth: AuthContext, target_user_id: str) -> bool:
"""
Check if user can access another user's data.
Access is granted if:
1. User is super user
2. User is accessing their own data
Args:
auth: The authorization context
target_user_id: The user ID whose data is being accessed
Returns:
True if access is allowed
"""
if auth.is_super_user:
return True
# Users can access their own data - compare full ID or short ID
if auth.user_id == target_user_id:
return True
# Also allow if short IDs match (8 char prefix)
if auth.user_id[:8] == target_user_id[:8]:
return True
return False
async def require_user_data_access(
auth: AuthContext,
target_user_id: str,
) -> None:
"""
Require access to a user's data, raising HTTPException if denied.
Args:
auth: The authorization context
target_user_id: The user ID whose data is being accessed
Raises:
HTTPException: If access is denied
"""
if not await can_access_user_data(auth, target_user_id):
logger.warning(
f"User data access denied: user {auth.user_id[:8]} "
f"attempted to access data for user {target_user_id[:8]}"
)
raise HTTPException(
status_code=HTTPStatus.FORBIDDEN,
detail="Access denied: you can only access your own data"
)
# ===== Utility Functions =====
def get_user_id_from_wallet(wallet: WalletTypeInfo) -> str:
"""
Get user ID from wallet info.
IMPORTANT: Always use wallet.wallet.user (not wallet.wallet.id).
- wallet.wallet.user = the user's ID
- wallet.wallet.id = the wallet's ID (NOT the same!)
Args:
wallet: The wallet type info from LNbits
Returns:
The user ID
"""
return wallet.wallet.user
def is_super_user(user_id: str) -> bool:
"""
Check if a user ID is the super user.
Args:
user_id: The user ID to check
Returns:
True if this is the super user
"""
return user_id == lnbits_settings.super_user