From 239a5603f44694243ec2d36d465031e068a39d79 Mon Sep 17 00:00:00 2001 From: hatim boufnichel Date: Sun, 17 Jan 2021 17:07:42 +0100 Subject: [PATCH] invoice cb --- services/gunDB/contact-api/jobs/onOrders.js | 172 ++++++++++++-------- services/schema/index.js | 50 +++++- 2 files changed, 153 insertions(+), 69 deletions(-) diff --git a/services/gunDB/contact-api/jobs/onOrders.js b/services/gunDB/contact-api/jobs/onOrders.js index b21d6a67..4ab73302 100644 --- a/services/gunDB/contact-api/jobs/onOrders.js +++ b/services/gunDB/contact-api/jobs/onOrders.js @@ -14,7 +14,6 @@ const { } = Common const SchemaManager = require('../../../schema') const LightningServices = require('../../../../utils/lightningServices') -const LNDHealthManager = require('../../../../utils/lightningServices/errors') const Key = require('../key') const Utils = require('../utils') @@ -105,7 +104,7 @@ const listenerForAddr = (addr, SEA) => async (order, orderID) => { return } - const listenerStartTime = performance.now() + //const listenerStartTime = performance.now() ordersProcessed.add(orderID) @@ -228,74 +227,113 @@ const listenerForAddr = (addr, SEA) => async (order, orderID) => { const invoicePutEndTime = performance.now() - invoicePutStartTime - // invoices should be settled right away so we can rely on this single - // subscription instead of life-long all invoices subscription - /** - * @type {string|null} - */ - let maybePostId = null - if (order.targetType === 'post') { - const { postID } = order - maybePostId = postID || null - if (!Common.isPopulatedString(postID)) { - throw new TypeError(`postID not a a populated string`) - } - } - const { r_hash } = invoice - - // A post tip order lifecycle is short enough that we can do it like this. - const stream = LightningServices.invoices.subscribeSingleInvoice({ - r_hash - }) - - /** - * @param {Common.Invoice & {r_hash:Buffer}} invoice - */ - const invoiceSubCb = invoice => { - if (!invoice.settled) { - return - } - if (order.targetType === 'post' && typeof maybePostId === 'string') { - getUser() - .get('postToTipCount') - .get(maybePostId) - .set(null) // each item in the set is a tip - } - const myLndPub = LNDHealthManager.lndPub - const myGunPub = getUser()._.sea.pub - if (!myLndPub) { - return //should never happen but just to be safe - } - SchemaManager.AddOrder({ - amount: parseInt(invoice.amt_paid, 10), - coordinateHash: invoice.r_hash.toString('hex'), - coordinateIndex: parseInt(invoice.add_index, 10), - toLndPub: myLndPub, - inbound: true, - type: 'other', //TODO better this - fromGunPub: order.from, - toGunPub: myGunPub, - invoiceMemo: invoice.memo - }) - } - - stream.on('data', invoiceSubCb) - - stream.on('status', (/** @type {any} */ status) => { - logger.info(`${r_hash}, invoice status:`, status) - }) - stream.on('end', () => { - logger.warn(`${r_hash}, invoice stream ended`) - }) - stream.on('error', (/** @type {any} */ e) => { - logger.warn(`${r_hash}, error:`, e) - }) - logger.info(`[PERF] Added invoice to GunDB in ${invoicePutEndTime}ms`) - const listenerEndTime = performance.now() - listenerStartTime + /** + * + * @type {Common.Schema.InvoiceWhenListed & {r_hash:Buffer,payment_addr:string}} + */ + const paidInvoice = await new Promise(res => { + SchemaManager.addListenInvoice(invoice.r_hash, res) + }) + const hashString = paidInvoice.r_hash.toString('hex') + const { + amt_paid_sat: amt, + add_index: addIndex, + payment_addr: paymentAddr + } = paidInvoice + /**@type {'spontaneousPayment' | 'tip' | 'service' | 'product' | 'other'}*/ + //@ts-expect-error to fix + const orderType = order.targetType + //@ts-expect-error to fix + const { ackInfo } = order //a string representing what has been requested + switch (orderType) { + case 'tip': { + if (!Common.isPopulatedString(ackInfo)) { + throw new Error('ackInfo for postID not a populated string') + } else { + getUser() + .get('postToTipCount') + .get(ackInfo) + .set(null) // each item in the set is a tip + } + break + } + case 'spontaneousPayment': { + //no action required + break + } + case 'product': { + //assuming digital product that only requires to be unlocked + const ackData = { productFinalRef: '' } //find ref by decrypting it base on "ackInfo" provided information + const toSend = JSON.stringify(ackData) + const encrypted = await SEA.encrypt(toSend, secret) + const ordResponse = { + type: 'orderAck', + content: encrypted + } + await new Promise((res, rej) => { + getUser() + .get(Key.ORDER_TO_RESPONSE) + .get(orderID) + .put(ordResponse, ack => { + if (ack.err && typeof ack.err !== 'number') { + rej( + new Error( + `Error saving encrypted orderAck to order to response usergraph: ${ack}` + ) + ) + } else { + res() + } + }) + }) + break + } + case 'service': { + const ackData = { serviceFinalRef: '' } //find ref by decrypting it base on "ackInfo" provided information + const toSend = JSON.stringify(ackData) + const encrypted = await SEA.encrypt(toSend, secret) + const serviceResponse = { + type: 'orderAck', + content: encrypted + } + await new Promise((res, rej) => { + getUser() + .get(Key.ORDER_TO_RESPONSE) + .get(orderID) + .put(serviceResponse, ack => { + if (ack.err && typeof ack.err !== 'number') { + rej( + new Error( + `Error saving encrypted orderAck to order to response usergraph: ${ack}` + ) + ) + } else { + res() + } + }) + }) + break + } + case 'other': //not implemented yet but save them as a coordinate anyways + break + default: + return //exit because not implemented + } + const myGunPub = getUser()._.sea.pub + SchemaManager.AddOrder({ + type: orderType, + coordinateHash: hashString, + coordinateIndex: parseInt(addIndex, 10), + inbound: true, + amount: parseInt(amt, 10), - logger.info(`[PERF] Invoice generation completed in ${listenerEndTime}ms`) + toLndPub: paymentAddr, + fromGunPub: order.from, + toGunPub: myGunPub, + invoiceMemo: memo + }) } catch (err) { logger.error( `error inside onOrders, orderAddr: ${addr}, orderID: ${orderID}, order: ${JSON.stringify( diff --git a/services/schema/index.js b/services/schema/index.js index d786ef7f..64ea82f8 100644 --- a/services/schema/index.js +++ b/services/schema/index.js @@ -1,5 +1,5 @@ const Crypto = require('crypto') -const { Utils: CommonUtils } = require('shock-common') +const Common = require('shock-common') const getGunUser = () => require('../gunDB/Mediator').getUser() const Key = require('../gunDB/contact-api/key') /** @@ -259,7 +259,7 @@ class SchemaManager { if (!coordinates) { return orders } - await CommonUtils.asyncForEach(coordinates, async coordinateSHA256 => { + await Common.Utils.asyncForEach(coordinates, async coordinateSHA256 => { const encryptedOrderString = await getGunUser() .get(Key.COORDINATES) .get(coordinateSHA256) @@ -282,6 +282,52 @@ class SchemaManager { return orderedOrders } + /** + * @typedef {Common.Schema.InvoiceWhenListed & {r_hash:Buffer,payment_addr:string}} Invoice + */ + /** + * @type {Recordvoid>} + */ + _listeningInvoices = {} + + /** + * + * @param {Buffer} r_hash + * @param {(invoice:Invoice) =>void} done + */ + addListenInvoice(r_hash, done) { + const hashString = r_hash.toString("hex") + this._listeningInvoices[hashString] = done + } + + /** + * + * @param {Common.Schema.InvoiceWhenListed & {r_hash:Buffer,payment_addr:string}} data + */ + invoiceStreamDataCb(data) { + if (!data.settled) { + //invoice not paid yet + return + } + const hashString = data.r_hash.toString('hex') + const amt = parseInt(data.amt_paid_sat, 10) + if (this._listeningInvoices[hashString]) { + const done = this._listeningInvoices[hashString] + delete this._listeningInvoices[hashString] + done(data) + } else { + this.AddOrder({ + type: 'invoice', + coordinateHash: hashString, + coordinateIndex: parseInt(data.add_index, 10), + inbound: true, + amount: amt, + toLndPub: data.payment_addr, + invoiceMemo: data.memo + }) + } + } + /** * * @param {string} address