SSE endpoint for paid invoices.

also move very essential stuff from core/tasks.py to tasks.py so things
are more organized.
This commit is contained in:
fiatjaf 2020-10-06 00:39:54 -03:00
parent 4e68a0e7e6
commit 95e8573ff8
9 changed files with 167 additions and 93 deletions

View File

@ -12,6 +12,7 @@ from .core import core_app
from .db import open_db, open_ext_db
from .helpers import get_valid_extensions, get_js_vendored, get_css_vendored, url_for_vendored
from .proxy_fix import ASGIProxyFix
from .tasks import invoice_listener, webhook_handler, grab_app_for_later
secure_headers = SecureHeaders(hsts=False)
@ -33,6 +34,7 @@ def create_app(config_object="lnbits.settings") -> QuartTrio:
register_commands(app)
register_request_hooks(app)
register_async_tasks(app)
grab_app_for_later(app)
return app
@ -52,7 +54,7 @@ def register_blueprints(app: QuartTrio) -> None:
@bp.teardown_request
async def after_request(exc):
g.ext_db.__exit__(type(exc), exc, None)
g.ext_db.close()
app.register_blueprint(bp, url_prefix=f"/{ext.code}")
except Exception:
@ -90,6 +92,7 @@ def register_request_hooks(app: QuartTrio):
@app.before_request
async def before_request():
g.db = open_db()
g.nursery = app.nursery
@app.after_request
async def set_secure_headers(response):
@ -98,12 +101,10 @@ def register_request_hooks(app: QuartTrio):
@app.teardown_request
async def after_request(exc):
g.db.__exit__(type(exc), exc, None)
g.db.close()
def register_async_tasks(app):
from lnbits.core.tasks import invoice_listener, webhook_handler
@app.route("/wallet/webhook", methods=["GET", "POST", "PUT", "PATCH", "DELETE"])
async def webhook_listener():
return await webhook_handler()
@ -111,7 +112,7 @@ def register_async_tasks(app):
@app.before_serving
async def listeners():
app.nursery.start_soon(invoice_listener)
print("started invoice_listener")
print("started global invoice_listener.")
@app.after_serving
async def stop_listeners():

View File

@ -8,6 +8,8 @@ core_app: Blueprint = Blueprint(
from .views.api import * # noqa
from .views.generic import * # noqa
from .tasks import grab_app_for_later
from .tasks import on_invoice_paid
core_app.record(grab_app_for_later)
from lnbits.tasks import register_invoice_listener
register_invoice_listener("core", on_invoice_paid)

View File

@ -1,78 +1,15 @@
import trio # type: ignore
from http import HTTPStatus
from typing import Optional, Tuple, List, Callable, Awaitable
from quart import Request, g
from quart_trio import QuartTrio
from werkzeug.datastructures import Headers
from lnbits.db import open_db, open_ext_db
from lnbits.settings import WALLET
from typing import List
from .models import Payment
from .crud import get_standalone_payment
main_app: Optional[QuartTrio] = None
sse_listeners: List[trio.MemorySendChannel] = []
def grab_app_for_later(state):
global main_app
main_app = state.app
async def send_push_promise(a, b) -> None:
pass
async def run_on_pseudo_request(func: Callable, *args):
fk = Request(
"GET",
"http",
"/background/pseudo",
b"",
Headers([("host", "lnbits.background")]),
"",
"1.1",
send_push_promise=send_push_promise,
)
assert main_app
async def run():
async with main_app.request_context(fk):
with open_db() as g.db: # type: ignore
await func(*args)
async with trio.open_nursery() as nursery:
nursery.start_soon(run)
invoice_listeners: List[Tuple[str, Callable[[Payment], Awaitable[None]]]] = []
def register_invoice_listener(ext_name: str, cb: Callable[[Payment], Awaitable[None]]):
"""
A method intended for extensions to call when they want to be notified about
new invoice payments incoming.
"""
print(f"registering {ext_name} invoice_listener callback: {cb}")
invoice_listeners.append((ext_name, cb))
async def webhook_handler():
handler = getattr(WALLET, "webhook_listener", None)
if handler:
return await handler()
return "", HTTPStatus.NO_CONTENT
async def invoice_listener():
async for checking_id in WALLET.paid_invoices_stream():
await run_on_pseudo_request(invoice_callback_dispatcher, checking_id)
async def invoice_callback_dispatcher(checking_id: str):
payment = get_standalone_payment(checking_id)
if payment and payment.is_in:
payment.set_pending(False)
for ext_name, cb in invoice_listeners:
with open_ext_db(ext_name) as g.ext_db: # type: ignore
await cb(payment)
async def on_invoice_paid(payment: Payment):
for send_channel in sse_listeners:
try:
send_channel.send_nowait(payment)
except trio.WouldBlock:
print("removing sse listener", send_channel)
sse_listeners.remove(send_channel)

View File

@ -1,25 +1,23 @@
from quart import g, jsonify, request
import trio # type: ignore
import json
from quart import g, jsonify, request, make_response
from http import HTTPStatus
from binascii import unhexlify
from lnbits import bolt11
from lnbits.core import core_app
from lnbits.core.services import create_invoice, pay_invoice
from lnbits.core.crud import delete_expired_invoices
from lnbits.decorators import api_check_wallet_key, api_validate_post_request
from .. import core_app
from ..services import create_invoice, pay_invoice
from ..crud import delete_expired_invoices
from ..tasks import sse_listeners
@core_app.route("/api/v1/wallet", methods=["GET"])
@api_check_wallet_key("invoice")
async def api_wallet():
return (
jsonify(
{
"id": g.wallet.id,
"name": g.wallet.name,
"balance": g.wallet.balance_msat,
}
),
jsonify({"id": g.wallet.id, "name": g.wallet.name, "balance": g.wallet.balance_msat,}),
HTTPStatus.OK,
)
@ -124,3 +122,56 @@ async def api_payment(payment_hash):
return jsonify({"paid": False}), HTTPStatus.OK
return jsonify({"paid": not payment.pending}), HTTPStatus.OK
@core_app.route("/api/v1/payments/sse", methods=["GET"])
@api_check_wallet_key("invoice")
async def api_payments_sse():
g.db.close()
send_payment, receive_payment = trio.open_memory_channel(0)
print("adding sse listener", send_payment)
sse_listeners.append(send_payment)
send_event, receive_event = trio.open_memory_channel(0)
async def payment_received() -> None:
async for payment in receive_payment:
await send_event.send(("payment", payment))
async def repeat_keepalive():
await trio.sleep(1)
while True:
await send_event.send(("keepalive", ""))
await trio.sleep(25)
g.nursery.start_soon(payment_received)
g.nursery.start_soon(repeat_keepalive)
async def send_events():
try:
async for typ, data in receive_event:
message = [f"event: {typ}".encode("utf-8")]
if data:
jdata = json.dumps(data)
message.append(f"data: {jdata}".encode("utf-8"))
yield b"\n".join(message) + b"\r\n\r\n"
except trio.Cancelled:
print("canceled!")
return
response = await make_response(
send_events(),
{
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no",
"Connection": "keep-alive",
"Transfer-Encoding": "chunked",
},
)
response.timeout = None
return response

View File

@ -8,6 +8,7 @@ from lnurl import LnurlResponse, LnurlWithdrawResponse, decode as decode_lnurl
from lnbits.core import core_app
from lnbits.decorators import check_user_exists, validate_uuids
from lnbits.settings import LNBITS_ALLOWED_USERS, SERVICE_FEE
from lnbits.tasks import run_on_pseudo_request
from ..crud import (
create_account,
@ -17,7 +18,6 @@ from ..crud import (
delete_wallet,
)
from ..services import redeem_lnurl_withdraw
from ..tasks import run_on_pseudo_request
@core_app.route("/favicon.ico")

View File

@ -12,6 +12,9 @@ class Database:
self.cursor = self.connection.cursor()
self.closed = False
def close(self):
self.__exit__(None, None, None)
def __enter__(self):
return self

View File

@ -9,6 +9,6 @@ from .views import * # noqa
from .lnurl import * # noqa
from .tasks import on_invoice_paid
from lnbits.core.tasks import register_invoice_listener
from lnbits.tasks import register_invoice_listener
register_invoice_listener("lnurlp", on_invoice_paid)

View File

@ -6,7 +6,6 @@ from .crud import get_pay_link_by_invoice, mark_webhook_sent
async def on_invoice_paid(payment: Payment) -> None:
print(payment)
islnurlp = "lnurlp" == payment.extra.get("tag")
if islnurlp:
pay_link = get_pay_link_by_invoice(payment.payment_hash)

81
lnbits/tasks.py Normal file
View File

@ -0,0 +1,81 @@
import trio # type: ignore
from http import HTTPStatus
from typing import Optional, Tuple, List, Callable, Awaitable
from quart import Request, g
from quart_trio import QuartTrio
from werkzeug.datastructures import Headers
from lnbits.db import open_db, open_ext_db
from lnbits.settings import WALLET
from lnbits.core.models import Payment
from lnbits.core.crud import get_standalone_payment
main_app: Optional[QuartTrio] = None
def grab_app_for_later(app: QuartTrio):
global main_app
main_app = app
async def send_push_promise(a, b) -> None:
pass
async def run_on_pseudo_request(func: Callable, *args):
fk = Request(
"GET",
"http",
"/background/pseudo",
b"",
Headers([("host", "lnbits.background")]),
"",
"1.1",
send_push_promise=send_push_promise,
)
assert main_app
async def run():
async with main_app.request_context(fk):
with open_db() as g.db: # type: ignore
await func(*args)
async with trio.open_nursery() as nursery:
nursery.start_soon(run)
invoice_listeners: List[Tuple[str, Callable[[Payment], Awaitable[None]]]] = []
def register_invoice_listener(ext_name: str, cb: Callable[[Payment], Awaitable[None]]):
"""
A method intended for extensions to call when they want to be notified about
new invoice payments incoming.
"""
print(f"registering {ext_name} invoice_listener callback: {cb}")
invoice_listeners.append((ext_name, cb))
async def webhook_handler():
handler = getattr(WALLET, "webhook_listener", None)
if handler:
return await handler()
return "", HTTPStatus.NO_CONTENT
async def invoice_listener():
async for checking_id in WALLET.paid_invoices_stream():
await run_on_pseudo_request(invoice_callback_dispatcher, checking_id)
async def invoice_callback_dispatcher(checking_id: str):
payment = get_standalone_payment(checking_id)
if payment and payment.is_in:
payment.set_pending(False)
for ext_name, cb in invoice_listeners:
if ext_name == "core":
await cb(payment)
else:
with open_ext_db(ext_name) as g.ext_db: # type: ignore
await cb(payment)