Fix gone subs not deleted from db, fix unsub removed from relay even if same pubkey is used by other psubs
This commit is contained in:
parent
0ac2052a69
commit
f41f62f735
29
src/index.js
29
src/index.js
|
@ -48,7 +48,7 @@ const npubData = new Map()
|
|||
|
||||
async function push(psub) {
|
||||
|
||||
console.log(new Date(), "push for", psub.pubkey, "pending", psub.pendingRequests);
|
||||
console.log(new Date(), "push for", psub.pubkey, "psub", psub.id, "pending", psub.pendingRequests);
|
||||
try {
|
||||
await webpush.sendNotification(psub.pushSubscription, JSON.stringify({
|
||||
cmd: 'wakeup',
|
||||
|
@ -182,25 +182,36 @@ function processReply(r, e) {
|
|||
}
|
||||
|
||||
function unsubscribeFromRelay(psub, relayUrl) {
|
||||
const relay = relays.get(relayUrl);
|
||||
relay.unsubQueue.push(psub.pubkey);
|
||||
relayQueue.push(relayUrl);
|
||||
console.log("unsubscribeunsubscribeFromRelay", psub.id, psub.pubkey, "from", relayUrl)
|
||||
|
||||
// remove from global pubkey+relay=psub table
|
||||
const pr = psub.pubkey + relayUrl;
|
||||
const psubs = sourcePsubs.get(pr).filter(pi => pi != psub.id);
|
||||
if (psubs.length > 0)
|
||||
if (psubs.length > 0) {
|
||||
// still some other psub uses the same pubkey on this relay
|
||||
sourcePsubs.set(pr, psubs);
|
||||
else
|
||||
} else {
|
||||
// this pubkey is no longer on this relay
|
||||
const relay = relays.get(relayUrl);
|
||||
relay.unsubQueue.push(psub.pubkey);
|
||||
relayQueue.push(relayUrl);
|
||||
|
||||
sourcePsubs.delete(pr);
|
||||
}
|
||||
}
|
||||
|
||||
function unsubscribe(psub) {
|
||||
console.log("unsubscribe", psub.id, psub.pubkey)
|
||||
|
||||
for (const url of psub.relays)
|
||||
unsubscribeFromRelay(psub, url);
|
||||
|
||||
pushSubs.delete(psub.id)
|
||||
|
||||
// drop from db
|
||||
prisma.pushSubs.delete({
|
||||
where: { pushId: psub.id }
|
||||
}).then(r => console.log("deleted psub", psub.id, r));
|
||||
}
|
||||
|
||||
function subscribe(psub, relayUrls) {
|
||||
|
@ -267,7 +278,7 @@ function createPubkeySub(r) {
|
|||
ensureRelay(r.url)
|
||||
|
||||
const batchSize = Math.min(r.subQueue.length, MAX_BATCH_SIZE);
|
||||
const pubkeys = r.subQueue.splice(0, batchSize);
|
||||
const pubkeys = [...new Set(r.subQueue.splice(0, batchSize))];
|
||||
|
||||
const since = Math.floor(Date.now() / 1000) - 10;
|
||||
const requestFilter = {
|
||||
|
@ -323,7 +334,8 @@ function processRelayQueue() {
|
|||
// process active relay queue
|
||||
for (const url of uniqRelays.values()) {
|
||||
const r = relays.get(url);
|
||||
console.log(new Date(), "update relay sub", r.subQueue.length, "unsub", r.unsubQueue.length);
|
||||
console.log(new Date(), "update relay", url, "sub", r.subQueue.length, "unsub", r.unsubQueue.length);
|
||||
console.log("old subs", r.subs, "unsubQueue", r.unsubQueue, "subQueue", r.subQueue)
|
||||
|
||||
// first handle the unsubs
|
||||
for (const p of new Set(r.unsubQueue).values()) {
|
||||
|
@ -360,6 +372,7 @@ function processRelayQueue() {
|
|||
// store NDK sub itself
|
||||
r.subs.set(sub.subId, sub);
|
||||
}
|
||||
console.log("new subs", r.subs)
|
||||
}
|
||||
|
||||
// close old subs after new subs have activated
|
||||
|
|
Loading…
Reference in New Issue
Block a user