black & isort

This commit is contained in:
Gene Takavic 2022-07-15 16:43:06 +02:00 committed by Lee Salminen
parent 5af49e3801
commit 5b8d317441
6 changed files with 88 additions and 86 deletions

View File

@ -5,10 +5,7 @@ from lnbits.helpers import template_renderer
db = Database("ext_boltcards")
boltcards_ext: APIRouter = APIRouter(
prefix="/boltcards",
tags=["boltcards"]
)
boltcards_ext: APIRouter = APIRouter(prefix="/boltcards", tags=["boltcards"])
def boltcards_renderer():

View File

@ -1,13 +1,13 @@
from optparse import Option
from typing import List, Optional, Union
from lnbits.helpers import urlsafe_short_hash
from . import db
from .models import Card, CreateCardData, Hit
async def create_card(
data: CreateCardData, wallet_id: str
) -> Card:
async def create_card(data: CreateCardData, wallet_id: str) -> Card:
card_id = urlsafe_short_hash()
await db.execute(
"""
@ -38,6 +38,7 @@ async def create_card(
assert card, "Newly created card couldn't be retrieved"
return card
async def update_card(card_id: str, **kwargs) -> Optional[Card]:
if "is_unique" in kwargs:
kwargs["is_unique"] = int(kwargs["is_unique"])
@ -46,11 +47,10 @@ async def update_card(card_id: str, **kwargs) -> Optional[Card]:
f"UPDATE boltcards.cards SET {q} WHERE id = ?",
(*kwargs.values(), card_id),
)
row = await db.fetchone(
"SELECT * FROM boltcards.cards WHERE id = ?", (card_id,)
)
row = await db.fetchone("SELECT * FROM boltcards.cards WHERE id = ?", (card_id,))
return Card(**row) if row else None
async def get_cards(wallet_ids: Union[str, List[str]]) -> List[Card]:
if isinstance(wallet_ids, str):
wallet_ids = [wallet_ids]
@ -62,17 +62,20 @@ async def get_cards(wallet_ids: Union[str, List[str]]) -> List[Card]:
return [Card(**row) for row in rows]
async def get_all_cards() -> List[Card]:
rows = await db.fetchall(
f"SELECT * FROM boltcards.cards"
)
rows = await db.fetchall(f"SELECT * FROM boltcards.cards")
return [Card(**row) for row in rows]
async def get_card(card_id: str, id_is_uid: bool=False) -> Optional[Card]:
sql = "SELECT * FROM boltcards.cards WHERE {} = ?".format("uid" if id_is_uid else "id")
async def get_card(card_id: str, id_is_uid: bool = False) -> Optional[Card]:
sql = "SELECT * FROM boltcards.cards WHERE {} = ?".format(
"uid" if id_is_uid else "id"
)
row = await db.fetchone(
sql, card_id,
sql,
card_id,
)
if not row:
return None
@ -81,19 +84,20 @@ async def get_card(card_id: str, id_is_uid: bool=False) -> Optional[Card]:
return Card.parse_obj(card)
async def delete_card(card_id: str) -> None:
await db.execute("DELETE FROM boltcards.cards WHERE id = ?", (card_id,))
async def update_card_counter(counter: int, id: str):
await db.execute(
"UPDATE boltcards.cards SET counter = ? WHERE id = ?",
(counter, id),
)
async def get_hit(hit_id: str) -> Optional[Hit]:
row = await db.fetchone(
f"SELECT * FROM boltcards.hits WHERE id = ?", (hit_id)
)
row = await db.fetchone(f"SELECT * FROM boltcards.hits WHERE id = ?", (hit_id))
if not row:
return None
@ -101,6 +105,7 @@ async def get_hit(hit_id: str) -> Optional[Hit]:
return Hit.parse_obj(hit)
async def get_hits(cards_ids: Union[str, List[str]]) -> List[Hit]:
q = ",".join(["?"] * len(cards_ids))
rows = await db.fetchall(
@ -109,9 +114,8 @@ async def get_hits(cards_ids: Union[str, List[str]]) -> List[Hit]:
return [Hit(**row) for row in rows]
async def create_hit(
card_id, ip, useragent, old_ctr, new_ctr
) -> Hit:
async def create_hit(card_id, ip, useragent, old_ctr, new_ctr) -> Hit:
hit_id = urlsafe_short_hash()
await db.execute(
"""

View File

@ -1,5 +1,6 @@
from lnbits.helpers import urlsafe_short_hash
async def m001_initial(db):
await db.execute(
"""

View File

@ -1,5 +1,6 @@
from pydantic import BaseModel
from fastapi.params import Query
from pydantic import BaseModel
class Card(BaseModel):
id: str
@ -12,6 +13,7 @@ class Card(BaseModel):
meta_key: str
time: int
class CreateCardData(BaseModel):
card_name: str = Query(...)
uid: str = Query(...)
@ -20,6 +22,7 @@ class CreateCardData(BaseModel):
file_key: str = Query(...)
meta_key: str = Query(...)
class Hit(BaseModel):
id: str
card_id: str

View File

@ -1,18 +1,21 @@
# https://www.nxp.com/docs/en/application-note/AN12196.pdf
from typing import Tuple
from Cryptodome.Hash import CMAC
from Cryptodome.Cipher import AES
from Cryptodome.Hash import CMAC
SV2 = "3CC300010080"
def myCMAC(key: bytes, msg: bytes=b'') -> bytes:
def myCMAC(key: bytes, msg: bytes = b"") -> bytes:
cobj = CMAC.new(key, ciphermod=AES)
if msg != b'':
if msg != b"":
cobj.update(msg)
return cobj.digest()
def decryptSUN(sun: bytes, key: bytes) -> Tuple[bytes, bytes]:
IVbytes = b"\x00" * 16
IVbytes = b"\x00" * 16
cipher = AES.new(key, AES.MODE_CBC, IVbytes)
sun_plain = cipher.decrypt(sun)
@ -22,6 +25,7 @@ def decryptSUN(sun: bytes, key: bytes) -> Tuple[bytes, bytes]:
return UID, counter
def getSunMAC(UID: bytes, counter: bytes, key: bytes) -> bytes:
sv2prefix = bytes.fromhex(SV2)
sv2bytes = sv2prefix + UID + counter

View File

@ -17,19 +17,20 @@ from lnbits.decorators import WalletTypeInfo, get_key_type, require_admin_key
from lnbits.extensions.withdraw import get_withdraw_link
from . import boltcards_ext
from .nxp424 import decryptSUN, getSunMAC
from .crud import (
create_hit,
get_all_cards,
get_cards,
get_card,
create_card,
create_hit,
delete_card,
get_all_cards,
get_card,
get_cards,
get_hits,
update_card,
delete_card,
update_card_counter
update_card_counter,
)
from .models import CreateCardData
from .nxp424 import decryptSUN, getSunMAC
@boltcards_ext.get("/api/v1/cards")
async def api_cards(
@ -42,32 +43,15 @@ async def api_cards(
return [card.dict() for card in await get_cards(wallet_ids)]
@boltcards_ext.post("/api/v1/cards", status_code=HTTPStatus.CREATED)
@boltcards_ext.put("/api/v1/cards/{card_id}", status_code=HTTPStatus.OK)
async def api_link_create_or_update(
# req: Request,
# req: Request,
data: CreateCardData,
card_id: str = None,
wallet: WalletTypeInfo = Depends(require_admin_key),
):
'''
TODO: some checks
if data.uses > 250:
raise HTTPException(
detail="250 uses max.", status_code=HTTPStatus.BAD_REQUEST
)
if data.min_withdrawable < 1:
raise HTTPException(
detail="Min must be more than 1.", status_code=HTTPStatus.BAD_REQUEST
)
if data.max_withdrawable < data.min_withdrawable:
raise HTTPException(
detail="`max_withdrawable` needs to be at least `min_withdrawable`.",
status_code=HTTPStatus.BAD_REQUEST,
)
'''
if card_id:
card = await get_card(card_id)
if not card:
@ -78,15 +62,12 @@ async def api_link_create_or_update(
raise HTTPException(
detail="Not your card.", status_code=HTTPStatus.FORBIDDEN
)
card = await update_card(
card_id, **data.dict()
)
card = await update_card(card_id, **data.dict())
else:
card = await create_card(
wallet_id=wallet.wallet.id, data=data
)
card = await create_card(wallet_id=wallet.wallet.id, data=data)
return card.dict()
@boltcards_ext.delete("/api/v1/cards/{card_id}")
async def api_link_delete(card_id, wallet: WalletTypeInfo = Depends(require_admin_key)):
card = await get_card(card_id)
@ -97,13 +78,12 @@ async def api_link_delete(card_id, wallet: WalletTypeInfo = Depends(require_admi
)
if card.wallet != wallet.wallet.id:
raise HTTPException(
detail="Not your card.", status_code=HTTPStatus.FORBIDDEN
)
raise HTTPException(detail="Not your card.", status_code=HTTPStatus.FORBIDDEN)
await delete_card(card_id)
raise HTTPException(status_code=HTTPStatus.NO_CONTENT)
@boltcards_ext.get("/api/v1/hits")
async def api_hits(
g: WalletTypeInfo = Depends(get_key_type), all_wallets: bool = Query(False)
@ -120,20 +100,33 @@ async def api_hits(
return [hit.dict() for hit in await get_hits(cards_ids)]
# /boltcards/api/v1/scan/?uid=00000000000000&ctr=000000&c=0000000000000000
@boltcards_ext.get("/api/v1/scan/")
async def api_scan(
uid, ctr, c,
request: Request
):
@boltcards_ext.get("/api/v1/scan/")
async def api_scan(uid, ctr, c, request: Request):
card = await get_card(uid, id_is_uid=True)
if card == None:
return {"status": "ERROR", "reason": "Unknown card."}
if c != getSunMAC(bytes.fromhex(uid), bytes.fromhex(ctr)[::-1], bytes.fromhex(card.file_key)).hex().upper():
if (
c
!= getSunMAC(
bytes.fromhex(uid), bytes.fromhex(ctr)[::-1], bytes.fromhex(card.file_key)
)
.hex()
.upper()
):
print(c)
print(getSunMAC(bytes.fromhex(uid), bytes.fromhex(ctr)[::-1], bytes.fromhex(card.file_key)).hex().upper())
print(
getSunMAC(
bytes.fromhex(uid),
bytes.fromhex(ctr)[::-1],
bytes.fromhex(card.file_key),
)
.hex()
.upper()
)
return {"status": "ERROR", "reason": "CMAC does not check."}
ctr_int = int(ctr, 16)
@ -145,32 +138,32 @@ async def api_scan(
# gathering some info for hit record
ip = request.client.host
if request.headers['x-real-ip']:
ip = request.headers['x-real-ip']
elif request.headers['x-forwarded-for']:
ip = request.headers['x-forwarded-for']
if request.headers["x-real-ip"]:
ip = request.headers["x-real-ip"]
elif request.headers["x-forwarded-for"]:
ip = request.headers["x-forwarded-for"]
agent = request.headers['user-agent'] if 'user-agent' in request.headers else ''
agent = request.headers["user-agent"] if "user-agent" in request.headers else ""
await create_hit(card.id, ip, agent, card.counter, ctr_int)
link = await get_withdraw_link(card.withdraw, 0)
return link.lnurl_response(request)
# /boltcards/api/v1/scane/?e=00000000000000000000000000000000&c=0000000000000000
@boltcards_ext.get("/api/v1/scane/")
async def api_scane(
e, c,
request: Request
):
async def api_scane(e, c, request: Request):
card = None
counter = b''
counter = b""
# since this route is common to all cards I don't know whitch 'meta key' to use
# so I try one by one until decrypted uid matches
for cand in await get_all_cards():
if cand.meta_key:
card_uid, counter = decryptSUN(bytes.fromhex(e), bytes.fromhex(cand.meta_key))
card_uid, counter = decryptSUN(
bytes.fromhex(e), bytes.fromhex(cand.meta_key)
)
if card_uid.hex().upper() == cand.uid:
card = cand
@ -187,17 +180,17 @@ async def api_scane(
ctr_int = int.from_bytes(counter, "little")
if ctr_int <= card.counter:
return {"status": "ERROR", "reason": "This link is already used."}
await update_card_counter(ctr_int, card.id)
# gathering some info for hit record
ip = request.client.host
if 'x-real-ip' in request.headers:
ip = request.headers['x-real-ip']
elif 'x-forwarded-for' in request.headers:
ip = request.headers['x-forwarded-for']
if "x-real-ip" in request.headers:
ip = request.headers["x-real-ip"]
elif "x-forwarded-for" in request.headers:
ip = request.headers["x-forwarded-for"]
agent = request.headers['user-agent'] if 'user-agent' in request.headers else ''
agent = request.headers["user-agent"] if "user-agent" in request.headers else ""
await create_hit(card.id, ip, agent, card.counter, ctr_int)