diff --git a/.env.example b/.env.example index 1e0d824d..83e2ce68 100644 --- a/.env.example +++ b/.env.example @@ -4,4 +4,6 @@ MS_TO_TOKEN_EXPIRATION=4500000 DISABLE_SHOCK_ENCRYPTION=false CACHE_HEADERS_MANDATORY=true SHOCK_CACHE=true -TRUSTED_KEYS=true \ No newline at end of file +TRUSTED_KEYS=true +TORRENT_SEED_URL=https://webtorrent.shock.network +TORRENT_SEED_TOKEN=jibberish \ No newline at end of file diff --git a/package.json b/package.json index f9d81a67..75f523a5 100644 --- a/package.json +++ b/package.json @@ -43,6 +43,7 @@ "localtunnel": "^1.9.0", "lodash": "^4.17.20", "method-override": "^2.3.7", + "node-fetch": "^2.6.1", "node-persist": "^3.1.0", "promise": "^8.1.0", "ramda": "^0.27.1", @@ -50,7 +51,7 @@ "request-promise": "^4.2.6", "response-time": "^2.3.2", "shelljs": "^0.8.2", - "shock-common": "29.1.0", + "shock-common": "31.1.0", "socket.io": "2.1.1", "text-encoding": "^0.7.0", "tingodb": "^0.6.1", @@ -98,4 +99,4 @@ "pre-commit": "yarn lint && yarn typecheck && yarn lint-staged" } } -} +} \ No newline at end of file diff --git a/services/gunDB/contact-api/actions.js b/services/gunDB/contact-api/actions.js index ad3acd7c..7d81b5ac 100644 --- a/services/gunDB/contact-api/actions.js +++ b/services/gunDB/contact-api/actions.js @@ -21,6 +21,8 @@ const { const Getters = require('./getters') const Key = require('./key') const Utils = require('./utils') +const SchemaManager = require('../../schema') +const LNDHealthMananger = require('../../../utils/lightningServices/errors') /** * @typedef {import('./SimpleGUN').GUNNode} GUNNode @@ -923,7 +925,11 @@ const sendHRWithInitialMsg = async ( * @prop {Common.Schema.OrderTargetType} type * @prop {string=} postID */ - +/** + * @typedef {object} OrderRes + * @prop {PaymentV2} payment + * @prop {object=} orderAck + */ /** * Returns the preimage corresponding to the payment. * @param {string} to @@ -933,14 +939,14 @@ const sendHRWithInitialMsg = async ( * @param {SpontPaymentOptions} opts * @throws {Error} If no response in less than 20 seconds from the recipient, or * lightning cannot find a route for the payment. - * @returns {Promise} The payment's preimage. + * @returns {Promise} The payment's preimage. */ const sendSpontaneousPayment = async ( to, amount, memo, feeLimit, - opts = { type: 'user' } + opts = { type: 'spontaneousPayment' } ) => { try { const SEA = require('../Mediator').mySEA @@ -965,8 +971,8 @@ const sendSpontaneousPayment = async ( targetType: opts.type } - if (opts.type === 'post') { - order.postID = opts.postID + if (opts.type === 'tip') { + order.ackInfo = opts.postID } logger.info(JSON.stringify(order)) @@ -1073,17 +1079,74 @@ const sendSpontaneousPayment = async ( feeLimit, payment_request: orderResponse.response }) - - const coordinate = 'lnPub + invoiceIndex + payment hash(?)' //.... - const orderData = { - someInfo: 'info ' + const myLndPub = LNDHealthMananger.lndPub + if (opts.type !== 'contentReveal' && opts.type !== 'torrentSeed') { + SchemaManager.AddOrder({ + type: opts.type, + amount: parseInt(payment.value_sat, 10), + coordinateHash: payment.payment_hash, + coordinateIndex: parseInt(payment.payment_index, 10), + fromLndPub: myLndPub || undefined, + inbound: false, + fromGunPub: getUser()._.sea.pub, + toGunPub: to, + invoiceMemo: memo + }) + return { payment } } - getUser() - .get('orders') - .get(coordinate) - .set(orderData) + /** @type {import('shock-common').Schema.OrderResponse} */ + const encryptedOrderAckRes = await Utils.tryAndWait( + gun => + new Promise(res => { + gun + .user(to) + .get(Key.ORDER_TO_RESPONSE) + .get(orderID) + .on(orderResponse => { + if (Schema.isOrderResponse(orderResponse)) { + res(orderResponse) + } + }) + }), + v => !Schema.isOrderResponse(v) + ) - return payment + if (!Schema.isOrderResponse(encryptedOrderAckRes)) { + const e = TypeError( + `Expected OrderResponse got: ${typeof encryptedOrderAckRes}` + ) + logger.error(e) + throw e + } + + /** @type {import('shock-common').Schema.OrderResponse} */ + const orderAck = { + response: await SEA.decrypt(encryptedOrderAckRes.response, ourSecret), + type: encryptedOrderAckRes.type + } + + logger.info('decoded encryptedOrderAck: ' + JSON.stringify(orderAck)) + + if (orderAck.type === 'err') { + throw new Error(orderAck.response) + } + + if (orderAck.type !== 'orderAck') { + throw new Error(`expected orderAck response, got: ${orderAck.type}`) + } + SchemaManager.AddOrder({ + type: opts.type, + amount: parseInt(payment.value_sat, 10), + coordinateHash: payment.payment_hash, + coordinateIndex: parseInt(payment.payment_index, 10), + fromLndPub: myLndPub || undefined, + inbound: false, + fromGunPub: getUser()._.sea.pub, + toGunPub: to, + invoiceMemo: memo, + metadata: JSON.stringify(orderAck) + }) + return { payment, orderAck } } catch (e) { logger.error('Error inside sendPayment()') logger.error(e) @@ -1102,8 +1165,8 @@ const sendSpontaneousPayment = async ( * @returns {Promise} The payment's preimage. */ const sendPayment = async (to, amount, memo, feeLimit) => { - const payment = await sendSpontaneousPayment(to, amount, memo, feeLimit) - return payment.payment_preimage + const res = await sendSpontaneousPayment(to, amount, memo, feeLimit) + return res.payment.payment_preimage } /** @@ -1274,9 +1337,10 @@ const setLastSeenApp = () => * @param {string[]} tags * @param {string} title * @param {Common.Schema.ContentItem[]} content + * @param {ISEA} SEA * @returns {Promise<[string, Common.Schema.RawPost]>} */ -const createPostNew = async (tags, title, content) => { +const createPostNew = async (tags, title, content, SEA) => { /** @type {Common.Schema.RawPost} */ const newPost = { date: Date.now(), @@ -1285,11 +1349,19 @@ const createPostNew = async (tags, title, content) => { title, contentItems: {} } - - content.forEach(c => { + const mySecret = require('../Mediator').getMySecret() + await Common.Utils.asyncForEach(content, async c => { + const cBis = c + if ( + (cBis.type === 'image/embedded' || cBis.type === 'video/embedded') && + cBis.isPrivate + ) { + const encryptedMagnet = await SEA.encrypt(cBis.magnetURI, mySecret) + cBis.magnetURI = encryptedMagnet + } // @ts-expect-error const uuid = Gun.text.random() - newPost.contentItems[uuid] = c + newPost.contentItems[uuid] = cBis }) /** @type {string} */ @@ -1317,9 +1389,10 @@ const createPostNew = async (tags, title, content) => { * @param {string[]} tags * @param {string} title * @param {Common.Schema.ContentItem[]} content + * @param {ISEA} SEA * @returns {Promise} */ -const createPost = async (tags, title, content) => { +const createPost = async (tags, title, content, SEA) => { if (content.length === 0) { throw new Error(`A post must contain at least one paragraph/image/video`) } @@ -1396,7 +1469,7 @@ const createPost = async (tags, title, content) => { ) }) - const [postID, newPost] = await createPostNew(tags, title, content) + const [postID, newPost] = await createPostNew(tags, title, content, SEA) await Common.makePromise((res, rej) => { require('../Mediator') diff --git a/services/gunDB/contact-api/jobs/onOrders.js b/services/gunDB/contact-api/jobs/onOrders.js index 4ab73302..fc3485a7 100644 --- a/services/gunDB/contact-api/jobs/onOrders.js +++ b/services/gunDB/contact-api/jobs/onOrders.js @@ -8,6 +8,9 @@ const isFinite = require('lodash/isFinite') const isNumber = require('lodash/isNumber') const isNaN = require('lodash/isNaN') const Common = require('shock-common') +const crypto = require('crypto') +// @ts-expect-error TODO fix this +const fetch = require('node-fetch') const { Constants: { ErrorCode }, Schema @@ -242,30 +245,61 @@ const listenerForAddr = (addr, SEA) => async (order, orderID) => { add_index: addIndex, payment_addr: paymentAddr } = paidInvoice - /**@type {'spontaneousPayment' | 'tip' | 'service' | 'product' | 'other'}*/ - //@ts-expect-error to fix const orderType = order.targetType - //@ts-expect-error to fix const { ackInfo } = order //a string representing what has been requested switch (orderType) { case 'tip': { - if (!Common.isPopulatedString(ackInfo)) { - throw new Error('ackInfo for postID not a populated string') - } else { - getUser() - .get('postToTipCount') - .get(ackInfo) - .set(null) // each item in the set is a tip + const postID = ackInfo + if (!Common.isPopulatedString(postID)) { + break //create the coordinate, but stop because of the invalid id } + getUser() + .get('postToTipCount') + .get(postID) + .set(null) // each item in the set is a tip break } case 'spontaneousPayment': { //no action required break } - case 'product': { + case 'contentReveal': { //assuming digital product that only requires to be unlocked - const ackData = { productFinalRef: '' } //find ref by decrypting it base on "ackInfo" provided information + const postID = ackInfo + if (!Common.isPopulatedString(postID)) { + break //create the coordinate, but stop because of the invalid id + } + const selectedPost = await new Promise(res => { + getUser() + .get(Key.POSTS_NEW) + .get(postID) + .load(res) + }) + if (!Common.Schema.isPost(selectedPost)) { + break //create the coordinate, but stop because of the invalid post + } + /** + * @type {Record} + */ + const contentsToSend = {} + const mySecret = require('../../Mediator').getMySecret() + await Common.Utils.asyncForEach( + Object.entries(selectedPost.contentItems), + async ([contentID, item]) => { + if ( + item.type !== 'image/embedded' && + item.type !== 'video/embedded' + ) { + return //only visual content can be private + } + if (!item.isPrivate) { + return + } + const decrypted = await SEA.decrypt(item.magnetURI, mySecret) + contentsToSend[contentID] = decrypted + } + ) + const ackData = { unlockedContents: contentsToSend } const toSend = JSON.stringify(ackData) const encrypted = await SEA.encrypt(toSend, secret) const ordResponse = { @@ -290,8 +324,29 @@ const listenerForAddr = (addr, SEA) => async (order, orderID) => { }) break } - case 'service': { - const ackData = { serviceFinalRef: '' } //find ref by decrypting it base on "ackInfo" provided information + case 'torrentSeed': { + const seedUrl = process.env.TORRENT_SEED_URL + const seedToken = process.env.TORRENT_SEED_TOKEN + if (!seedUrl || !seedToken) { + break //service not available + } + const token = crypto.randomBytes(32).toString('hex') + const reqData = { + seed_token: seedToken, + wallet_token: token + } + const res = await fetch(`${seedUrl}/api/enroll_token`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json' + }, + body: JSON.stringify(reqData) + }) + if (res.ok) { + break //request didnt work, save coordinate anyway + } + + const ackData = { seedUrl, token } const toSend = JSON.stringify(ackData) const encrypted = await SEA.encrypt(toSend, secret) const serviceResponse = { diff --git a/services/schema/index.js b/services/schema/index.js index 64ea82f8..786b5ab0 100644 --- a/services/schema/index.js +++ b/services/schema/index.js @@ -1,10 +1,13 @@ const Crypto = require('crypto') +const logger = require('winston') const Common = require('shock-common') const getGunUser = () => require('../gunDB/Mediator').getUser() +const isAuthenticated = () => require('../gunDB/Mediator').isAuthenticated() const Key = require('../gunDB/contact-api/key') +const lndV2 = require('../../utils/lightningServices/v2') /** * @typedef {import('../gunDB/contact-api/SimpleGUN').ISEA} ISEA - * @typedef { 'spontaneousPayment' | 'tip' | 'service' | 'product' | 'other'|'invoice'|'payment'|'chainTx' } OrderType + * @typedef { 'spontaneousPayment' | 'tip' | 'torrentSeed' | 'contentReveal' | 'other'|'invoice'|'payment'|'chainTx' } OrderType * * This represents a settled order only, unsettled orders have no coordinate * @typedef {object} CoordinateOrder //everything is optional for different types @@ -150,6 +153,172 @@ const getMonthCoordinates = async (year = null, month = null) => { return coordinatesArray } +/** + * + * @param {string|undefined} address + * @param {CoordinateOrder} orderInfo + */ +const AddTmpChainOrder = async (address, orderInfo) => { + if (!address) { + throw new Error("invalid address passed to AddTmpChainOrder") + } + if (!orderInfo.toBtcPub) { + throw new Error("invalid toBtcPub passed to AddTmpChainOrder") + } + const checkErr = checkOrderInfo(orderInfo) + if (checkErr) { + throw new Error(checkErr) + } + + /** + * @type {CoordinateOrder} + */ + const filteredOrder = { + fromLndPub: orderInfo.fromLndPub, + toLndPub: orderInfo.toLndPub, + fromGunPub: orderInfo.fromGunPub, + toGunPub: orderInfo.toGunPub, + inbound: orderInfo.inbound, + ownerGunPub: orderInfo.ownerGunPub, + coordinateIndex: orderInfo.coordinateIndex, + coordinateHash: orderInfo.coordinateHash, + type: orderInfo.type, + amount: orderInfo.amount, + description: orderInfo.description, + metadata: orderInfo.metadata, + + timestamp: orderInfo.timestamp || Date.now(), + } + const orderString = JSON.stringify(filteredOrder) + const mySecret = require('../gunDB/Mediator').getMySecret() + const SEA = require('../gunDB/Mediator').mySEA + const encryptedOrderString = await SEA.encrypt(orderString, mySecret) + + const addressSHA256 = Crypto.createHash('SHA256') + .update(address) + .digest('hex') + + await new Promise((res, rej) => { + getGunUser() + .get(Key.TMP_CHAIN_COORDINATE) + .get(addressSHA256) + .put(encryptedOrderString, ack => { + if (ack.err && typeof ack.err !== 'number') { + rej( + new Error( + `Error saving tmp chain coordinate order to user-graph: ${ack}` + ) + ) + } else { + res(null) + } + }) + }) +} + +/** + * + * @param {string} address + * @returns {Promise} + */ +const isTmpChainOrder = async (address) => { + if (typeof address !== 'string' || address === '') { + return false + } + const addressSHA256 = Crypto.createHash('SHA256') + .update(address) + .digest('hex') + + const maybeData = await getGunUser() + .get(Key.TMP_CHAIN_COORDINATE) + .get(addressSHA256) + .then() + + if (typeof maybeData !== 'string' || maybeData === '') { + return false + } + const mySecret = require('../gunDB/Mediator').getMySecret() + const SEA = require('../gunDB/Mediator').mySEA + const decryptedString = await SEA.decrypt(maybeData, mySecret) + if (typeof decryptedString !== 'string' || decryptedString === '') { + return false + } + + const tmpOrder = JSON.parse(decryptedString) + const checkErr = checkOrderInfo(tmpOrder) + if (checkErr) { + return false + } + + return tmpOrder + +} + +/** + * @param {string} address + */ +const clearTmpChainOrder = async (address) => { + if (typeof address !== 'string' || address === '') { + return + } + const addressSHA256 = Crypto.createHash('SHA256') + .update(address) + .digest('hex') + + await new Promise((res, rej) => { + getGunUser() + .get(Key.TMP_CHAIN_COORDINATE) + .get(addressSHA256) + .put(null, ack => { + if (ack.err && typeof ack.err !== 'number') { + rej( + new Error( + `Error nulling tmp chain coordinate order to user-graph: ${ack}` + ) + ) + } else { + res(null) + } + }) + }) +} + +/** + * @param {Common.Schema.ChainTransaction} tx + * @param {CoordinateOrder|false| undefined} order + */ +const handleUnconfirmedTx = (tx, order) => { + const { tx_hash } = tx + const amountInt = parseInt(tx.amount, 10) + if (order) { + /*if an order already exists, update the order data + if an unconfirmed transaction has a tmp order already + it means the address was generated by shockAPI, or the tx was sent by shockAPI*/ + const orderUpdate = order + orderUpdate.amount = Math.abs(amountInt) + orderUpdate.inbound = amountInt > 0 + /*tmp coordinate does not have a coordinate hash until the transaction is created, + before it will contain 'unknown' */ + orderUpdate.coordinateHash = tx_hash + /*update the order data, + provides a notification when the TX enters the mempool */ + AddTmpChainOrder(orderUpdate.toBtcPub, orderUpdate) + } else { + /*if an order does not exist, create the tmp order, + and use tx_hash as key. + this means the address was NOT generated by shockAPI, or the tx was NOT sent by shockAPI */ + AddTmpChainOrder(tx_hash, { + type: 'chainTx', + amount: Math.abs(amountInt), + coordinateHash: tx_hash, + coordinateIndex: 0, //coordinate index is 0 until the tx is confirmed and the block is known + inbound: amountInt > 0, + toBtcPub: 'unknown' + }) + } + +} + class SchemaManager { constructor(opts = { memIndex: false }) {//config flag? this.memIndex = opts.memIndex @@ -328,104 +497,128 @@ class SchemaManager { } } + + + + + + /** - * - * @param {string} address - * @param {CoordinateOrder} orderInfo + * @type {Record} + * lnd fires a confirmed transaction event TWICE, let's make sure it is only managed ONCE */ - //eslint-disable-next-line class-methods-use-this - async AddTmpChainOrder(address, orderInfo) { - const checkErr = checkOrderInfo(orderInfo) - if (checkErr) { - throw new Error(checkErr) + _confirmedTransactions = {} + + /** + * @param {Common.Schema.ChainTransaction} data + */ + async transactionStreamDataCb(data) { + const { num_confirmations } = data + const responses = await Promise.all(data.dest_addresses.map(isTmpChainOrder)) + const hasOrder = responses.find(res => res !== false) + + if (num_confirmations === 0) { + handleUnconfirmedTx(data, hasOrder) + } else { + this.handleConfirmedTx(data, hasOrder) } - - /** - * @type {CoordinateOrder} - */ - const filteredOrder = { - fromLndPub: orderInfo.fromLndPub, - toLndPub: orderInfo.toLndPub, - fromGunPub: orderInfo.fromGunPub, - toGunPub: orderInfo.toGunPub, - inbound: orderInfo.inbound, - ownerGunPub: orderInfo.ownerGunPub, - coordinateIndex: orderInfo.coordinateIndex, - coordinateHash: orderInfo.coordinateHash, - type: orderInfo.type, - amount: orderInfo.amount, - description: orderInfo.description, - metadata: orderInfo.metadata, - - timestamp: orderInfo.timestamp || Date.now(), - } - const orderString = JSON.stringify(filteredOrder) - const mySecret = require('../gunDB/Mediator').getMySecret() - const SEA = require('../gunDB/Mediator').mySEA - const encryptedOrderString = await SEA.encrypt(orderString, mySecret) - - const addressSHA256 = Crypto.createHash('SHA256') - .update(address) - .digest('hex') - - await new Promise((res, rej) => { - getGunUser() - .get(Key.TMP_CHAIN_COORDINATE) - .get(addressSHA256) - .put(encryptedOrderString, ack => { - if (ack.err && typeof ack.err !== 'number') { - rej( - new Error( - `Error saving tmp chain coordinate order to user-graph: ${ack}` - ) - ) - } else { - res(null) - } - }) - }) } + + /** * - * @param {string} address - * @returns {Promise} + * @param {Common.Schema.ChainTransaction} tx + * @param {CoordinateOrder|false| undefined} order */ - //eslint-disable-next-line class-methods-use-this - async isTmpChainOrder(address) { - if (typeof address !== 'string' || address === '') { - return false + handleConfirmedTx(tx, order) { + const { tx_hash } = tx + if (this._confirmedTransactions[tx_hash]) { + //this tx confirmation was already handled + return } - const addressSHA256 = Crypto.createHash('SHA256') - .update(address) - .digest('hex') - - const maybeData = await getGunUser() - .get(Key.TMP_CHAIN_COORDINATE) - .get(addressSHA256) - .then() - - if (typeof maybeData !== 'string' || maybeData === '') { - return false + if (!order) { + /*confirmed transaction MUST have a tmp order, + if not, means something gone wrong */ + logger.error('found a confirmed transaction that does not have a tmp order!!') + return } - const mySecret = require('../gunDB/Mediator').getMySecret() - const SEA = require('../gunDB/Mediator').mySEA - const decryptedString = await SEA.decrypt(maybeData, mySecret) - if (typeof decryptedString !== 'string' || decryptedString === '') { - return false + if (!order.toBtcPub) { + /*confirmed transaction tmp order MUST have a non null toBtcPub */ + logger.error('found a confirmed transaction that does not have toBtcPub in the order!!') + return } - - const tmpOrder = JSON.parse(decryptedString) - const checkErr = checkOrderInfo(tmpOrder) - if (checkErr) { - return false + const finalOrder = order + finalOrder.coordinateIndex = tx.block_height + this.AddOrder(finalOrder) + if (order.toBtcPub === 'unknown') { + clearTmpChainOrder(tx_hash) + } else { + clearTmpChainOrder(order.toBtcPub) } - - return tmpOrder - + this._confirmedTransactions[tx_hash] = true } } const Manager = new SchemaManager() +/*invoice stream, +this is the only place where it's needed, +everything is a coordinate now*/ +let InvoiceShouldRetry = true +setInterval(() => { + if (!InvoiceShouldRetry) { + return + } + if (!isAuthenticated()) { + return + } + InvoiceShouldRetry = false + + lndV2.subscribeInvoices( + invoice => { + if (!isAuthenticated) { + logger.error("got an invoice while not authenticated, will ignore it and cancel the stream") + return true + } + Manager.invoiceStreamDataCb(invoice) + return false + }, + error => { + logger.error(`Error in invoices sub, will retry in two second, reason: ${error.reason}`) + InvoiceShouldRetry = true + } + ) + +}, 2000) + +/*transactions stream, +this is the only place where it's needed, +everything is a coordinate now*/ +let TransactionShouldRetry = true +setInterval(() => { + if (!TransactionShouldRetry) { + return + } + if (!isAuthenticated()) { + return + } + TransactionShouldRetry = false + + lndV2.subscribeTransactions( + transaction => { + if (!isAuthenticated) { + logger.error("got a transaction while not authenticated, will ignore it and cancel the stream") + return true + } + Manager.transactionStreamDataCb(transaction) + return false + }, + error => { + logger.error(`Error in transaction sub, will retry in two second, reason: ${error.reason}`) + TransactionShouldRetry = true + } + ) + +}, 2000) module.exports = Manager \ No newline at end of file diff --git a/src/sockets.js b/src/sockets.js index 236f38fb..353435f2 100644 --- a/src/sockets.js +++ b/src/sockets.js @@ -298,13 +298,14 @@ module.exports = ( const subID = Math.floor(Math.random() * 1000).toString() const isNotifications = isNotificationsSocket ? 'notifications' : '' logger.info('[LND] New LND Socket created:' + isNotifications + subID) + /* not used by wallet anymore const cancelInvoiceStream = onNewInvoice(socket, subID) const cancelTransactionStream = onNewTransaction(socket, subID) socket.on('disconnect', () => { logger.info('LND socket disconnected:' + isNotifications + subID) cancelInvoiceStream() cancelTransactionStream() - }) + })*/ } }) diff --git a/utils/lightningServices/v2.js b/utils/lightningServices/v2.js index b933c63b..e54e3ee4 100644 --- a/utils/lightningServices/v2.js +++ b/utils/lightningServices/v2.js @@ -571,6 +571,66 @@ const addInvoice = (value, memo = '', confidential = true, expiry = 180) => ) }) +/** + * @typedef {object} lndErr + * @prop {string} reason + * @prop {number} code + * + */ +/** + * @param {(invoice:Common.Schema.InvoiceWhenListed & {r_hash:Buffer,payment_addr:string}) => (boolean | undefined)} dataCb + * @param {(error:lndErr) => void} errorCb + */ +const subscribeInvoices = (dataCb, errorCb) => { + const { lightning } = lightningServices.getServices() + const stream = lightning.subscribeInvoices({}) + stream.on('data', invoice => { + const cancelStream = dataCb(invoice) + if (cancelStream) { + //@ts-expect-error + stream.cancel() + } + }) + stream.on('error', error => { + errorCb(error) + try { + //@ts-expect-error + stream.cancel() + } catch { + logger.info( + '[subscribeInvoices] tried to cancel an already canceled stream' + ) + } + }) +} + +/** + * @param {(tx:Common.Schema.ChainTransaction) => (boolean | undefined)} dataCb + * @param {(error:lndErr) => void} errorCb + */ +const subscribeTransactions = (dataCb, errorCb) => { + const { lightning } = lightningServices.getServices() + const stream = lightning.subscribeTransactions({}) + stream.on('data', transaction => { + const cancelStream = dataCb(transaction) + if (cancelStream) { + //@ts-expect-error + stream.cancel() + } + }) + stream.on('error', error => { + errorCb(error) + try { + //@ts-expect-error + stream.cancel() + } catch { + logger.info( + '[subscribeTransactions] tried to cancel an already canceled stream' + ) + } + }) +} + module.exports = { sendPaymentV2Keysend, sendPaymentV2Invoice, @@ -582,5 +642,7 @@ module.exports = { getChanInfo, listPeers, pendingChannels, - addInvoice + addInvoice, + subscribeInvoices, + subscribeTransactions } diff --git a/yarn.lock b/yarn.lock index daa7f4e4..beb3a3bf 100644 --- a/yarn.lock +++ b/yarn.lock @@ -4966,7 +4966,7 @@ nice-try@^1.0.4: resolved "https://registry.yarnpkg.com/nice-try/-/nice-try-1.0.5.tgz#a3378a7696ce7d223e88fc9b764bd7ef1089e366" integrity sha512-1nh45deeb5olNY7eX82BkPO7SSxR5SSYJiPTrTdFUVYwAl8CKMA5N9PjTYkHiRjisVcxcQ1HXdLhx2qxxJzLNQ== -node-fetch@^2.3.0: +node-fetch@^2.3.0, node-fetch@^2.6.1: version "2.6.1" resolved "https://registry.yarnpkg.com/node-fetch/-/node-fetch-2.6.1.tgz#045bd323631f76ed2e2b55573394416b639a0052" integrity sha512-V4aYg89jEoVRxRb2fJdAg8FHvI7cEyYdVAh94HH0UIK8oJxUfkjlDQN9RbMx+bEjP7+ggMiFRprSti032Oipxw== @@ -6255,10 +6255,10 @@ shellwords@^0.1.1: resolved "https://registry.yarnpkg.com/shellwords/-/shellwords-0.1.1.tgz#d6b9181c1a48d397324c84871efbcfc73fc0654b" integrity sha512-vFwSUfQvqybiICwZY5+DAWIPLKsWO31Q91JSKl3UYv+K5c2QRPzn0qzec6QPu1Qc9eHYItiP3NdJqNVqetYAww== -shock-common@29.1.0: - version "29.1.0" - resolved "https://registry.yarnpkg.com/shock-common/-/shock-common-29.1.0.tgz#3b6d8613fb7c73b8b76c98293a14ec168a9dc888" - integrity sha512-O2tK+TShF3ioAdP4K33MB5QUDTmMqzz+pZe/HnSbi9q1DyX/zQ2Uluzol1NDE/6Z2SSnVFA7/2vJKGaCEdMKoQ== +shock-common@31.1.0: + version "31.1.0" + resolved "https://registry.yarnpkg.com/shock-common/-/shock-common-31.1.0.tgz#9c8f25d0d405a9a9c52849c2d96452c5ddd17267" + integrity sha512-1490v3gTY5ZNEB/Lelfix+6bI4mfFE8hVrtN4ijz0aj/Cl1ZP5ATKdYO+hffReI+4yDaPSAAWd/HYk9b497Kxw== dependencies: immer "^6.0.6" lodash "^4.17.19"