diff --git a/lnbits/app.py b/lnbits/app.py index f612c32c..51482538 100644 --- a/lnbits/app.py +++ b/lnbits/app.py @@ -34,7 +34,6 @@ from .tasks import ( check_pending_payments, internal_invoice_listener, invoice_listener, - run_deferred_async, webhook_handler, ) @@ -185,7 +184,7 @@ def register_async_tasks(app): loop.create_task(catch_everything_and_restart(invoice_listener)) loop.create_task(catch_everything_and_restart(internal_invoice_listener)) await register_task_listeners() - await run_deferred_async() + # await run_deferred_async() # calle: doesn't do anyting? @app.on_event("shutdown") async def stop_listeners(): diff --git a/lnbits/core/services.py b/lnbits/core/services.py index 961eb7b2..5d993b4c 100644 --- a/lnbits/core/services.py +++ b/lnbits/core/services.py @@ -186,9 +186,9 @@ async def pay_invoice( ) # notify receiver asynchronously - from lnbits.tasks import internal_invoice_queue + logger.debug(f"enqueuing internal invoice {internal_checking_id}") await internal_invoice_queue.put(internal_checking_id) else: logger.debug(f"backend: sending payment {temp_id}") diff --git a/lnbits/core/tasks.py b/lnbits/core/tasks.py index 07b8a893..b57e2625 100644 --- a/lnbits/core/tasks.py +++ b/lnbits/core/tasks.py @@ -1,30 +1,43 @@ import asyncio -from typing import List +from typing import Dict import httpx from loguru import logger -from lnbits.tasks import register_invoice_listener +from lnbits.helpers import get_current_extension_name +from lnbits.tasks import SseListenersDict, register_invoice_listener from . import db from .crud import get_balance_notify from .models import Payment -api_invoice_listeners: List[asyncio.Queue] = [] +api_invoice_listeners: Dict[str, asyncio.Queue] = SseListenersDict( + "api_invoice_listeners" +) async def register_task_listeners(): + """ + Registers an invoice listener queue for the core tasks. + Incoming payaments in this queue will eventually trigger the signals sent to all other extensions + and fulfill other core tasks such as dispatching webhooks. + """ invoice_paid_queue = asyncio.Queue(5) - register_invoice_listener(invoice_paid_queue) + # we register invoice_paid_queue to receive all incoming invoices + register_invoice_listener(invoice_paid_queue, "core/tasks.py") + # register a worker that will react to invoices asyncio.create_task(wait_for_paid_invoices(invoice_paid_queue)) async def wait_for_paid_invoices(invoice_paid_queue: asyncio.Queue): + """ + This worker dispatches events to all extensions, dispatches webhooks and balance notifys. + """ while True: payment = await invoice_paid_queue.get() - logger.debug("received invoice paid event") + logger.trace("received invoice paid event") # send information to sse channel - await dispatch_invoice_listener(payment) + await dispatch_api_invoice_listeners(payment) # dispatch webhook if payment.webhook and not payment.webhook_status: @@ -41,16 +54,23 @@ async def wait_for_paid_invoices(invoice_paid_queue: asyncio.Queue): pass -async def dispatch_invoice_listener(payment: Payment): - for send_channel in api_invoice_listeners: +async def dispatch_api_invoice_listeners(payment: Payment): + """ + Emits events to invoice listener subscribed from the API. + """ + for chan_name, send_channel in api_invoice_listeners.items(): try: + logger.debug(f"sending invoice paid event to {chan_name}") send_channel.put_nowait(payment) except asyncio.QueueFull: - logger.debug("removing sse listener", send_channel) - api_invoice_listeners.remove(send_channel) + logger.error(f"removing sse listener {send_channel}:{chan_name}") + api_invoice_listeners.pop(chan_name) async def dispatch_webhook(payment: Payment): + """ + Dispatches the webhook to the webhook url. + """ async with httpx.AsyncClient() as client: data = payment.dict() try: diff --git a/lnbits/core/views/api.py b/lnbits/core/views/api.py index 7a2bbbe6..c07df568 100644 --- a/lnbits/core/views/api.py +++ b/lnbits/core/views/api.py @@ -3,11 +3,13 @@ import binascii import hashlib import json import time +import uuid from http import HTTPStatus from io import BytesIO from typing import Dict, List, Optional, Tuple, Union from urllib.parse import ParseResult, parse_qs, urlencode, urlparse, urlunparse +import async_timeout import httpx import pyqrcode from fastapi import Depends, Header, Query, Request @@ -16,7 +18,7 @@ from fastapi.params import Body from loguru import logger from pydantic import BaseModel from pydantic.fields import Field -from sse_starlette.sse import EventSourceResponse +from sse_starlette.sse import EventSourceResponse, ServerSentEvent from starlette.responses import HTMLResponse, StreamingResponse from lnbits import bolt11, lnurl @@ -366,37 +368,48 @@ async def api_payments_pay_lnurl( } -async def subscribe(request: Request, wallet: Wallet): +async def subscribe_wallet_invoices(request: Request, wallet: Wallet): + """ + Subscribe to new invoices for a wallet. Can be wrapped in EventSourceResponse. + Listenes invoming payments for a wallet and yields jsons with payment details. + """ this_wallet_id = wallet.id payment_queue: asyncio.Queue[Payment] = asyncio.Queue(0) - logger.debug("adding sse listener", payment_queue) - api_invoice_listeners.append(payment_queue) + uid = f"{this_wallet_id}_{str(uuid.uuid4())[:8]}" + logger.debug(f"adding sse listener for wallet: {uid}") + api_invoice_listeners[uid] = payment_queue send_queue: asyncio.Queue[Tuple[str, Payment]] = asyncio.Queue(0) async def payment_received() -> None: while True: - payment: Payment = await payment_queue.get() - if payment.wallet_id == this_wallet_id: - logger.debug("payment received", payment) - await send_queue.put(("payment-received", payment)) + try: + async with async_timeout.timeout(1): + payment: Payment = await payment_queue.get() + if payment.wallet_id == this_wallet_id: + logger.debug("sse listener: payment receieved", payment) + await send_queue.put(("payment-received", payment)) + except asyncio.TimeoutError: + pass - asyncio.create_task(payment_received()) + task = asyncio.create_task(payment_received()) try: while True: + if await request.is_disconnected(): + await request.close() + break typ, data = await send_queue.get() - if data: jdata = json.dumps(dict(data.dict(), pending=False)) - # yield dict(id=1, event="this", data="1234") - # await asyncio.sleep(2) yield dict(data=jdata, event=typ) - # yield dict(data=jdata.encode("utf-8"), event=typ.encode("utf-8")) - except asyncio.CancelledError: + except asyncio.CancelledError as e: + logger.debug(f"CancelledError on listener {uid}: {e}") + api_invoice_listeners.pop(uid) + task.cancel() return @@ -405,7 +418,9 @@ async def api_payments_sse( request: Request, wallet: WalletTypeInfo = Depends(get_key_type) ): return EventSourceResponse( - subscribe(request, wallet.wallet), ping=20, media_type="text/event-stream" + subscribe_wallet_invoices(request, wallet.wallet), + ping=20, + media_type="text/event-stream", ) diff --git a/lnbits/core/views/public_api.py b/lnbits/core/views/public_api.py index 2d2cdd66..9b0ebc98 100644 --- a/lnbits/core/views/public_api.py +++ b/lnbits/core/views/public_api.py @@ -46,8 +46,8 @@ async def api_public_payment_longpolling(payment_hash): payment_queue = asyncio.Queue(0) - logger.debug("adding standalone invoice listener", payment_hash, payment_queue) - api_invoice_listeners.append(payment_queue) + logger.debug(f"adding standalone invoice listener for hash: {payment_hash}") + api_invoice_listeners[payment_hash] = payment_queue response = None diff --git a/lnbits/extensions/boltcards/tasks.py b/lnbits/extensions/boltcards/tasks.py index 1b51c98b..c1e99b76 100644 --- a/lnbits/extensions/boltcards/tasks.py +++ b/lnbits/extensions/boltcards/tasks.py @@ -5,6 +5,7 @@ import httpx from lnbits.core import db as core_db from lnbits.core.models import Payment +from lnbits.helpers import get_current_extension_name from lnbits.tasks import register_invoice_listener from .crud import create_refund, get_hit @@ -12,7 +13,7 @@ from .crud import create_refund, get_hit async def wait_for_paid_invoices(): invoice_queue = asyncio.Queue() - register_invoice_listener(invoice_queue) + register_invoice_listener(invoice_queue, get_current_extension_name()) while True: payment = await invoice_queue.get() diff --git a/lnbits/extensions/boltz/boltz.py b/lnbits/extensions/boltz/boltz.py index 4e5fecd0..ac99d4f4 100644 --- a/lnbits/extensions/boltz/boltz.py +++ b/lnbits/extensions/boltz/boltz.py @@ -34,8 +34,8 @@ from .models import ( from .utils import check_balance, get_timestamp, req_wrap net = NETWORKS[BOLTZ_NETWORK] -logger.debug(f"BOLTZ_URL: {BOLTZ_URL}") -logger.debug(f"Bitcoin Network: {net['name']}") +logger.trace(f"BOLTZ_URL: {BOLTZ_URL}") +logger.trace(f"Bitcoin Network: {net['name']}") async def create_swap(data: CreateSubmarineSwap) -> SubmarineSwap: diff --git a/lnbits/extensions/boltz/mempool.py b/lnbits/extensions/boltz/mempool.py index ee305257..a44c0f02 100644 --- a/lnbits/extensions/boltz/mempool.py +++ b/lnbits/extensions/boltz/mempool.py @@ -11,8 +11,8 @@ from lnbits.settings import BOLTZ_MEMPOOL_SPACE_URL, BOLTZ_MEMPOOL_SPACE_URL_WS from .utils import req_wrap -logger.debug(f"BOLTZ_MEMPOOL_SPACE_URL: {BOLTZ_MEMPOOL_SPACE_URL}") -logger.debug(f"BOLTZ_MEMPOOL_SPACE_URL_WS: {BOLTZ_MEMPOOL_SPACE_URL_WS}") +logger.trace(f"BOLTZ_MEMPOOL_SPACE_URL: {BOLTZ_MEMPOOL_SPACE_URL}") +logger.trace(f"BOLTZ_MEMPOOL_SPACE_URL_WS: {BOLTZ_MEMPOOL_SPACE_URL_WS}") websocket_url = f"{BOLTZ_MEMPOOL_SPACE_URL_WS}/api/v1/ws" diff --git a/lnbits/extensions/boltz/tasks.py b/lnbits/extensions/boltz/tasks.py index d6f72edf..ace94557 100644 --- a/lnbits/extensions/boltz/tasks.py +++ b/lnbits/extensions/boltz/tasks.py @@ -5,6 +5,7 @@ from loguru import logger from lnbits.core.models import Payment from lnbits.core.services import check_transaction_status +from lnbits.helpers import get_current_extension_name from lnbits.tasks import register_invoice_listener from .boltz import ( @@ -127,7 +128,7 @@ async def check_for_pending_swaps(): async def wait_for_paid_invoices(): invoice_queue = asyncio.Queue() - register_invoice_listener(invoice_queue) + register_invoice_listener(invoice_queue, get_current_extension_name()) while True: payment = await invoice_queue.get() diff --git a/lnbits/extensions/copilot/tasks.py b/lnbits/extensions/copilot/tasks.py index f3c5cff8..c59ef4cc 100644 --- a/lnbits/extensions/copilot/tasks.py +++ b/lnbits/extensions/copilot/tasks.py @@ -7,6 +7,7 @@ from starlette.exceptions import HTTPException from lnbits.core import db as core_db from lnbits.core.models import Payment +from lnbits.helpers import get_current_extension_name from lnbits.tasks import register_invoice_listener from .crud import get_copilot @@ -15,7 +16,7 @@ from .views import updater async def wait_for_paid_invoices(): invoice_queue = asyncio.Queue() - register_invoice_listener(invoice_queue) + register_invoice_listener(invoice_queue, get_current_extension_name()) while True: payment = await invoice_queue.get() diff --git a/lnbits/extensions/jukebox/tasks.py b/lnbits/extensions/jukebox/tasks.py index 70a2e65d..5614d926 100644 --- a/lnbits/extensions/jukebox/tasks.py +++ b/lnbits/extensions/jukebox/tasks.py @@ -1,6 +1,7 @@ import asyncio from lnbits.core.models import Payment +from lnbits.helpers import get_current_extension_name from lnbits.tasks import register_invoice_listener from .crud import update_jukebox_payment @@ -8,7 +9,7 @@ from .crud import update_jukebox_payment async def wait_for_paid_invoices(): invoice_queue = asyncio.Queue() - register_invoice_listener(invoice_queue) + register_invoice_listener(invoice_queue, get_current_extension_name()) while True: payment = await invoice_queue.get() diff --git a/lnbits/extensions/livestream/tasks.py b/lnbits/extensions/livestream/tasks.py index 85bdd5e0..626c698c 100644 --- a/lnbits/extensions/livestream/tasks.py +++ b/lnbits/extensions/livestream/tasks.py @@ -6,7 +6,7 @@ from loguru import logger from lnbits.core import db as core_db from lnbits.core.crud import create_payment from lnbits.core.models import Payment -from lnbits.helpers import urlsafe_short_hash +from lnbits.helpers import get_current_extension_name, urlsafe_short_hash from lnbits.tasks import internal_invoice_listener, register_invoice_listener from .crud import get_livestream_by_track, get_producer, get_track @@ -14,7 +14,7 @@ from .crud import get_livestream_by_track, get_producer, get_track async def wait_for_paid_invoices(): invoice_queue = asyncio.Queue() - register_invoice_listener(invoice_queue) + register_invoice_listener(invoice_queue, get_current_extension_name()) while True: payment = await invoice_queue.get() diff --git a/lnbits/extensions/lnaddress/tasks.py b/lnbits/extensions/lnaddress/tasks.py index 9abe10c3..0c377eec 100644 --- a/lnbits/extensions/lnaddress/tasks.py +++ b/lnbits/extensions/lnaddress/tasks.py @@ -3,6 +3,7 @@ import asyncio import httpx from lnbits.core.models import Payment +from lnbits.helpers import get_current_extension_name from lnbits.tasks import register_invoice_listener from .crud import get_address, get_domain, set_address_paid, set_address_renewed @@ -10,7 +11,7 @@ from .crud import get_address, get_domain, set_address_paid, set_address_renewed async def wait_for_paid_invoices(): invoice_queue = asyncio.Queue() - register_invoice_listener(invoice_queue) + register_invoice_listener(invoice_queue, get_current_extension_name()) while True: payment = await invoice_queue.get() diff --git a/lnbits/extensions/lnticket/__init__.py b/lnbits/extensions/lnticket/__init__.py index 792b1175..cb793f4d 100644 --- a/lnbits/extensions/lnticket/__init__.py +++ b/lnbits/extensions/lnticket/__init__.py @@ -1,4 +1,5 @@ import asyncio +import json from fastapi import APIRouter diff --git a/lnbits/extensions/lnticket/tasks.py b/lnbits/extensions/lnticket/tasks.py index 7e672115..746ebea9 100644 --- a/lnbits/extensions/lnticket/tasks.py +++ b/lnbits/extensions/lnticket/tasks.py @@ -3,6 +3,7 @@ import asyncio from loguru import logger from lnbits.core.models import Payment +from lnbits.helpers import get_current_extension_name from lnbits.tasks import register_invoice_listener from .crud import get_ticket, set_ticket_paid @@ -10,7 +11,7 @@ from .crud import get_ticket, set_ticket_paid async def wait_for_paid_invoices(): invoice_queue = asyncio.Queue() - register_invoice_listener(invoice_queue) + register_invoice_listener(invoice_queue, get_current_extension_name()) while True: payment = await invoice_queue.get() diff --git a/lnbits/extensions/lnurlp/tasks.py b/lnbits/extensions/lnurlp/tasks.py index 525d36ce..86f1579a 100644 --- a/lnbits/extensions/lnurlp/tasks.py +++ b/lnbits/extensions/lnurlp/tasks.py @@ -5,6 +5,7 @@ import httpx from lnbits.core import db as core_db from lnbits.core.models import Payment +from lnbits.helpers import get_current_extension_name from lnbits.tasks import register_invoice_listener from .crud import get_pay_link @@ -12,7 +13,7 @@ from .crud import get_pay_link async def wait_for_paid_invoices(): invoice_queue = asyncio.Queue() - register_invoice_listener(invoice_queue) + register_invoice_listener(invoice_queue, get_current_extension_name()) while True: payment = await invoice_queue.get() diff --git a/lnbits/extensions/lnurlpayout/tasks.py b/lnbits/extensions/lnurlpayout/tasks.py index b621876c..71f299be 100644 --- a/lnbits/extensions/lnurlpayout/tasks.py +++ b/lnbits/extensions/lnurlpayout/tasks.py @@ -10,6 +10,7 @@ from lnbits.core.crud import get_wallet from lnbits.core.models import Payment from lnbits.core.services import pay_invoice from lnbits.core.views.api import api_payments_decode +from lnbits.helpers import get_current_extension_name from lnbits.tasks import register_invoice_listener from .crud import get_lnurlpayout_from_wallet @@ -17,7 +18,7 @@ from .crud import get_lnurlpayout_from_wallet async def wait_for_paid_invoices(): invoice_queue = asyncio.Queue() - register_invoice_listener(invoice_queue) + register_invoice_listener(invoice_queue, get_current_extension_name()) while True: payment = await invoice_queue.get() diff --git a/lnbits/extensions/satspay/tasks.py b/lnbits/extensions/satspay/tasks.py index d325405b..46c16bbc 100644 --- a/lnbits/extensions/satspay/tasks.py +++ b/lnbits/extensions/satspay/tasks.py @@ -4,6 +4,7 @@ from loguru import logger from lnbits.core.models import Payment from lnbits.extensions.satspay.crud import check_address_balance, get_charge +from lnbits.helpers import get_current_extension_name from lnbits.tasks import register_invoice_listener # from .crud import get_ticket, set_ticket_paid @@ -11,7 +12,7 @@ from lnbits.tasks import register_invoice_listener async def wait_for_paid_invoices(): invoice_queue = asyncio.Queue() - register_invoice_listener(invoice_queue) + register_invoice_listener(invoice_queue, get_current_extension_name()) while True: payment = await invoice_queue.get() diff --git a/lnbits/extensions/scrub/tasks.py b/lnbits/extensions/scrub/tasks.py index 87e1364b..320d34da 100644 --- a/lnbits/extensions/scrub/tasks.py +++ b/lnbits/extensions/scrub/tasks.py @@ -9,6 +9,7 @@ from fastapi import HTTPException from lnbits import bolt11 from lnbits.core.models import Payment from lnbits.core.services import pay_invoice +from lnbits.helpers import get_current_extension_name from lnbits.tasks import register_invoice_listener from .crud import get_scrub_by_wallet @@ -16,7 +17,7 @@ from .crud import get_scrub_by_wallet async def wait_for_paid_invoices(): invoice_queue = asyncio.Queue() - register_invoice_listener(invoice_queue) + register_invoice_listener(invoice_queue, get_current_extension_name()) while True: payment = await invoice_queue.get() diff --git a/lnbits/extensions/splitpayments/tasks.py b/lnbits/extensions/splitpayments/tasks.py index 0948e849..b7cf1750 100644 --- a/lnbits/extensions/splitpayments/tasks.py +++ b/lnbits/extensions/splitpayments/tasks.py @@ -6,7 +6,7 @@ from loguru import logger from lnbits.core import db as core_db from lnbits.core.crud import create_payment from lnbits.core.models import Payment -from lnbits.helpers import urlsafe_short_hash +from lnbits.helpers import get_current_extension_name, urlsafe_short_hash from lnbits.tasks import internal_invoice_queue, register_invoice_listener from .crud import get_targets @@ -14,7 +14,7 @@ from .crud import get_targets async def wait_for_paid_invoices(): invoice_queue = asyncio.Queue() - register_invoice_listener(invoice_queue) + register_invoice_listener(invoice_queue, get_current_extension_name()) while True: payment = await invoice_queue.get() diff --git a/lnbits/extensions/subdomains/tasks.py b/lnbits/extensions/subdomains/tasks.py index d8f35161..04ee2dd4 100644 --- a/lnbits/extensions/subdomains/tasks.py +++ b/lnbits/extensions/subdomains/tasks.py @@ -3,6 +3,7 @@ import asyncio import httpx from lnbits.core.models import Payment +from lnbits.helpers import get_current_extension_name from lnbits.tasks import register_invoice_listener from .cloudflare import cloudflare_create_subdomain @@ -11,7 +12,7 @@ from .crud import get_domain, set_subdomain_paid async def wait_for_paid_invoices(): invoice_queue = asyncio.Queue() - register_invoice_listener(invoice_queue) + register_invoice_listener(invoice_queue, get_current_extension_name()) while True: payment = await invoice_queue.get() diff --git a/lnbits/extensions/tpos/tasks.py b/lnbits/extensions/tpos/tasks.py index af9663cc..f18d1689 100644 --- a/lnbits/extensions/tpos/tasks.py +++ b/lnbits/extensions/tpos/tasks.py @@ -4,7 +4,7 @@ import json from lnbits.core import db as core_db from lnbits.core.crud import create_payment from lnbits.core.models import Payment -from lnbits.helpers import urlsafe_short_hash +from lnbits.helpers import get_current_extension_name, urlsafe_short_hash from lnbits.tasks import internal_invoice_queue, register_invoice_listener from .crud import get_tpos @@ -12,7 +12,7 @@ from .crud import get_tpos async def wait_for_paid_invoices(): invoice_queue = asyncio.Queue() - register_invoice_listener(invoice_queue) + register_invoice_listener(invoice_queue, get_current_extension_name()) while True: payment = await invoice_queue.get() diff --git a/lnbits/helpers.py b/lnbits/helpers.py index e97fc7bb..e213240c 100644 --- a/lnbits/helpers.py +++ b/lnbits/helpers.py @@ -183,3 +183,26 @@ def template_renderer(additional_folders: List = []) -> Jinja2Templates: t.env.globals["VENDORED_CSS"] = ["/static/bundle.css"] return t + + +def get_current_extension_name() -> str: + """ + Returns the name of the extension that calls this method. + """ + import inspect + import json + import os + + callee_filepath = inspect.stack()[1].filename + callee_dirname, callee_filename = os.path.split(callee_filepath) + + path = os.path.normpath(callee_dirname) + extension_director_name = path.split(os.sep)[-1] + try: + config_path = os.path.join(callee_dirname, "config.json") + with open(config_path) as json_file: + config = json.load(json_file) + ext_name = config["name"] + except: + ext_name = extension_director_name + return ext_name diff --git a/lnbits/tasks.py b/lnbits/tasks.py index 41287ff2..94e43dcf 100644 --- a/lnbits/tasks.py +++ b/lnbits/tasks.py @@ -1,8 +1,9 @@ import asyncio import time import traceback +import uuid from http import HTTPStatus -from typing import Callable, List +from typing import Callable, Dict, List from fastapi.exceptions import HTTPException from loguru import logger @@ -18,20 +19,6 @@ from lnbits.settings import WALLET from .core import db -deferred_async: List[Callable] = [] - - -def record_async(func: Callable) -> Callable: - def recorder(state): - deferred_async.append(func) - - return recorder - - -async def run_deferred_async(): - for func in deferred_async: - asyncio.create_task(catch_everything_and_restart(func)) - async def catch_everything_and_restart(func): try: @@ -50,18 +37,48 @@ async def send_push_promise(a, b) -> None: pass -invoice_listeners: List[asyncio.Queue] = [] +class SseListenersDict(dict): + """ + A dict of sse listeners. + """ + + def __init__(self, name: str = None): + self.name = name or f"sse_listener_{str(uuid.uuid4())[:8]}" + + def __setitem__(self, key, value): + assert type(key) == str, f"{key} is not a string" + assert type(value) == asyncio.Queue, f"{value} is not an asyncio.Queue" + logger.trace(f"sse: adding listener {key} to {self.name}. len = {len(self)+1}") + return super().__setitem__(key, value) + + def __delitem__(self, key): + logger.trace(f"sse: removing listener from {self.name}. len = {len(self)-1}") + return super().__delitem__(key) + + _RaiseKeyError = object() # singleton for no-default behavior + + def pop(self, key, v=_RaiseKeyError) -> None: + logger.trace(f"sse: removing listener from {self.name}. len = {len(self)-1}") + return super().pop(key) -def register_invoice_listener(send_chan: asyncio.Queue): +invoice_listeners: Dict[str, asyncio.Queue] = SseListenersDict("invoice_listeners") + + +def register_invoice_listener(send_chan: asyncio.Queue, name: str = None): """ - A method intended for extensions to call when they want to be notified about - new invoice payments incoming. + A method intended for extensions (and core/tasks.py) to call when they want to be notified about + new invoice payments incoming. Will emit all incoming payments. """ - invoice_listeners.append(send_chan) + name_unique = f"{name or 'no_name'}_{str(uuid.uuid4())[:8]}" + logger.trace(f"sse: registering invoice listener {name_unique}") + invoice_listeners[name_unique] = send_chan async def webhook_handler(): + """ + Returns the webhook_handler for the selected wallet if present. Used by API. + """ handler = getattr(WALLET, "webhook_listener", None) if handler: return await handler() @@ -72,18 +89,36 @@ internal_invoice_queue: asyncio.Queue = asyncio.Queue(0) async def internal_invoice_listener(): + """ + internal_invoice_queue will be filled directly in core/services.py + after the payment was deemed to be settled internally. + + Called by the app startup sequence. + """ while True: checking_id = await internal_invoice_queue.get() + logger.info("> got internal payment notification", checking_id) asyncio.create_task(invoice_callback_dispatcher(checking_id)) async def invoice_listener(): + """ + invoice_listener will collect all invoices that come directly + from the backend wallet. + + Called by the app startup sequence. + """ async for checking_id in WALLET.paid_invoices_stream(): logger.info("> got a payment notification", checking_id) asyncio.create_task(invoice_callback_dispatcher(checking_id)) async def check_pending_payments(): + """ + check_pending_payments is called during startup to check for pending payments with + the backend and also to delete expired invoices. Incoming payments will be + checked only once, outgoing pending payments will be checked regularly. + """ outgoing = True incoming = True @@ -133,9 +168,14 @@ async def perform_balance_checks(): async def invoice_callback_dispatcher(checking_id: str): + """ + Takes incoming payments, sets pending=False, and dispatches them to + invoice_listeners from core and extensions. + """ payment = await get_standalone_payment(checking_id, incoming=True) if payment and payment.is_in: - logger.trace("sending invoice callback for payment", checking_id) + logger.trace(f"sse sending invoice callback for payment {checking_id}") await payment.set_pending(False) - for send_chan in invoice_listeners: + for chan_name, send_chan in invoice_listeners.items(): + logger.trace(f"sse sending to chan: {chan_name}") await send_chan.put(payment) diff --git a/lnbits/wallets/fake.py b/lnbits/wallets/fake.py index 8424001b..a07ef4d8 100644 --- a/lnbits/wallets/fake.py +++ b/lnbits/wallets/fake.py @@ -8,9 +8,7 @@ from typing import AsyncGenerator, Dict, Optional from environs import Env # type: ignore from loguru import logger -from lnbits.helpers import urlsafe_short_hash - -from ..bolt11 import decode, encode +from ..bolt11 import Invoice, decode, encode from .base import ( InvoiceResponse, PaymentResponse, @@ -24,6 +22,16 @@ env.read_env() class FakeWallet(Wallet): + queue: asyncio.Queue = asyncio.Queue(0) + secret: str = env.str("FAKE_WALLET_SECTRET", default="ToTheMoon1") + privkey: str = hashlib.pbkdf2_hmac( + "sha256", + secret.encode("utf-8"), + ("FakeWallet").encode("utf-8"), + 2048, + 32, + ).hex() + async def status(self) -> StatusResponse: logger.info( "FakeWallet funding source is for using LNbits as a centralised, stand-alone payment system with brrrrrr." @@ -39,18 +47,12 @@ class FakeWallet(Wallet): ) -> InvoiceResponse: # we set a default secret since FakeWallet is used for internal=True invoices # and the user might not have configured a secret yet - secret = env.str("FAKE_WALLET_SECTRET", default="ToTheMoon1") + data: Dict = { "out": False, "amount": amount, "currency": "bc", - "privkey": hashlib.pbkdf2_hmac( - "sha256", - secret.encode("utf-8"), - ("FakeWallet").encode("utf-8"), - 2048, - 32, - ).hex(), + "privkey": self.privkey, "memo": None, "description_hash": None, "description": "", @@ -86,8 +88,9 @@ class FakeWallet(Wallet): invoice = decode(bolt11) if ( hasattr(invoice, "checking_id") - and invoice.checking_id[6:] == data["privkey"][:6] # type: ignore + and invoice.checking_id[:6] == self.privkey[:6] # type: ignore ): + await self.queue.put(invoice) return PaymentResponse(True, invoice.payment_hash, 0) else: return PaymentResponse( @@ -101,7 +104,6 @@ class FakeWallet(Wallet): return PaymentStatus(None) async def paid_invoices_stream(self) -> AsyncGenerator[str, None]: - self.queue: asyncio.Queue = asyncio.Queue(0) while True: - value = await self.queue.get() - yield value + value: Invoice = await self.queue.get() + yield value.payment_hash diff --git a/poetry.lock b/poetry.lock index 2a57a5c1..343fffbf 100644 --- a/poetry.lock +++ b/poetry.lock @@ -46,6 +46,17 @@ category = "main" optional = false python-versions = "*" +[[package]] +name = "async-timeout" +version = "4.0.2" +description = "Timeout context manager for asyncio programs" +category = "main" +optional = false +python-versions = ">=3.6" + +[package.dependencies] +typing-extensions = {version = ">=3.6.5", markers = "python_version < \"3.8\""} + [[package]] name = "attrs" version = "21.2.0" @@ -111,7 +122,7 @@ jupyter = ["ipython (>=7.8.0)", "tokenize-rt (>=3.2.0)"] uvloop = ["uvloop (>=0.15.2)"] [[package]] -name = "cerberus" +name = "Cerberus" version = "1.3.4" description = "Lightweight, extensible schema and data validation tool for Python dictionaries." category = "main" @@ -402,7 +413,7 @@ plugins = ["setuptools"] requirements_deprecated_finder = ["pip-api", "pipreqs"] [[package]] -name = "jinja2" +name = "Jinja2" version = "3.0.1" description = "A very fast and expressive template engine." category = "main" @@ -444,7 +455,7 @@ win32-setctime = {version = ">=1.0.0", markers = "sys_platform == \"win32\""} dev = ["Sphinx (>=2.2.1)", "black (>=19.10b0)", "codecov (>=2.0.15)", "colorama (>=0.3.4)", "flake8 (>=3.7.7)", "isort (>=5.1.1)", "pytest (>=4.6.2)", "pytest-cov (>=2.7.1)", "sphinx-autobuild (>=0.7.1)", "sphinx-rtd-theme (>=0.4.3)", "tox (>=3.9.0)", "tox-travis (>=0.12)"] [[package]] -name = "markupsafe" +name = "MarkupSafe" version = "2.0.1" description = "Safely add untrusted strings to HTML/XML markup." category = "main" @@ -648,12 +659,12 @@ optional = false python-versions = ">=3.7,<4.0" [package.dependencies] -pyln-bolt7 = ">=1.0,<2.0" -pyln-proto = ">=0.11,<0.12" +pyln-bolt7 = ">=1.0" +pyln-proto = ">=0.12" [[package]] name = "pyln-proto" -version = "0.11.1" +version = "0.12.0" description = "This package implements some of the Lightning Network protocol in pure python. It is intended for protocol testing and some minor tooling only. It is not deemed secure enough to handle any amount of real funds (you have been warned!)." category = "main" optional = false @@ -686,7 +697,7 @@ optional = false python-versions = "*" [[package]] -name = "pyqrcode" +name = "PyQRCode" version = "1.2.1" description = "A QR code generator written purely in Python with SVG, EPS, PNG and terminal output." category = "main" @@ -697,7 +708,7 @@ python-versions = "*" PNG = ["pypng (>=0.0.13)"] [[package]] -name = "pyscss" +name = "pyScss" version = "1.4.0" description = "pyScss, a Scss compiler for Python" category = "main" @@ -780,7 +791,7 @@ python-versions = ">=3.5" cli = ["click (>=5.0)"] [[package]] -name = "pyyaml" +name = "PyYAML" version = "5.4.1" description = "YAML parser and emitter for Python" category = "main" @@ -788,7 +799,7 @@ optional = false python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, !=3.5.*" [[package]] -name = "represent" +name = "Represent" version = "1.6.0.post0" description = "Create __repr__ automatically or declaratively." category = "main" @@ -864,7 +875,7 @@ optional = false python-versions = ">=3.5" [[package]] -name = "sqlalchemy" +name = "SQLAlchemy" version = "1.3.23" description = "Database Abstraction Library" category = "main" @@ -1059,6 +1070,10 @@ asn1crypto = [ {file = "asn1crypto-1.5.1-py2.py3-none-any.whl", hash = "sha256:db4e40728b728508912cbb3d44f19ce188f218e9eba635821bb4b68564f8fd67"}, {file = "asn1crypto-1.5.1.tar.gz", hash = "sha256:13ae38502be632115abf8a24cbe5f4da52e3b5231990aff31123c805306ccb9c"}, ] +async-timeout = [ + {file = "async-timeout-4.0.2.tar.gz", hash = "sha256:2163e1640ddb52b7a8c80d0a67a08587e5d245cc9c553a74a847056bc2976b15"}, + {file = "async_timeout-4.0.2-py3-none-any.whl", hash = "sha256:8ca1e4fcf50d07413d66d1a5e416e42cfdf5851c981d679a09851a6853383b3c"}, +] attrs = [ {file = "attrs-21.2.0-py2.py3-none-any.whl", hash = "sha256:149e90d6d8ac20db7a955ad60cf0e6881a3f20d37096140088356da6c716b0b1"}, {file = "attrs-21.2.0.tar.gz", hash = "sha256:ef6aaac3ca6cd92904cdd0d83f629a15f18053ec84e6432106f7a4d04ae4f5fb"}, @@ -1101,7 +1116,7 @@ black = [ {file = "black-22.8.0-py3-none-any.whl", hash = "sha256:d2c21d439b2baf7aa80d6dd4e3659259be64c6f49dfd0f32091063db0e006db4"}, {file = "black-22.8.0.tar.gz", hash = "sha256:792f7eb540ba9a17e8656538701d3eb1afcb134e3b45b71f20b25c77a8db7e6e"}, ] -cerberus = [ +Cerberus = [ {file = "Cerberus-1.3.4.tar.gz", hash = "sha256:d1b21b3954b2498d9a79edf16b3170a3ac1021df88d197dc2ce5928ba519237c"}, ] certifi = [ @@ -1413,7 +1428,7 @@ isort = [ {file = "isort-5.10.1-py3-none-any.whl", hash = "sha256:6f62d78e2f89b4500b080fe3a81690850cd254227f27f75c3a0c491a1f351ba7"}, {file = "isort-5.10.1.tar.gz", hash = "sha256:e8443a5e7a020e9d7f97f1d7d9cd17c88bcb3bc7e218bf9cf5095fe550be2951"}, ] -jinja2 = [ +Jinja2 = [ {file = "Jinja2-3.0.1-py3-none-any.whl", hash = "sha256:1f06f2da51e7b56b8f238affdd6b4e2c61e39598a378cc49345bc1bd42a978a4"}, {file = "Jinja2-3.0.1.tar.gz", hash = "sha256:703f484b47a6af502e743c9122595cc812b0271f661722403114f71a79d0f5a4"}, ] @@ -1425,7 +1440,7 @@ loguru = [ {file = "loguru-0.5.3-py3-none-any.whl", hash = "sha256:f8087ac396b5ee5f67c963b495d615ebbceac2796379599820e324419d53667c"}, {file = "loguru-0.5.3.tar.gz", hash = "sha256:b28e72ac7a98be3d28ad28570299a393dfcd32e5e3f6a353dec94675767b6319"}, ] -markupsafe = [ +MarkupSafe = [ {file = "MarkupSafe-2.0.1-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:d8446c54dc28c01e5a2dbac5a25f071f6653e6e40f3a8818e8b45d790fe6ef53"}, {file = "MarkupSafe-2.0.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:36bc903cbb393720fad60fc28c10de6acf10dc6cc883f3e24ee4012371399a38"}, {file = "MarkupSafe-2.0.1-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:2d7d807855b419fc2ed3e631034685db6079889a1f01d5d9dac950f764da3dad"}, @@ -1681,8 +1696,8 @@ pyln-client = [ {file = "pyln_client-0.11.1-py3-none-any.whl", hash = "sha256:497db443406b80c98c0434e2938eb1b2a17e88fd9aa63b018124068198df6141"}, ] pyln-proto = [ - {file = "pyln-proto-0.11.1.tar.gz", hash = "sha256:9bed240f41917c4fd526b767218a77d0fbe69242876eef72c35a856796f922d6"}, - {file = "pyln_proto-0.11.1-py3-none-any.whl", hash = "sha256:27b2e04a81b894f69018279c0ce4aa2e7ccd03b86dd9783f96b9d8d1498c8393"}, + {file = "pyln-proto-0.12.0.tar.gz", hash = "sha256:3214d99d8385f2135a94937f0dc1da626a33b257e9ebc320841656edaefabbe5"}, + {file = "pyln_proto-0.12.0-py3-none-any.whl", hash = "sha256:dedef5d8e476a9ade5a0b2eb919ccc37e4a57f2a78fdc399f1c5e0de17e41604"}, ] pyparsing = [ {file = "pyparsing-3.0.9-py3-none-any.whl", hash = "sha256:5026bae9a10eeaefb61dab2f09052b9f4307d44aee4eda64b309723d8d206bbc"}, @@ -1691,11 +1706,11 @@ pyparsing = [ pypng = [ {file = "pypng-0.0.21-py3-none-any.whl", hash = "sha256:76f8a1539ec56451da7ab7121f12a361969fe0f2d48d703d198ce2a99d6c5afd"}, ] -pyqrcode = [ +PyQRCode = [ {file = "PyQRCode-1.2.1.tar.gz", hash = "sha256:fdbf7634733e56b72e27f9bce46e4550b75a3a2c420414035cae9d9d26b234d5"}, {file = "PyQRCode-1.2.1.zip", hash = "sha256:1b2812775fa6ff5c527977c4cd2ccb07051ca7d0bc0aecf937a43864abe5eff6"}, ] -pyscss = [ +pyScss = [ {file = "pyScss-1.4.0.tar.gz", hash = "sha256:8f35521ffe36afa8b34c7d6f3195088a7057c185c2b8f15ee459ab19748669ff"}, ] PySocks = [ @@ -1719,7 +1734,7 @@ python-dotenv = [ {file = "python-dotenv-0.19.0.tar.gz", hash = "sha256:f521bc2ac9a8e03c736f62911605c5d83970021e3fa95b37d769e2bbbe9b6172"}, {file = "python_dotenv-0.19.0-py2.py3-none-any.whl", hash = "sha256:aae25dc1ebe97c420f50b81fb0e5c949659af713f31fdb63c749ca68748f34b1"}, ] -pyyaml = [ +PyYAML = [ {file = "PyYAML-5.4.1-cp27-cp27m-macosx_10_9_x86_64.whl", hash = "sha256:3b2b1824fe7112845700f815ff6a489360226a5609b96ec2190a45e62a9fc922"}, {file = "PyYAML-5.4.1-cp27-cp27m-win32.whl", hash = "sha256:129def1b7c1bf22faffd67b8f3724645203b79d8f4cc81f674654d9902cb4393"}, {file = "PyYAML-5.4.1-cp27-cp27m-win_amd64.whl", hash = "sha256:4465124ef1b18d9ace298060f4eccc64b0850899ac4ac53294547536533800c8"}, @@ -1750,7 +1765,7 @@ pyyaml = [ {file = "PyYAML-5.4.1-cp39-cp39-win_amd64.whl", hash = "sha256:c20cfa2d49991c8b4147af39859b167664f2ad4561704ee74c1de03318e898db"}, {file = "PyYAML-5.4.1.tar.gz", hash = "sha256:607774cbba28732bfa802b54baa7484215f530991055bb562efbed5b2f20a45e"}, ] -represent = [ +Represent = [ {file = "Represent-1.6.0.post0-py2.py3-none-any.whl", hash = "sha256:99142650756ef1998ce0661568f54a47dac8c638fb27e3816c02536575dbba8c"}, {file = "Represent-1.6.0.post0.tar.gz", hash = "sha256:026c0de2ee8385d1255b9c2426cd4f03fe9177ac94c09979bc601946c8493aa0"}, ] @@ -1799,7 +1814,7 @@ sniffio = [ {file = "sniffio-1.2.0-py3-none-any.whl", hash = "sha256:471b71698eac1c2112a40ce2752bb2f4a4814c22a54a3eed3676bc0f5ca9f663"}, {file = "sniffio-1.2.0.tar.gz", hash = "sha256:c4666eecec1d3f50960c6bdf61ab7bc350648da6c126e3cf6898d8cd4ddcd3de"}, ] -sqlalchemy = [ +SQLAlchemy = [ {file = "SQLAlchemy-1.3.23-cp27-cp27m-macosx_10_14_x86_64.whl", hash = "sha256:fd3b96f8c705af8e938eaa99cbd8fd1450f632d38cad55e7367c33b263bf98ec"}, {file = "SQLAlchemy-1.3.23-cp27-cp27m-manylinux1_x86_64.whl", hash = "sha256:29cccc9606750fe10c5d0e8bd847f17a97f3850b8682aef1f56f5d5e1a5a64b1"}, {file = "SQLAlchemy-1.3.23-cp27-cp27m-manylinux2010_x86_64.whl", hash = "sha256:927ce09e49bff3104459e1451ce82983b0a3062437a07d883a4c66f0b344c9b5"}, diff --git a/pyproject.toml b/pyproject.toml index 864500f7..19dac860 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,7 +15,6 @@ asgiref = "3.4.1" attrs = "21.2.0" bech32 = "1.2.0" bitstring = "3.1.9" -cerberus = "1.3.4" certifi = "2021.5.30" charset-normalizer = "2.0.6" click = "8.0.1" @@ -62,6 +61,8 @@ cffi = "1.15.0" websocket-client = "1.3.3" grpcio = "^1.49.1" protobuf = "^4.21.6" +Cerberus = "^1.3.4" +async-timeout = "^4.0.2" pyln-client = "0.11.1" [tool.poetry.dev-dependencies] diff --git a/requirements.txt b/requirements.txt index 697ea1d4..eb9a6e5e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -51,3 +51,5 @@ uvloop==0.16.0 watchfiles==0.16.0 websockets==10.3 websocket-client==1.3.3 +async-timeout==4.0.2 +setuptools==65.4.0 \ No newline at end of file