diff --git a/lnbits/extensions/boltcards/__init__.py b/lnbits/extensions/boltcards/__init__.py index 69326708..f1ef972e 100644 --- a/lnbits/extensions/boltcards/__init__.py +++ b/lnbits/extensions/boltcards/__init__.py @@ -5,10 +5,7 @@ from lnbits.helpers import template_renderer db = Database("ext_boltcards") -boltcards_ext: APIRouter = APIRouter( - prefix="/boltcards", - tags=["boltcards"] -) +boltcards_ext: APIRouter = APIRouter(prefix="/boltcards", tags=["boltcards"]) def boltcards_renderer(): diff --git a/lnbits/extensions/boltcards/crud.py b/lnbits/extensions/boltcards/crud.py index f34ce659..7cf5cad1 100644 --- a/lnbits/extensions/boltcards/crud.py +++ b/lnbits/extensions/boltcards/crud.py @@ -1,13 +1,13 @@ from optparse import Option from typing import List, Optional, Union + from lnbits.helpers import urlsafe_short_hash from . import db from .models import Card, CreateCardData, Hit -async def create_card( - data: CreateCardData, wallet_id: str -) -> Card: + +async def create_card(data: CreateCardData, wallet_id: str) -> Card: card_id = urlsafe_short_hash() await db.execute( """ @@ -38,6 +38,7 @@ async def create_card( assert card, "Newly created card couldn't be retrieved" return card + async def update_card(card_id: str, **kwargs) -> Optional[Card]: if "is_unique" in kwargs: kwargs["is_unique"] = int(kwargs["is_unique"]) @@ -46,11 +47,10 @@ async def update_card(card_id: str, **kwargs) -> Optional[Card]: f"UPDATE boltcards.cards SET {q} WHERE id = ?", (*kwargs.values(), card_id), ) - row = await db.fetchone( - "SELECT * FROM boltcards.cards WHERE id = ?", (card_id,) - ) + row = await db.fetchone("SELECT * FROM boltcards.cards WHERE id = ?", (card_id,)) return Card(**row) if row else None + async def get_cards(wallet_ids: Union[str, List[str]]) -> List[Card]: if isinstance(wallet_ids, str): wallet_ids = [wallet_ids] @@ -62,17 +62,20 @@ async def get_cards(wallet_ids: Union[str, List[str]]) -> List[Card]: return [Card(**row) for row in rows] + async def get_all_cards() -> List[Card]: - rows = await db.fetchall( - f"SELECT * FROM boltcards.cards" - ) + rows = await db.fetchall(f"SELECT * FROM boltcards.cards") return [Card(**row) for row in rows] -async def get_card(card_id: str, id_is_uid: bool=False) -> Optional[Card]: - sql = "SELECT * FROM boltcards.cards WHERE {} = ?".format("uid" if id_is_uid else "id") + +async def get_card(card_id: str, id_is_uid: bool = False) -> Optional[Card]: + sql = "SELECT * FROM boltcards.cards WHERE {} = ?".format( + "uid" if id_is_uid else "id" + ) row = await db.fetchone( - sql, card_id, + sql, + card_id, ) if not row: return None @@ -81,19 +84,20 @@ async def get_card(card_id: str, id_is_uid: bool=False) -> Optional[Card]: return Card.parse_obj(card) + async def delete_card(card_id: str) -> None: await db.execute("DELETE FROM boltcards.cards WHERE id = ?", (card_id,)) + async def update_card_counter(counter: int, id: str): await db.execute( "UPDATE boltcards.cards SET counter = ? WHERE id = ?", (counter, id), ) + async def get_hit(hit_id: str) -> Optional[Hit]: - row = await db.fetchone( - f"SELECT * FROM boltcards.hits WHERE id = ?", (hit_id) - ) + row = await db.fetchone(f"SELECT * FROM boltcards.hits WHERE id = ?", (hit_id)) if not row: return None @@ -101,6 +105,7 @@ async def get_hit(hit_id: str) -> Optional[Hit]: return Hit.parse_obj(hit) + async def get_hits(cards_ids: Union[str, List[str]]) -> List[Hit]: q = ",".join(["?"] * len(cards_ids)) rows = await db.fetchall( @@ -109,9 +114,8 @@ async def get_hits(cards_ids: Union[str, List[str]]) -> List[Hit]: return [Hit(**row) for row in rows] -async def create_hit( - card_id, ip, useragent, old_ctr, new_ctr -) -> Hit: + +async def create_hit(card_id, ip, useragent, old_ctr, new_ctr) -> Hit: hit_id = urlsafe_short_hash() await db.execute( """ diff --git a/lnbits/extensions/boltcards/migrations.py b/lnbits/extensions/boltcards/migrations.py index e7236ce7..6e0fa072 100644 --- a/lnbits/extensions/boltcards/migrations.py +++ b/lnbits/extensions/boltcards/migrations.py @@ -1,5 +1,6 @@ from lnbits.helpers import urlsafe_short_hash + async def m001_initial(db): await db.execute( """ diff --git a/lnbits/extensions/boltcards/models.py b/lnbits/extensions/boltcards/models.py index 728aa2bb..b6d521c3 100644 --- a/lnbits/extensions/boltcards/models.py +++ b/lnbits/extensions/boltcards/models.py @@ -1,5 +1,6 @@ -from pydantic import BaseModel from fastapi.params import Query +from pydantic import BaseModel + class Card(BaseModel): id: str @@ -12,6 +13,7 @@ class Card(BaseModel): meta_key: str time: int + class CreateCardData(BaseModel): card_name: str = Query(...) uid: str = Query(...) @@ -20,6 +22,7 @@ class CreateCardData(BaseModel): file_key: str = Query(...) meta_key: str = Query(...) + class Hit(BaseModel): id: str card_id: str diff --git a/lnbits/extensions/boltcards/nxp424.py b/lnbits/extensions/boltcards/nxp424.py index effa987d..83f4e50d 100644 --- a/lnbits/extensions/boltcards/nxp424.py +++ b/lnbits/extensions/boltcards/nxp424.py @@ -1,18 +1,21 @@ # https://www.nxp.com/docs/en/application-note/AN12196.pdf from typing import Tuple -from Cryptodome.Hash import CMAC + from Cryptodome.Cipher import AES +from Cryptodome.Hash import CMAC SV2 = "3CC300010080" -def myCMAC(key: bytes, msg: bytes=b'') -> bytes: + +def myCMAC(key: bytes, msg: bytes = b"") -> bytes: cobj = CMAC.new(key, ciphermod=AES) - if msg != b'': + if msg != b"": cobj.update(msg) return cobj.digest() + def decryptSUN(sun: bytes, key: bytes) -> Tuple[bytes, bytes]: - IVbytes = b"\x00" * 16 + IVbytes = b"\x00" * 16 cipher = AES.new(key, AES.MODE_CBC, IVbytes) sun_plain = cipher.decrypt(sun) @@ -22,6 +25,7 @@ def decryptSUN(sun: bytes, key: bytes) -> Tuple[bytes, bytes]: return UID, counter + def getSunMAC(UID: bytes, counter: bytes, key: bytes) -> bytes: sv2prefix = bytes.fromhex(SV2) sv2bytes = sv2prefix + UID + counter diff --git a/lnbits/extensions/boltcards/views_api.py b/lnbits/extensions/boltcards/views_api.py index b13d9c35..fbd05cce 100644 --- a/lnbits/extensions/boltcards/views_api.py +++ b/lnbits/extensions/boltcards/views_api.py @@ -17,19 +17,20 @@ from lnbits.decorators import WalletTypeInfo, get_key_type, require_admin_key from lnbits.extensions.withdraw import get_withdraw_link from . import boltcards_ext -from .nxp424 import decryptSUN, getSunMAC from .crud import ( - create_hit, - get_all_cards, - get_cards, - get_card, create_card, + create_hit, + delete_card, + get_all_cards, + get_card, + get_cards, get_hits, update_card, - delete_card, - update_card_counter + update_card_counter, ) from .models import CreateCardData +from .nxp424 import decryptSUN, getSunMAC + @boltcards_ext.get("/api/v1/cards") async def api_cards( @@ -42,32 +43,15 @@ async def api_cards( return [card.dict() for card in await get_cards(wallet_ids)] + @boltcards_ext.post("/api/v1/cards", status_code=HTTPStatus.CREATED) @boltcards_ext.put("/api/v1/cards/{card_id}", status_code=HTTPStatus.OK) async def api_link_create_or_update( -# req: Request, + # req: Request, data: CreateCardData, card_id: str = None, wallet: WalletTypeInfo = Depends(require_admin_key), ): - ''' - TODO: some checks - if data.uses > 250: - raise HTTPException( - detail="250 uses max.", status_code=HTTPStatus.BAD_REQUEST - ) - - if data.min_withdrawable < 1: - raise HTTPException( - detail="Min must be more than 1.", status_code=HTTPStatus.BAD_REQUEST - ) - - if data.max_withdrawable < data.min_withdrawable: - raise HTTPException( - detail="`max_withdrawable` needs to be at least `min_withdrawable`.", - status_code=HTTPStatus.BAD_REQUEST, - ) - ''' if card_id: card = await get_card(card_id) if not card: @@ -78,15 +62,12 @@ async def api_link_create_or_update( raise HTTPException( detail="Not your card.", status_code=HTTPStatus.FORBIDDEN ) - card = await update_card( - card_id, **data.dict() - ) + card = await update_card(card_id, **data.dict()) else: - card = await create_card( - wallet_id=wallet.wallet.id, data=data - ) + card = await create_card(wallet_id=wallet.wallet.id, data=data) return card.dict() + @boltcards_ext.delete("/api/v1/cards/{card_id}") async def api_link_delete(card_id, wallet: WalletTypeInfo = Depends(require_admin_key)): card = await get_card(card_id) @@ -97,13 +78,12 @@ async def api_link_delete(card_id, wallet: WalletTypeInfo = Depends(require_admi ) if card.wallet != wallet.wallet.id: - raise HTTPException( - detail="Not your card.", status_code=HTTPStatus.FORBIDDEN - ) + raise HTTPException(detail="Not your card.", status_code=HTTPStatus.FORBIDDEN) await delete_card(card_id) raise HTTPException(status_code=HTTPStatus.NO_CONTENT) + @boltcards_ext.get("/api/v1/hits") async def api_hits( g: WalletTypeInfo = Depends(get_key_type), all_wallets: bool = Query(False) @@ -120,20 +100,33 @@ async def api_hits( return [hit.dict() for hit in await get_hits(cards_ids)] + # /boltcards/api/v1/scan/?uid=00000000000000&ctr=000000&c=0000000000000000 -@boltcards_ext.get("/api/v1/scan/") -async def api_scan( - uid, ctr, c, - request: Request -): +@boltcards_ext.get("/api/v1/scan/") +async def api_scan(uid, ctr, c, request: Request): card = await get_card(uid, id_is_uid=True) if card == None: return {"status": "ERROR", "reason": "Unknown card."} - if c != getSunMAC(bytes.fromhex(uid), bytes.fromhex(ctr)[::-1], bytes.fromhex(card.file_key)).hex().upper(): + if ( + c + != getSunMAC( + bytes.fromhex(uid), bytes.fromhex(ctr)[::-1], bytes.fromhex(card.file_key) + ) + .hex() + .upper() + ): print(c) - print(getSunMAC(bytes.fromhex(uid), bytes.fromhex(ctr)[::-1], bytes.fromhex(card.file_key)).hex().upper()) + print( + getSunMAC( + bytes.fromhex(uid), + bytes.fromhex(ctr)[::-1], + bytes.fromhex(card.file_key), + ) + .hex() + .upper() + ) return {"status": "ERROR", "reason": "CMAC does not check."} ctr_int = int(ctr, 16) @@ -145,32 +138,32 @@ async def api_scan( # gathering some info for hit record ip = request.client.host - if request.headers['x-real-ip']: - ip = request.headers['x-real-ip'] - elif request.headers['x-forwarded-for']: - ip = request.headers['x-forwarded-for'] + if request.headers["x-real-ip"]: + ip = request.headers["x-real-ip"] + elif request.headers["x-forwarded-for"]: + ip = request.headers["x-forwarded-for"] - agent = request.headers['user-agent'] if 'user-agent' in request.headers else '' + agent = request.headers["user-agent"] if "user-agent" in request.headers else "" await create_hit(card.id, ip, agent, card.counter, ctr_int) link = await get_withdraw_link(card.withdraw, 0) return link.lnurl_response(request) + # /boltcards/api/v1/scane/?e=00000000000000000000000000000000&c=0000000000000000 @boltcards_ext.get("/api/v1/scane/") -async def api_scane( - e, c, - request: Request -): +async def api_scane(e, c, request: Request): card = None - counter = b'' + counter = b"" # since this route is common to all cards I don't know whitch 'meta key' to use # so I try one by one until decrypted uid matches for cand in await get_all_cards(): if cand.meta_key: - card_uid, counter = decryptSUN(bytes.fromhex(e), bytes.fromhex(cand.meta_key)) + card_uid, counter = decryptSUN( + bytes.fromhex(e), bytes.fromhex(cand.meta_key) + ) if card_uid.hex().upper() == cand.uid: card = cand @@ -187,17 +180,17 @@ async def api_scane( ctr_int = int.from_bytes(counter, "little") if ctr_int <= card.counter: return {"status": "ERROR", "reason": "This link is already used."} - + await update_card_counter(ctr_int, card.id) # gathering some info for hit record ip = request.client.host - if 'x-real-ip' in request.headers: - ip = request.headers['x-real-ip'] - elif 'x-forwarded-for' in request.headers: - ip = request.headers['x-forwarded-for'] + if "x-real-ip" in request.headers: + ip = request.headers["x-real-ip"] + elif "x-forwarded-for" in request.headers: + ip = request.headers["x-forwarded-for"] - agent = request.headers['user-agent'] if 'user-agent' in request.headers else '' + agent = request.headers["user-agent"] if "user-agent" in request.headers else "" await create_hit(card.id, ip, agent, card.counter, ctr_int)