From f41f62f735686ef51dda6ed84c733f7146414874 Mon Sep 17 00:00:00 2001 From: artur Date: Thu, 28 Dec 2023 14:00:49 +0300 Subject: [PATCH] Fix gone subs not deleted from db, fix unsub removed from relay even if same pubkey is used by other psubs --- src/index.js | 31 ++++++++++++++++++++++--------- 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/src/index.js b/src/index.js index 0d3a4c9..f6860e0 100644 --- a/src/index.js +++ b/src/index.js @@ -48,7 +48,7 @@ const npubData = new Map() async function push(psub) { - console.log(new Date(), "push for", psub.pubkey, "pending", psub.pendingRequests); + console.log(new Date(), "push for", psub.pubkey, "psub", psub.id, "pending", psub.pendingRequests); try { await webpush.sendNotification(psub.pushSubscription, JSON.stringify({ cmd: 'wakeup', @@ -182,25 +182,36 @@ function processReply(r, e) { } function unsubscribeFromRelay(psub, relayUrl) { - const relay = relays.get(relayUrl); - relay.unsubQueue.push(psub.pubkey); - relayQueue.push(relayUrl); + console.log("unsubscribeunsubscribeFromRelay", psub.id, psub.pubkey, "from", relayUrl) // remove from global pubkey+relay=psub table const pr = psub.pubkey + relayUrl; const psubs = sourcePsubs.get(pr).filter(pi => pi != psub.id); - if (psubs.length > 0) + if (psubs.length > 0) { + // still some other psub uses the same pubkey on this relay sourcePsubs.set(pr, psubs); - else + } else { + // this pubkey is no longer on this relay + const relay = relays.get(relayUrl); + relay.unsubQueue.push(psub.pubkey); + relayQueue.push(relayUrl); + sourcePsubs.delete(pr); + } } function unsubscribe(psub) { + console.log("unsubscribe", psub.id, psub.pubkey) for (const url of psub.relays) unsubscribeFromRelay(psub, url); pushSubs.delete(psub.id) + + // drop from db + prisma.pushSubs.delete({ + where: { pushId: psub.id } + }).then(r => console.log("deleted psub", psub.id, r)); } function subscribe(psub, relayUrls) { @@ -267,7 +278,7 @@ function createPubkeySub(r) { ensureRelay(r.url) const batchSize = Math.min(r.subQueue.length, MAX_BATCH_SIZE); - const pubkeys = r.subQueue.splice(0, batchSize); + const pubkeys = [...new Set(r.subQueue.splice(0, batchSize))]; const since = Math.floor(Date.now() / 1000) - 10; const requestFilter = { @@ -323,7 +334,8 @@ function processRelayQueue() { // process active relay queue for (const url of uniqRelays.values()) { const r = relays.get(url); - console.log(new Date(), "update relay sub", r.subQueue.length, "unsub", r.unsubQueue.length); + console.log(new Date(), "update relay", url, "sub", r.subQueue.length, "unsub", r.unsubQueue.length); + console.log("old subs", r.subs, "unsubQueue", r.unsubQueue, "subQueue", r.subQueue) // first handle the unsubs for (const p of new Set(r.unsubQueue).values()) { @@ -360,6 +372,7 @@ function processRelayQueue() { // store NDK sub itself r.subs.set(sub.subId, sub); } + console.log("new subs", r.subs) } // close old subs after new subs have activated @@ -502,7 +515,7 @@ app.post(SUBSCRIBE_PATH, async (req, res) => { // write to db w/o blocking the client prisma.pushSubs.upsert({ - where: { pushId: id}, + where: { pushId: id }, create: { pushId: id, npub,