migrate to trio so c-lightning sockets stop hanging.

This commit is contained in:
fiatjaf 2020-10-03 21:57:14 -03:00
parent e74cf33f90
commit 9994e61615
15 changed files with 185 additions and 79 deletions

View File

@ -21,10 +21,12 @@ quart-compress = "*"
secure = "*"
typing-extensions = "*"
httpx = "*"
quart-trio = "*"
trio = "*"
[dev-packages]
black = "==20.8b1"
pytest = "*"
pytest-cov = "*"
pytest-asyncio = "*"
mypy = "==0.761"
pytest-trio = "*"

127
Pipfile.lock generated
View File

@ -1,7 +1,7 @@
{
"_meta": {
"hash": {
"sha256": "6f7d14aa2e3bc6a1319c7f0e2873151cefa741792fccc249567932a3a94263e3"
"sha256": "894690d75d6558f0aa98eed8c5f54bdfe79c2a1bfd736507f930bf07c775a89e"
},
"pipfile-spec": 6,
"requires": {
@ -23,6 +23,22 @@
],
"version": "==0.5.0"
},
"async-generator": {
"hashes": [
"sha256:01c7bf666359b4967d2cda0000cc2e4af16a0ae098cbffcb8472fb9e8ad6585b",
"sha256:6ebb3d106c12920aaae42ccb6f787ef5eefdcdd166ea3d628fa8476abe712144"
],
"markers": "python_version >= '3.5'",
"version": "==1.10"
},
"attrs": {
"hashes": [
"sha256:26b54ddbbb9ee1d34d5d3668dd37d6cf74990ab23c828c2888dccdceee395594",
"sha256:fce7fc47dfc976152e82d53ff92fa0407700c21acd20886a13777a0d20e655dc"
],
"markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'",
"version": "==20.2.0"
},
"bech32": {
"hashes": [
"sha256:7d6db8214603bd7871fcfa6c0826ef68b85b0abd90fa21c285a9c5e21d2bd899",
@ -149,13 +165,16 @@
},
"httpx": {
"hashes": [
"sha256:4c81dbf98a29cb4f51f415140df56542f9d4860798d713e336642e953cddd1db",
"sha256:7b3c07bfdcdadd92020dd4c07b15932abdcf1c898422a4e98de3d19b2223310b"
"sha256:02326f2d3c61133db31e4b88dd3432479b434e52a68d813eab6db930f13611ea",
"sha256:254b371e3880a8e2387bf9ead6949bac797bd557fda26eba19a6153a0c06bd2b"
],
"index": "pypi",
"version": "==0.15.4"
"version": "==0.15.5"
},
"hypercorn": {
"extras": [
"trio"
],
"hashes": [
"sha256:6540faeba9dd44f7e74c7cc1beae3a438a7efb5f77323d1199457da46d32c2c2",
"sha256:b5c479023757e279f954b46a4ec9dd85e58a2bcbf4d959d5601cbced593e711d"
@ -242,6 +261,14 @@
],
"version": "==3.8.0"
},
"outcome": {
"hashes": [
"sha256:ee46c5ce42780cde85d55a61819d0e6b8cb490f1dbd749ba75ff2629771dcd2d",
"sha256:fc7822068ba7dd0fc2532743611e8a73246708d3564e29a39f93d6ab3701b66f"
],
"markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'",
"version": "==1.0.1"
},
"priority": {
"hashes": [
"sha256:6bc1961a6d7fcacbfc337769f1a382c8e746566aaa365e78047abe9f66b2ffbe",
@ -309,6 +336,14 @@
"index": "pypi",
"version": "==0.3.0"
},
"quart-trio": {
"hashes": [
"sha256:00f3b20f8d82ce7e81ead61db4efba38ed7653c7e28199defded46b663ab2595",
"sha256:dafc8f0440d4b70fa60d24122a161d2373894d2bfa9f713d9f1df1fd508f0834"
],
"index": "pypi",
"version": "==0.5.1"
},
"requests": {
"hashes": [
"sha256:b3559a131db72c33ee969480840fff4bb6dd111de7dd27c8ee1f820f4f00231b",
@ -354,6 +389,13 @@
],
"version": "==1.1.0"
},
"sortedcontainers": {
"hashes": [
"sha256:4e73a757831fc3ca4de2859c422564239a31d8213d09a2a666e375807034d2ba",
"sha256:c633ebde8580f241f274c1f8994a665c0e54a17724fecd0cae2f079e09c36d3f"
],
"version": "==2.2.2"
},
"toml": {
"hashes": [
"sha256:926b612be1e5ce0634a2ca03470f95169cf16f939018233a670519cb4ac58b0f",
@ -361,6 +403,14 @@
],
"version": "==0.10.1"
},
"trio": {
"hashes": [
"sha256:e85cf9858e445465dfbb0e3fdf36efe92082d2df87bfe9d62585eedd6e8e9d7d",
"sha256:fc70c74e8736d1105b3c05cc2e49b30c58755733740f9c51ae6d88a4d6d0a291"
],
"index": "pypi",
"version": "==0.17.0"
},
"typing-extensions": {
"hashes": [
"sha256:7cb407020f00f7bfc3cb3e7881628838e69d8f3fcab2f64742a5e76b2f841918",
@ -400,6 +450,14 @@
],
"version": "==1.4.4"
},
"async-generator": {
"hashes": [
"sha256:01c7bf666359b4967d2cda0000cc2e4af16a0ae098cbffcb8472fb9e8ad6585b",
"sha256:6ebb3d106c12920aaae42ccb6f787ef5eefdcdd166ea3d628fa8476abe712144"
],
"markers": "python_version >= '3.5'",
"version": "==1.10"
},
"attrs": {
"hashes": [
"sha256:26b54ddbbb9ee1d34d5d3668dd37d6cf74990ab23c828c2888dccdceee395594",
@ -460,6 +518,14 @@
],
"version": "==5.3"
},
"idna": {
"hashes": [
"sha256:b307872f855b18632ce0c21c5e45be78c0ea7ae4c15c828c20788b26921eb3f6",
"sha256:b97d804b1e9b523befed77c48dacec60e6dcb0b5391d57af6a65a312a90648c0"
],
"markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'",
"version": "==2.10"
},
"iniconfig": {
"hashes": [
"sha256:80cf40c597eb564e86346103f609d74efce0f6b4d4f30ec8ce9e2c26411ba437",
@ -494,6 +560,14 @@
],
"version": "==0.4.3"
},
"outcome": {
"hashes": [
"sha256:ee46c5ce42780cde85d55a61819d0e6b8cb490f1dbd749ba75ff2629771dcd2d",
"sha256:fc7822068ba7dd0fc2532743611e8a73246708d3564e29a39f93d6ab3701b66f"
],
"markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'",
"version": "==1.0.1"
},
"packaging": {
"hashes": [
"sha256:4357f74f47b9c12db93624a82154e9b120fa8293699949152b22065d556079f8",
@ -531,19 +605,11 @@
},
"pytest": {
"hashes": [
"sha256:1cd09785c0a50f9af72220dd12aa78cfa49cbffc356c61eab009ca189e018a33",
"sha256:d010e24666435b39a4cf48740b039885642b6c273a3f77be3e7e03554d2806b7"
"sha256:7a8190790c17d79a11f847fba0b004ee9a8122582ebff4729a082c109e81a4c9",
"sha256:8f593023c1a0f916110285b6efd7f99db07d59546e3d8c36fc60e2ab05d3be92"
],
"index": "pypi",
"version": "==6.1.0"
},
"pytest-asyncio": {
"hashes": [
"sha256:2eae1e34f6c68fc0a9dc12d4bea190483843ff4708d24277c41568d6b6044f1d",
"sha256:9882c0c6b24429449f5f969a5158b528f39bde47dc32e85b9f0403965017e700"
],
"index": "pypi",
"version": "==0.14.0"
"version": "==6.1.1"
},
"pytest-cov": {
"hashes": [
@ -553,6 +619,14 @@
"index": "pypi",
"version": "==2.10.1"
},
"pytest-trio": {
"hashes": [
"sha256:3f48cc1df66d279d705af38ad38d1639c2e2380ddffcdc3a45bb81758de61f03",
"sha256:9bf0a490fd177a33617e8709242293fae47934de2b51f8209eb2c0545b6ca8fe"
],
"index": "pypi",
"version": "==0.6.0"
},
"regex": {
"hashes": [
"sha256:088afc8c63e7bd187a3c70a94b9e50ab3f17e1d3f52a32750b5b77dbe99ef5ef",
@ -586,6 +660,21 @@
],
"version": "==1.15.0"
},
"sniffio": {
"hashes": [
"sha256:20ed6d5b46f8ae136d00b9dcb807615d83ed82ceea6b2058cecb696765246da5",
"sha256:8e3810100f69fe0edd463d02ad407112542a11ffdc29f67db2bf3771afb87a21"
],
"markers": "python_version >= '3.5'",
"version": "==1.1.0"
},
"sortedcontainers": {
"hashes": [
"sha256:4e73a757831fc3ca4de2859c422564239a31d8213d09a2a666e375807034d2ba",
"sha256:c633ebde8580f241f274c1f8994a665c0e54a17724fecd0cae2f079e09c36d3f"
],
"version": "==2.2.2"
},
"toml": {
"hashes": [
"sha256:926b612be1e5ce0634a2ca03470f95169cf16f939018233a670519cb4ac58b0f",
@ -593,6 +682,14 @@
],
"version": "==0.10.1"
},
"trio": {
"hashes": [
"sha256:e85cf9858e445465dfbb0e3fdf36efe92082d2df87bfe9d62585eedd6e8e9d7d",
"sha256:fc70c74e8736d1105b3c05cc2e49b30c58755733740f9c51ae6d88a4d6d0a291"
],
"index": "pypi",
"version": "==0.17.0"
},
"typed-ast": {
"hashes": [
"sha256:0666aa36131496aed8f7be0410ff974562ab7eeac11ef351def9ea6fa28f6355",

View File

@ -1,7 +1,8 @@
import asyncio
import trio # type: ignore
import importlib
from quart import Quart, g
from quart import g
from quart_trio import QuartTrio
from quart_cors import cors # type: ignore
from quart_compress import Compress # type: ignore
from secure import SecureHeaders # type: ignore
@ -15,11 +16,11 @@ from .proxy_fix import ASGIProxyFix
secure_headers = SecureHeaders(hsts=False)
def create_app(config_object="lnbits.settings") -> Quart:
def create_app(config_object="lnbits.settings") -> QuartTrio:
"""Create application factory.
:param config_object: The configuration object to use.
"""
app = Quart(__name__, static_folder="static")
app = QuartTrio(__name__, static_folder="static")
app.config.from_object(config_object)
app.asgi_http_class = ASGIProxyFix
@ -36,7 +37,7 @@ def create_app(config_object="lnbits.settings") -> Quart:
return app
def register_blueprints(app: Quart) -> None:
def register_blueprints(app: QuartTrio) -> None:
"""Register Flask blueprints / LNbits extensions."""
app.register_blueprint(core_app)
@ -58,13 +59,13 @@ def register_blueprints(app: Quart) -> None:
raise ImportError(f"Please make sure that the extension `{ext.code}` follows conventions.")
def register_commands(app: Quart):
def register_commands(app: QuartTrio):
"""Register Click commands."""
app.cli.add_command(db_migrate)
app.cli.add_command(handle_assets)
def register_assets(app: Quart):
def register_assets(app: QuartTrio):
"""Serve each vendored asset separately or a bundle."""
@app.before_request
@ -77,13 +78,13 @@ def register_assets(app: Quart):
g.VENDORED_CSS = ["/static/bundle.css"]
def register_filters(app: Quart):
def register_filters(app: QuartTrio):
"""Jinja filters."""
app.jinja_env.globals["SITE_TITLE"] = app.config["LNBITS_SITE_TITLE"]
app.jinja_env.globals["EXTENSIONS"] = get_valid_extensions()
def register_request_hooks(app: Quart):
def register_request_hooks(app: QuartTrio):
"""Open the core db for each request so everything happens in a big transaction"""
@app.before_request
@ -109,8 +110,8 @@ def register_async_tasks(app):
@app.before_serving
async def listeners():
loop = asyncio.get_running_loop()
loop.create_task(invoice_listener())
app.nursery.start_soon(invoice_listener)
print("started invoice_listener")
@app.after_serving
async def stop_listeners():

View File

@ -1,7 +1,8 @@
import asyncio
import trio # type: ignore
from http import HTTPStatus
from typing import Optional, Tuple, List, Callable, Awaitable
from quart import Quart, Request, g
from quart import Request, g
from quart_trio import QuartTrio
from werkzeug.datastructures import Headers
from lnbits.db import open_db, open_ext_db
@ -10,7 +11,7 @@ from lnbits.settings import WALLET
from .models import Payment
from .crud import get_standalone_payment
main_app: Optional[Quart] = None
main_app: Optional[QuartTrio] = None
def grab_app_for_later(state):
@ -18,24 +19,30 @@ def grab_app_for_later(state):
main_app = state.app
def run_on_pseudo_request(awaitable: Awaitable):
async def run(awaitable):
fk = Request(
"GET",
"http",
"/background/pseudo",
b"",
Headers([("host", "lnbits.background")]),
"",
"1.1",
send_push_promise=lambda x, h: None,
)
async with main_app.request_context(fk):
with open_db() as g.db:
await awaitable
async def send_push_promise(a, b) -> None:
pass
loop = asyncio.get_event_loop()
loop.create_task(run(awaitable))
async def run_on_pseudo_request(func: Callable, *args):
fk = Request(
"GET",
"http",
"/background/pseudo",
b"",
Headers([("host", "lnbits.background")]),
"",
"1.1",
send_push_promise=send_push_promise,
)
assert main_app
async def run():
async with main_app.request_context(fk):
with open_db() as g.db: # type: ignore
await func(*args)
async with trio.open_nursery() as nursery:
nursery.start_soon(run)
invoice_listeners: List[Tuple[str, Callable[[Payment], Awaitable[None]]]] = []
@ -59,7 +66,7 @@ async def webhook_handler():
async def invoice_listener():
async for checking_id in WALLET.paid_invoices_stream():
run_on_pseudo_request(invoice_callback_dispatcher(checking_id))
await run_on_pseudo_request(invoice_callback_dispatcher, checking_id)
async def invoice_callback_dispatcher(checking_id: str):

View File

@ -123,6 +123,6 @@ async def lnurlwallet():
user = get_user(account.id)
wallet = create_wallet(user_id=user.id)
run_on_pseudo_request(redeem_lnurl_withdraw(wallet.id, withdraw_res, "LNbits initial funding: voucher redeem."))
run_on_pseudo_request(redeem_lnurl_withdraw, wallet.id, withdraw_res, "LNbits initial funding: voucher redeem.")
return redirect(url_for("core.wallet", usr=user.id, wal=wallet.id))

View File

@ -10,20 +10,26 @@ class Database:
self.connection = sqlite3.connect(db_path)
self.connection.row_factory = sqlite3.Row
self.cursor = self.connection.cursor()
self.closed = False
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self.closed:
return
if exc_val:
self.connection.rollback()
self.cursor.close()
self.cursor.close()
self.connection.close()
else:
self.connection.commit()
self.cursor.close()
self.connection.close()
self.closed = True
def commit(self):
self.connection.commit()

View File

@ -5,10 +5,10 @@ from urllib.parse import urlparse
from werkzeug.datastructures import Headers
from quart import Request
from quart.asgi import ASGIHTTPConnection
from quart_trio.asgi import TrioASGIHTTPConnection
class ASGIProxyFix(ASGIHTTPConnection):
class ASGIProxyFix(TrioASGIHTTPConnection):
def _create_request_from_scope(self, send: Callable) -> Request:
headers = Headers()
headers["Remote-Addr"] = (self.scope.get("client") or ["<local>"])[0]

View File

@ -3,7 +3,7 @@ try:
except ImportError: # pragma: nocover
LightningRpc = None
import asyncio
import trio # type: ignore
import random
import json
@ -86,7 +86,7 @@ class CLightningWallet(Wallet):
raise KeyError("supplied an invalid checking_id")
async def paid_invoices_stream(self) -> AsyncGenerator[str, None]:
reader, writer = await asyncio.open_unix_connection(self.rpc)
stream = await trio.open_unix_socket(self.rpc)
i = 0
while True:
@ -98,12 +98,9 @@ class CLightningWallet(Wallet):
}
)
print(call)
writer.write(call.encode("ascii"))
await writer.drain()
await stream.send_all(call.encode("utf-8"))
data = await reader.readuntil(b"\n\n")
print(data)
data = await stream.receive_some()
paid = json.loads(data.decode("ascii"))
paid = self.ln.waitanyinvoice(self.last_pay_index)

View File

@ -1,4 +1,4 @@
import asyncio
import trio # type: ignore
from os import getenv
from typing import Optional, Dict, AsyncGenerator
from requests import get, post
@ -68,5 +68,5 @@ class LNbitsWallet(Wallet):
async def paid_invoices_stream(self) -> AsyncGenerator[str, None]:
print("lnbits does not support paid invoices stream yet")
await asyncio.sleep(5)
await trio.sleep(5)
yield ""

View File

@ -1,5 +1,5 @@
import json
import asyncio
import trio # type: ignore
import httpx
from os import getenv
from http import HTTPStatus
@ -77,10 +77,9 @@ class LNPayWallet(Wallet):
return PaymentStatus(statuses[r.json()["settled"]])
async def paid_invoices_stream(self) -> AsyncGenerator[str, None]:
self.queue: asyncio.Queue = asyncio.Queue()
while True:
yield await self.queue.get()
self.queue.task_done()
self.send, receive = trio.open_memory_channel(0)
async for value in receive:
yield value
async def webhook_listener(self):
text: str = await request.get_data()
@ -96,6 +95,6 @@ class LNPayWallet(Wallet):
)
data = r.json()
if data["settled"]:
self.queue.put_nowait(lntx_id)
self.send.send(lntx_id)
return "", HTTPStatus.NO_CONTENT

View File

@ -1,4 +1,4 @@
import asyncio
import trio # type: ignore
from os import getenv
from typing import Optional, Dict, AsyncGenerator
from requests import post
@ -79,5 +79,5 @@ class LntxbotWallet(Wallet):
async def paid_invoices_stream(self) -> AsyncGenerator[str, None]:
print("lntxbot does not support paid invoices stream yet")
await asyncio.sleep(5)
await trio.sleep(5)
yield ""

View File

@ -1,5 +1,5 @@
import json
import asyncio
import trio # type: ignore
import hmac
import httpx
from http import HTTPStatus
@ -77,15 +77,12 @@ class OpenNodeWallet(Wallet):
return PaymentStatus(statuses[r.json()["data"]["status"]])
async def paid_invoices_stream(self) -> AsyncGenerator[str, None]:
self.queue: asyncio.Queue = asyncio.Queue()
while True:
yield await self.queue.get()
self.queue.task_done()
self.send, receive = trio.open_memory_channel(0)
async for value in receive:
yield value
async def webhook_listener(self):
print("a request!")
text: str = await request.get_data()
print("text", text)
data = json.loads(text)
if type(data) is not dict or "event" not in data or data["event"].get("name") != "wallet_receive":
return "", HTTPStatus.NO_CONTENT
@ -100,5 +97,5 @@ class OpenNodeWallet(Wallet):
print("invalid webhook, not from opennode")
return "", HTTPStatus.NO_CONTENT
self.queue.put_nowait(charge_id)
self.send.send(charge_id)
return "", HTTPStatus.NO_CONTENT

2
pytest.ini Normal file
View File

@ -0,0 +1,2 @@
[pytest]
trio_mode = true

View File

@ -4,7 +4,6 @@ from lnbits.app import create_app
@pytest.fixture
@pytest.mark.asyncio
async def client():
app = create_app()
app.config["TESTING"] = True

View File

@ -1,7 +1,6 @@
import pytest
@pytest.mark.asyncio
async def test_homepage(client):
r = await client.get("/")
assert b"Add a new wallet" in await r.get_data()