Merge pull request #5 from nostrband/feature/new-pause-logic
Feature/new pause logic
This commit is contained in:
commit
a6dc55d75f
137
src/index.js
137
src/index.js
|
@ -70,12 +70,10 @@ async function push(psub) {
|
|||
"push for",
|
||||
psub.pubkey,
|
||||
"psub",
|
||||
psub.id,
|
||||
"pending",
|
||||
psub.pendingRequests
|
||||
psub.id
|
||||
);
|
||||
try {
|
||||
await webpush.sendNotification(
|
||||
const r = await webpush.sendNotification(
|
||||
psub.pushSubscription,
|
||||
JSON.stringify(
|
||||
{
|
||||
|
@ -89,6 +87,7 @@ async function push(psub) {
|
|||
}
|
||||
)
|
||||
);
|
||||
console.log("push sent for", psub.pubkey, r);
|
||||
} catch (e) {
|
||||
console.log(
|
||||
new Date(),
|
||||
|
@ -138,36 +137,36 @@ function clearTimer(psub) {
|
|||
function restartTimer(psub) {
|
||||
if (psub.timer) clearTimeout(psub.timer);
|
||||
|
||||
const passed = Date.now() - psub.lastPush;
|
||||
|
||||
// backoff
|
||||
let pause = psub.backoffMs ? psub.backoffMs : MIN_PAUSE;
|
||||
|
||||
// crop max backoff
|
||||
pause = Math.min(pause, MAX_PAUSE);
|
||||
|
||||
// exclude the time we've already been silent,
|
||||
// but leave MIN_PAUSE for a normal reply!
|
||||
pause = Math.max(pause - passed, MIN_PAUSE);
|
||||
|
||||
// arm a timer, if bunker doesn't reply withing
|
||||
// pause then we will send a push
|
||||
psub.timer = setTimeout(async () => {
|
||||
psub.timer = undefined;
|
||||
if (psub.pendingRequests > 0) {
|
||||
const ok = await push(psub);
|
||||
if (!ok) {
|
||||
// drop this psub!
|
||||
unsubscribe(psub);
|
||||
}
|
||||
|
||||
// logarithmic backoff to make sure
|
||||
// we don't spam the push server if there is
|
||||
// a dead signer
|
||||
psub.backoffMs = (psub.backoffMs || MIN_PAUSE) * 2;
|
||||
const now = Date.now()
|
||||
|
||||
// we've been pushing already and it's not waking up,
|
||||
// so we won't bother pushing again to avoid
|
||||
// annoying the push server
|
||||
if (psub.nextPush > now) {
|
||||
console.log(new Date(), "skip push for", psub.pubkey, "until", new Date(psub.nextPush));
|
||||
return
|
||||
}
|
||||
|
||||
// clear
|
||||
psub.pendingRequests = 0;
|
||||
}, pause);
|
||||
const ok = await push(psub);
|
||||
if (!ok) {
|
||||
// drop this psub!
|
||||
unsubscribe(psub);
|
||||
}
|
||||
|
||||
// multiplicative backoff
|
||||
psub.backoffMs = (psub.backoffMs || MIN_PAUSE) * 2;
|
||||
// crop
|
||||
psub.backoffMs = Math.min(psub.backoffMs, MAX_PAUSE);
|
||||
|
||||
// schedule next push
|
||||
psub.nextPush = Date.now() + psub.backoffMs;
|
||||
|
||||
}, MIN_PAUSE);
|
||||
}
|
||||
|
||||
function getP(e) {
|
||||
|
@ -185,8 +184,9 @@ function processRequest(r, e) {
|
|||
for (const id of psubs) {
|
||||
const psub = pushSubs.get(id);
|
||||
// start timer on first request
|
||||
if (!psub.pendingRequests) restartTimer(psub);
|
||||
psub.pendingRequests++;
|
||||
if (!psub.timer) {
|
||||
restartTimer(psub);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -203,26 +203,15 @@ function processReply(r, e) {
|
|||
for (const id of psubs) {
|
||||
const psub = pushSubs.get(id);
|
||||
|
||||
if (!psub.pendingRequests) {
|
||||
console.log("skip unexpected reply from", pubkey);
|
||||
continue;
|
||||
}
|
||||
psub.pendingRequests--;
|
||||
psub.backoffMs = 0; // reset backoff period
|
||||
|
||||
if (psub.pendingRequests > 0) {
|
||||
// got more pending? ok move the timer
|
||||
restartTimer(psub);
|
||||
} else {
|
||||
// no more pending? clear
|
||||
clearTimer(psub);
|
||||
}
|
||||
// it's alive, reset backoff and pending push
|
||||
psub.backoffMs = 0;
|
||||
clearTimer(psub);
|
||||
}
|
||||
}
|
||||
|
||||
function unsubscribeFromRelay(psub, relayUrl) {
|
||||
console.log(
|
||||
"unsubscribeunsubscribeFromRelay",
|
||||
"unsubscribeFromRelay",
|
||||
psub.id,
|
||||
psub.pubkey,
|
||||
"from",
|
||||
|
@ -539,7 +528,11 @@ async function verifyAuthNostr(req, npub, path, minPow = 0) {
|
|||
async function addPsub(id, pubkey, pushSubscription, relays) {
|
||||
let psub = pushSubs.get(id);
|
||||
if (psub) {
|
||||
// update endpoint
|
||||
psub.pushSubscription = pushSubscription;
|
||||
// the bunker is alive, reset backoff timer
|
||||
psub.backoffMs = 0;
|
||||
psub.nextPush = 0;
|
||||
console.log(new Date(), "sub updated", pubkey, psub, relays);
|
||||
} else {
|
||||
// new sub for this id
|
||||
|
@ -548,10 +541,10 @@ async function addPsub(id, pubkey, pushSubscription, relays) {
|
|||
pubkey,
|
||||
pushSubscription,
|
||||
relays: [],
|
||||
pendingRequests: 0,
|
||||
timer: undefined,
|
||||
backoffMs: 0,
|
||||
lastPush: 0,
|
||||
nextPush: 0
|
||||
};
|
||||
|
||||
console.log(new Date(), "sub created", pubkey, psub, relays);
|
||||
|
@ -807,11 +800,11 @@ app.post(NAME_PATH, async (req, res) => {
|
|||
const dbr = await prisma.names.update({
|
||||
where: {
|
||||
npub,
|
||||
name
|
||||
name,
|
||||
},
|
||||
data: {
|
||||
timestamp: Date.now(),
|
||||
disabled: 0
|
||||
disabled: 0,
|
||||
},
|
||||
});
|
||||
console.log("Name", name, "activated for", npub, { dbr });
|
||||
|
@ -1171,41 +1164,55 @@ class CreateAccountHandlingStrategy {
|
|||
// it ignores the unknown methods (except create_account)
|
||||
class Nip46Backend extends NDKNip46Backend {
|
||||
constructor(ndk, signer) {
|
||||
super(ndk, signer, () => Promise.resolve(true))
|
||||
signer.user().then((u) => (this.npub = nip19.npubEncode(u.pubkey)))
|
||||
super(ndk, signer, () => Promise.resolve(true));
|
||||
signer.user().then((u) => (this.npub = nip19.npubEncode(u.pubkey)));
|
||||
}
|
||||
|
||||
async handleIncomingEvent(event) {
|
||||
const { id, method, params } = await this.rpc.parseEvent(event)
|
||||
const remotePubkey = event.pubkey
|
||||
let response
|
||||
const { id, method, params } = await this.rpc.parseEvent(event);
|
||||
const remotePubkey = event.pubkey;
|
||||
let response;
|
||||
|
||||
// validate signature explicitly
|
||||
if (!verifySignature(event.rawEvent())) {
|
||||
this.debug('invalid signature', event.rawEvent())
|
||||
return
|
||||
this.debug("invalid signature", event.rawEvent());
|
||||
return;
|
||||
}
|
||||
|
||||
const strategy = this.handlers[method]
|
||||
const strategy = this.handlers[method];
|
||||
if (strategy) {
|
||||
try {
|
||||
response = await strategy.handle(this, id, remotePubkey, params)
|
||||
console.log(Date.now(), 'req', id, 'method', method, 'result', response)
|
||||
response = await strategy.handle(this, id, remotePubkey, params);
|
||||
console.log(
|
||||
Date.now(),
|
||||
"req",
|
||||
id,
|
||||
"method",
|
||||
method,
|
||||
"result",
|
||||
response
|
||||
);
|
||||
} catch (e) {
|
||||
this.debug('error handling event', e, { id, method, params })
|
||||
this.rpc.sendResponse(id, remotePubkey, 'error', undefined, e.message)
|
||||
this.debug("error handling event", e, { id, method, params });
|
||||
this.rpc.sendResponse(id, remotePubkey, "error", undefined, e.message);
|
||||
}
|
||||
} else {
|
||||
this.debug('unsupported method', { method, params })
|
||||
this.debug("unsupported method", { method, params });
|
||||
// ignore unsupported methods
|
||||
return;
|
||||
}
|
||||
|
||||
if (response) {
|
||||
this.debug(`sending response to ${remotePubkey}`, response)
|
||||
this.rpc.sendResponse(id, remotePubkey, response)
|
||||
this.debug(`sending response to ${remotePubkey}`, response);
|
||||
this.rpc.sendResponse(id, remotePubkey, response);
|
||||
} else {
|
||||
this.rpc.sendResponse(id, remotePubkey, 'error', undefined, 'Not authorized')
|
||||
this.rpc.sendResponse(
|
||||
id,
|
||||
remotePubkey,
|
||||
"error",
|
||||
undefined,
|
||||
"Not authorized"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue
Block a user