v12.0.0 - initial commit

This commit is contained in:
padreug 2025-12-31 19:04:13 +01:00
commit e2c49ea43c
1145 changed files with 97211 additions and 0 deletions

View file

@ -0,0 +1,63 @@
const _ = require('lodash/fp')
const pgp = require('pg-promise')()
module.exports = { logDispense, logActionById, logAction, logError }
function logDispense(t, tx) {
const baseRec = { error: tx.error, error_code: tx.errorCode }
const rec = _.merge(mapDispense(tx), baseRec)
const action = _.isEmpty(tx.error) ? 'dispense' : 'dispenseError'
return logAction(t, action, rec, tx)
}
function logActionById(t, action, _rec, txId) {
const rec = _.assign(_rec, { action, tx_id: txId, redeem: false })
const sql = pgp.helpers.insert(rec, null, 'cash_out_actions')
return t.none(sql)
}
function logAction(t, action, _rec, tx) {
const rec = _.assign(_rec, {
action,
tx_id: tx.id,
redeem: !!tx.redeem,
device_id: tx.deviceId,
})
const sql = pgp.helpers.insert(rec, null, 'cash_out_actions')
return t.none(sql).then(_.constant(tx))
}
function logError(t, action, err, tx) {
return logAction(
t,
action,
{
error: err.message,
error_code: err.name,
},
tx,
)
}
function mapDispense(tx) {
const bills = tx.bills
if (_.isEmpty(bills)) return {}
const res = {}
_.forEach(
it => {
const suffix = _.snakeCase(bills[it].name.replace(/cassette/gi, ''))
res[`provisioned_${suffix}`] = bills[it].provisioned
res[`denomination_${suffix}`] = bills[it].denomination
res[`dispensed_${suffix}`] = bills[it].dispensed
res[`rejected_${suffix}`] = bills[it].rejected
},
_.times(_.identity(), _.size(bills)),
)
return res
}

View file

@ -0,0 +1,220 @@
const _ = require('lodash/fp')
const pgp = require('pg-promise')()
const db = require('../db')
const E = require('../error')
const logger = require('../logger')
const helper = require('./cash-out-helper')
const cashOutActions = require('./cash-out-actions')
const cashOutLow = require('./cash-out-low')
const toObj = helper.toObj
module.exports = { atomic }
function atomic(tx, pi, fromClient) {
const TransactionMode = pgp.txMode.TransactionMode
const isolationLevel = pgp.txMode.isolationLevel
const mode = new TransactionMode({ tiLevel: isolationLevel.serializable })
function transaction(t) {
const sql = 'SELECT * FROM cash_out_txs WHERE id=$1 FOR UPDATE'
return t
.oneOrNone(sql, [tx.id])
.then(toObj)
.then(oldTx => {
const isStale = fromClient && oldTx && oldTx.txVersion >= tx.txVersion
if (isStale) throw new E.StaleTxError({ txId: tx.id })
// Server doesn't bump version, so we just prevent from version being older.
const isStaleFromServer =
!fromClient && oldTx && oldTx.txVersion > tx.txVersion
if (isStaleFromServer)
throw new Error('Stale Error: server triggered', tx.id)
return preProcess(t, oldTx, tx, pi).then(preProcessedTx =>
cashOutLow.upsert(t, oldTx, preProcessedTx),
)
})
}
return db.tx({ mode }, transaction)
}
function preProcess(t, oldTx, newTx, pi) {
if (!oldTx) {
return pi
.isHd(newTx)
.then(isHd => nextHd(t, isHd, newTx))
.then(newTxHd => {
return pi.newAddress(newTxHd).then(_.merge(newTxHd))
})
.then(addressedTx => {
const rec = {
to_address: addressedTx.toAddress,
layer_2_address: addressedTx.layer2Address,
}
return cashOutActions.logAction(t, 'provisionAddress', rec, addressedTx)
})
.catch(err => {
pi.notifyOperator(newTx, {
isRedemption: false,
error: 'Error while provisioning address',
}).catch(err =>
logger.error('Failure sending transaction notification', err),
)
return cashOutActions
.logError(t, 'provisionAddress', err, newTx)
.then(() => {
throw err
})
})
}
return Promise.resolve(updateStatus(oldTx, newTx)).then(updatedTx => {
if (updatedTx.status !== oldTx.status) {
const isZeroConf = pi.isZeroConf(updatedTx)
updatedTx.justAuthorized = wasJustAuthorized(oldTx, updatedTx, isZeroConf)
const rec = {
to_address: updatedTx.toAddress,
tx_hash: updatedTx.txHash,
}
return cashOutActions.logAction(t, updatedTx.status, rec, updatedTx)
}
const hasError = !oldTx.error && newTx.error
const hasDispenseOccurred =
!oldTx.dispenseConfirmed && dispenseOccurred(newTx.bills)
if (hasError || hasDispenseOccurred) {
return cashOutActions
.logDispense(t, updatedTx)
.then(it => updateCassettes(t, updatedTx).then(() => it))
.then(t => {
pi.notifyOperator(updatedTx, { isRedemption: true }).catch(err =>
logger.error('Failure sending transaction notification', err),
)
return t
})
}
if (!oldTx.phone && newTx.phone) {
return cashOutActions.logAction(t, 'addPhone', {}, updatedTx)
}
if (!oldTx.redeem && newTx.redeem) {
return cashOutActions.logAction(t, 'redeemLater', {}, updatedTx)
}
return updatedTx
})
}
function nextHd(t, isHd, tx) {
if (!isHd) return Promise.resolve(tx)
return t
.one("select nextval('hd_indices_seq') as hd_index")
.then(row => _.set('hdIndex', row.hd_index, tx))
}
function updateCassettes(t, tx) {
if (!dispenseOccurred(tx.bills)) return Promise.resolve()
const billsStmt = _.join(', ')(
_.map(it => `${tx.bills[it].name} = ${tx.bills[it].name} - $${it + 1}`)(
_.range(0, _.size(tx.bills)),
),
)
const returnStmt = _.join(', ')(_.map(bill => `${bill.name}`)(tx.bills))
const sql = `UPDATE devices SET ${billsStmt} WHERE device_id = $${_.size(tx.bills) + 1} RETURNING ${returnStmt}`
const values = []
_.forEach(
it => values.push(tx.bills[it].dispensed + tx.bills[it].rejected),
_.times(_.identity(), _.size(tx.bills)),
)
values.push(tx.deviceId)
return t.one(sql, values)
}
function wasJustAuthorized(oldTx, newTx, isZeroConf) {
const isAuthorized = () =>
_.includes(oldTx.status, ['notSeen', 'published', 'rejected']) &&
_.includes(newTx.status, ['authorized', 'instant', 'confirmed'])
const isConfirmed = () =>
_.includes(oldTx.status, [
'notSeen',
'published',
'authorized',
'rejected',
]) && _.includes(newTx.status, ['instant', 'confirmed'])
return isZeroConf ? isAuthorized() : isConfirmed()
}
function isPublished(status) {
return _.includes(status, [
'published',
'rejected',
'authorized',
'instant',
'confirmed',
])
}
function isConfirmed(status) {
return status === 'confirmed'
}
function updateStatus(oldTx, newTx) {
const oldStatus = oldTx.status
const newStatus = ratchetStatus(oldStatus, newTx.status)
const publishedAt =
!oldTx.publishedAt && isPublished(newStatus) ? 'now()^' : undefined
const confirmedAt =
!oldTx.confirmedAt && isConfirmed(newStatus) ? 'now()^' : undefined
const updateRec = {
publishedAt,
confirmedAt,
status: newStatus,
}
return _.merge(newTx, updateRec)
}
function ratchetStatus(oldStatus, newStatus) {
const statusOrder = [
'notSeen',
'published',
'rejected',
'authorized',
'instant',
'confirmed',
]
if (oldStatus === newStatus) return oldStatus
if (newStatus === 'insufficientFunds') return newStatus
const idx = Math.max(
statusOrder.indexOf(oldStatus),
statusOrder.indexOf(newStatus),
)
return statusOrder[idx]
}
function dispenseOccurred(bills) {
if (_.isEmpty(bills)) return false
return _.every(_.overEvery([_.has('dispensed'), _.has('rejected')]), bills)
}

View file

@ -0,0 +1,197 @@
const _ = require('lodash/fp')
const db = require('../db')
const T = require('../time')
const BN = require('../bn')
// FP operations on Postgres result in very big errors.
// E.g.: 1853.013808 * 1000 = 1866149.494
const REDEEMABLE_AGE = T.day / 1000
const CASH_OUT_TRANSACTION_STATES = `
case
when error = 'Operator cancel' then 'Cancelled'
when error is not null then 'Error'
when dispense then 'Success'
when (extract(epoch from (now() - greatest(created, confirmed_at))) * 1000) >= ${REDEEMABLE_AGE} then 'Expired'
else 'Pending'
end`
const MAX_CASSETTES = 4
const MAX_RECYCLERS = 6
const SNAKE_CASE_BILL_FIELDS = [
'denomination_1',
'denomination_2',
'denomination_3',
'denomination_4',
'denomination_recycler_1',
'denomination_recycler_2',
'denomination_recycler_3',
'denomination_recycler_4',
'denomination_recycler_5',
'denomination_recycler_6',
'provisioned_1',
'provisioned_2',
'provisioned_3',
'provisioned_4',
'provisioned_recycler_1',
'provisioned_recycler_2',
'provisioned_recycler_3',
'provisioned_recycler_4',
'provisioned_recycler_5',
'provisioned_recycler_6',
]
const BILL_FIELDS = _.map(_.camelCase, SNAKE_CASE_BILL_FIELDS)
module.exports = {
redeemableTxs,
toObj,
toDb,
REDEEMABLE_AGE,
CASH_OUT_TRANSACTION_STATES,
}
const mapValuesWithKey = _.mapValues.convert({ cap: false })
function convertBigNumFields(obj) {
const convert = (value, key) => {
if (
_.includes(key, [
'cryptoAtoms',
'receivedCryptoAtoms',
'fiat',
'fixedFee',
])
) {
// BACKWARDS_COMPATIBILITY 10.1
// bills before 10.2 don't have fixedFee
if (key === 'fixedFee' && !value) return new BN(0).toString()
return value.toString()
}
// Only test isNil for these fields since the others should not be empty.
if (
_.includes(key, ['commissionPercentage', 'rawTickerPrice']) &&
!_.isNil(value)
) {
return value.toString()
}
return value
}
const convertKey = key =>
_.includes(key, ['cryptoAtoms', 'fiat']) ? key + '#' : key
return _.mapKeys(convertKey, mapValuesWithKey(convert, obj))
}
function convertField(key) {
return _.snakeCase(key)
}
function addDbBills(tx) {
const bills = tx.bills
if (_.isEmpty(bills)) return tx
const billsObj = _.flow(
_.reduce((acc, value) => {
const suffix = _.snakeCase(value.name.replace(/cassette/gi, ''))
return {
...acc,
[`provisioned_${suffix}`]: value.provisioned,
[`denomination_${suffix}`]: value.denomination,
}
}, {}),
it => {
const missingKeys = _.reduce((acc, value) => {
return _.assign({ [value]: 0 })(acc)
}, {})(_.difference(SNAKE_CASE_BILL_FIELDS, _.keys(it)))
return _.assign(missingKeys, it)
},
)(bills)
return _.assign(tx, billsObj)
}
function toDb(tx) {
const massager = _.flow(
convertBigNumFields,
addDbBills,
_.omit(['direction', 'bills', 'promoCodeApplied']),
_.mapKeys(convertField),
)
return massager(tx)
}
function toObj(row) {
if (!row) return null
const keys = _.keys(row)
let newObj = {}
keys.forEach(key => {
const objKey = _.camelCase(key)
if (key === 'received_crypto_atoms' && row[key]) {
newObj[objKey] = new BN(row[key])
return
}
if (
_.includes(key, [
'crypto_atoms',
'fiat',
'commission_percentage',
'raw_ticker_price',
])
) {
newObj[objKey] = new BN(row[key])
return
}
newObj[objKey] = row[key]
})
newObj.direction = 'cashOut'
if (_.every(_.isNil, _.at(BILL_FIELDS, newObj))) return newObj
if (_.some(_.isNil, _.at(BILL_FIELDS, newObj)))
throw new Error('Missing cassette values')
const billFieldsArr = _.concat(
_.map(it => ({
name: `cassette${it + 1}`,
denomination: newObj[`denomination${it + 1}`],
provisioned: newObj[`provisioned${it + 1}`],
}))(_.range(0, MAX_CASSETTES)),
_.map(it => ({
name: `recycler${it + 1}`,
denomination: newObj[`denominationRecycler${it + 1}`],
provisioned: newObj[`provisionedRecycler${it + 1}`],
}))(_.range(0, MAX_RECYCLERS)),
)
// There can't be bills with denomination === 0.
// If a bill has denomination === 0, then that cassette is not set and should be filtered out.
const bills = _.filter(it => it.denomination > 0, billFieldsArr)
return _.set('bills', bills, _.omit(BILL_FIELDS, newObj))
}
function redeemableTxs(deviceId) {
const sql = `select * from cash_out_txs
where device_id=$1
and redeem=$2
and dispense=$3
and (
provisioned_1 is not null or provisioned_2 is not null or provisioned_3 is not null or provisioned_4 is not null or
provisioned_recycler_1 is not null or provisioned_recycler_2 is not null or
provisioned_recycler_3 is not null or provisioned_recycler_4 is not null or
provisioned_recycler_5 is not null or provisioned_recycler_6 is not null
)
and extract(epoch from (now() - greatest(created, confirmed_at))) < $4`
return db.any(sql, [deviceId, true, false, REDEEMABLE_AGE]).then(_.map(toObj))
}

View file

@ -0,0 +1,98 @@
const _ = require('lodash/fp')
const pgp = require('pg-promise')()
const helper = require('./cash-out-helper')
const { anonymousCustomer } = require('../constants')
const toDb = helper.toDb
const toObj = helper.toObj
const UPDATEABLE_FIELDS = [
'txHash',
'txVersion',
'status',
'dispense',
'dispenseConfirmed',
'notified',
'redeem',
'phone',
'error',
'swept',
'publishedAt',
'confirmedAt',
'errorCode',
'receivedCryptoAtoms',
'walletScore',
'customerId',
]
module.exports = { upsert, update, insert }
function upsert(t, oldTx, tx) {
if (!oldTx) {
return insert(t, tx).then(newTx => [oldTx, newTx])
}
return update(t, tx, diff(oldTx, tx)).then(newTx => [
oldTx,
newTx,
tx.justAuthorized,
])
}
function insert(t, tx) {
const dbTx = toDb(tx)
const sql = pgp.helpers.insert(dbTx, null, 'cash_out_txs') + ' returning *'
return t.one(sql).then(toObj)
}
function update(t, tx, changes) {
if (_.isEmpty(changes)) return Promise.resolve(tx)
const dbChanges = toDb(changes)
const sql =
pgp.helpers.update(dbChanges, null, 'cash_out_txs') +
pgp.as.format(' where id=$1', [tx.id])
const newTx = _.merge(tx, changes)
return t.none(sql).then(() => newTx)
}
function diff(oldTx, newTx) {
let updatedTx = {}
UPDATEABLE_FIELDS.forEach(fieldKey => {
if (oldTx && _.isEqualWith(nilEqual, oldTx[fieldKey], newTx[fieldKey]))
return
// We never null out an existing field
if (oldTx && _.isNil(newTx[fieldKey]))
return (updatedTx[fieldKey] = oldTx[fieldKey])
switch (fieldKey) {
case 'customerId':
if (oldTx.customerId === anonymousCustomer.uuid) {
return (updatedTx['customerId'] = newTx['customerId'])
}
return
// prevent dispense changing from 'true' to 'false'
case 'dispense':
if (!oldTx.dispense) {
return (updatedTx[fieldKey] = newTx[fieldKey])
}
return
default:
return (updatedTx[fieldKey] = newTx[fieldKey])
}
})
return updatedTx
}
function nilEqual(a, b) {
if (_.isNil(a) && _.isNil(b)) return true
return undefined
}

View file

@ -0,0 +1,242 @@
const _ = require('lodash/fp')
const pgp = require('pg-promise')()
const pEachSeries = require('p-each-series')
const db = require('../db')
const dbErrorCodes = require('../db-error-codes')
const billMath = require('../bill-math')
const T = require('../time')
const logger = require('../logger')
const plugins = require('../plugins')
const httpError = require('../route-helpers').httpError
const helper = require('./cash-out-helper')
const cashOutAtomic = require('./cash-out-atomic')
const cashOutActions = require('./cash-out-actions')
const cashOutLow = require('./cash-out-low')
module.exports = {
post,
monitorLiveIncoming,
monitorStaleIncoming,
monitorUnnotified,
cancel,
}
const STALE_INCOMING_TX_AGE = T.day
const STALE_LIVE_INCOMING_TX_AGE = 10 * T.minutes
const MAX_NOTIFY_AGE = T.day
const MIN_NOTIFY_AGE = 5 * T.minutes
const INSUFFICIENT_FUNDS_CODE = 570
const toObj = helper.toObj
function selfPost(tx, pi) {
return post(tx, pi, false)
}
function post(tx, pi, fromClient = true) {
logger.silly('Updating cashout -- tx:', JSON.stringify(tx))
logger.silly('Updating cashout -- fromClient:', JSON.stringify(fromClient))
return cashOutAtomic.atomic(tx, pi, fromClient).then(txVector => {
const [, newTx, justAuthorized] = txVector
return postProcess(txVector, justAuthorized, pi).then(changes =>
cashOutLow.update(db, newTx, changes),
)
})
}
function postProcess(txVector, justAuthorized, pi) {
const [oldTx, newTx] = txVector
if (justAuthorized) {
pi.sell(newTx)
pi.notifyOperator(newTx, { isRedemption: false }).catch(err =>
logger.error('Failure sending transaction notification', err),
)
}
if ((newTx.dispense && !oldTx.dispense) || (newTx.redeem && !oldTx.redeem)) {
return pi
.buildAvailableUnits(newTx.id)
.then(units => {
units = _.concat(units.cassettes, units.recyclers)
logger.silly('Computing bills to dispense:', {
txId: newTx.id,
units: units,
fiat: newTx.fiat,
})
const bills = billMath.makeChange(units, newTx.fiat)
logger.silly('Bills to dispense:', JSON.stringify(bills))
if (!bills) throw httpError('Out of bills', INSUFFICIENT_FUNDS_CODE)
return bills
})
.then(bills => {
const rec = {}
_.forEach(
it => {
const suffix = _.snakeCase(bills[it].name.replace(/cassette/gi, ''))
rec[`provisioned_${suffix}`] = bills[it].provisioned
rec[`denomination_${suffix}`] = bills[it].denomination
},
_.times(_.identity(), _.size(bills)),
)
return cashOutActions
.logAction(db, 'provisionNotes', rec, newTx)
.then(_.constant({ bills }))
})
.catch(err => {
pi.notifyOperator(newTx, {
error: err.message,
isRedemption: true,
}).catch(err =>
logger.error('Failure sending transaction notification', err),
)
return cashOutActions
.logError(db, 'provisionNotesError', err, newTx)
.then(() => {
throw err
})
})
}
return Promise.resolve({})
}
function fetchOpenTxs(statuses, fromAge, toAge) {
const sql = `select *
from cash_out_txs
where ((extract(epoch from (now() - created))) * 1000)>$1
and ((extract(epoch from (now() - created))) * 1000)<$2
and status in ($3^)
and error is distinct from 'Operator cancel'`
const statusClause = _.map(pgp.as.text, statuses).join(',')
return db
.any(sql, [fromAge, toAge, statusClause])
.then(rows => rows.map(toObj))
}
function processTxStatus(tx, settings) {
const pi = plugins(settings, tx.deviceId)
return pi
.getStatus(tx)
.then(res =>
_.assign(tx, {
receivedCryptoAtoms: res.receivedCryptoAtoms,
status: res.status,
}),
)
.then(_tx => getWalletScore(_tx, pi))
.then(_tx => selfPost(_tx, pi))
}
function getWalletScore(tx, pi) {
const statuses = ['published', 'authorized', 'confirmed', 'insufficientFunds']
if (!_.includes(tx.status, statuses) || !_.isNil(tx.walletScore)) {
return tx
}
// Transaction shows up on the blockchain, we can request the sender address
return pi.isWalletScoringEnabled(tx).then(isEnabled => {
if (!isEnabled) return tx
return pi
.rateTransaction(tx)
.then(res =>
res.isValid
? _.assign(tx, { walletScore: res.score })
: _.assign(tx, {
walletScore: res.score,
error: 'Chain analysis score is above defined threshold',
errorCode: 'scoreThresholdReached',
dispense: true,
}),
)
.catch(error =>
_.assign(tx, {
walletScore: 10,
error: `Failure getting address score: ${error.message}`,
errorCode: 'walletScoringError',
dispense: true,
}),
)
})
}
function monitorLiveIncoming(settings) {
const statuses = ['notSeen', 'published', 'insufficientFunds']
return monitorIncoming(settings, statuses, 0, STALE_LIVE_INCOMING_TX_AGE)
}
function monitorStaleIncoming(settings) {
const statuses = [
'notSeen',
'published',
'authorized',
'instant',
'rejected',
'insufficientFunds',
]
return monitorIncoming(
settings,
statuses,
STALE_LIVE_INCOMING_TX_AGE,
STALE_INCOMING_TX_AGE,
)
}
function monitorIncoming(settings, statuses, fromAge, toAge) {
return fetchOpenTxs(statuses, fromAge, toAge)
.then(txs => pEachSeries(txs, tx => processTxStatus(tx, settings)))
.catch(err => {
if (err.code === dbErrorCodes.SERIALIZATION_FAILURE) {
logger.warn('Harmless DB conflict, the query will be retried.')
} else {
logger.error(err)
}
})
}
function monitorUnnotified(settings) {
const sql = `select *
from cash_out_txs
where ((extract(epoch from (now() - created))) * 1000)<$1
and notified=$2 and dispense=$3
and phone is not null
and status in ('instant', 'confirmed')
and (redeem=$4 or ((extract(epoch from (now() - created))) * 1000)>$5)`
const notify = tx => plugins(settings, tx.deviceId).notifyConfirmation(tx)
return db
.any(sql, [MAX_NOTIFY_AGE, false, false, true, MIN_NOTIFY_AGE])
.then(rows => _.map(toObj, rows))
.then(txs => Promise.all(txs.map(notify)))
.catch(logger.error)
}
function cancel(txId) {
const updateRec = {
error: 'Operator cancel',
error_code: 'operatorCancel',
dispense: true,
}
return Promise.resolve()
.then(() => {
return (
pgp.helpers.update(updateRec, null, 'cash_out_txs') +
pgp.as.format(' where id=$1', [txId])
)
})
.then(sql => db.result(sql, false))
.then(res => {
if (res.rowCount !== 1) throw new Error('No such tx-id')
})
.then(() => cashOutActions.logActionById(db, 'operatorCompleted', {}, txId))
}