diff --git a/lnbits/core/services/__init__.py b/lnbits/core/services/__init__.py index 2fcccfa3..36a23691 100644 --- a/lnbits/core/services/__init__.py +++ b/lnbits/core/services/__init__.py @@ -3,7 +3,7 @@ from .funding_source import ( switch_to_voidwallet, ) from .lnurl import perform_lnurlauth, redeem_lnurl_withdraw -from .notifications import enqueue_notification +from .notifications import enqueue_notification, send_payment_notification from .payments import ( calculate_fiat_amounts, check_transaction_status, @@ -12,7 +12,6 @@ from .payments import ( fee_reserve, fee_reserve_total, pay_invoice, - send_payment_notification, service_fee, update_pending_payments, update_wallet_balance, @@ -40,6 +39,7 @@ __all__ = [ "perform_lnurlauth", # notifications "enqueue_notification", + "send_payment_notification", # payments "calculate_fiat_amounts", "check_transaction_status", @@ -48,7 +48,6 @@ __all__ = [ "fee_reserve", "fee_reserve_total", "pay_invoice", - "send_payment_notification", "service_fee", "update_pending_payments", "update_wallet_balance", diff --git a/lnbits/core/services/notifications.py b/lnbits/core/services/notifications.py index 3accd6e5..53bd133a 100644 --- a/lnbits/core/services/notifications.py +++ b/lnbits/core/services/notifications.py @@ -1,15 +1,27 @@ import asyncio +import json +from http import HTTPStatus from typing import Optional, Tuple import httpx from loguru import logger +from py_vapid import Vapid +from pywebpush import WebPushException, webpush +from lnbits.core.crud import ( + delete_webpush_subscriptions, + get_webpush_subscriptions_for_user, + mark_webhook_sent, +) +from lnbits.core.models import Payment, Wallet from lnbits.core.models.notifications import ( NOTIFICATION_TEMPLATES, NotificationMessage, NotificationType, ) from lnbits.core.services.nostr import fetch_nip5_details, send_nostr_dm +from lnbits.core.services.websockets import websocket_manager +from lnbits.helpers import check_callback_url from lnbits.settings import settings from lnbits.utils.nostr import normalize_private_key @@ -123,3 +135,134 @@ def _notification_message_to_text( text = meesage_value text = f"""[{settings.lnbits_site_title}]\n{text}""" return message_type, text + + +async def dispatch_webhook(payment: Payment): + """ + Dispatches the webhook to the webhook url. + """ + logger.debug("sending webhook", payment.webhook) + + if not payment.webhook: + return await mark_webhook_sent(payment.payment_hash, -1) + + headers = {"User-Agent": settings.user_agent} + async with httpx.AsyncClient(headers=headers) as client: + data = payment.dict() + try: + check_callback_url(payment.webhook) + r = await client.post(payment.webhook, json=data, timeout=40) + r.raise_for_status() + await mark_webhook_sent(payment.payment_hash, r.status_code) + except httpx.HTTPStatusError as exc: + await mark_webhook_sent(payment.payment_hash, exc.response.status_code) + logger.warning( + f"webhook returned a bad status_code: {exc.response.status_code} " + f"while requesting {exc.request.url!r}." + ) + except httpx.RequestError: + await mark_webhook_sent(payment.payment_hash, -1) + logger.warning(f"Could not send webhook to {payment.webhook}") + + +async def send_payment_notification(wallet: Wallet, payment: Payment): + try: + await send_ws_payment_notification(wallet, payment) + except Exception as e: + logger.error("Error sending websocket payment notification", e) + try: + send_chat_payment_notification(wallet, payment) + except Exception as e: + logger.error("Error sending chat payment notification", e) + try: + await send_payment_push_notification(wallet, payment) + except Exception as e: + logger.error("Error sending push payment notification", e) + + if payment.webhook and not payment.webhook_status: + await dispatch_webhook(payment) + + +async def send_ws_payment_notification(wallet: Wallet, payment: Payment): + # TODO: websocket message should be a clean payment model + # await websocket_manager.send_data(payment.json(), wallet.inkey) + # TODO: figure out why we send the balance with the payment here. + # cleaner would be to have a separate message for the balance + # and send it with the id of the wallet so wallets can subscribe to it + payment_notification = json.dumps( + { + "wallet_balance": wallet.balance, + # use pydantic json serialization to get the correct datetime format + "payment": json.loads(payment.json()), + }, + ) + await websocket_manager.send_data(payment_notification, wallet.inkey) + await websocket_manager.send_data(payment_notification, wallet.adminkey) + + await websocket_manager.send_data( + json.dumps({"pending": payment.pending}), payment.payment_hash + ) + + +def send_chat_payment_notification(wallet: Wallet, payment: Payment): + amount_sats = abs(payment.sat) + values: dict = { + "wallet_id": wallet.id, + "wallet_name": wallet.name, + "amount_sats": amount_sats, + "fiat_value_fmt": "", + } + if payment.extra.get("wallet_fiat_currency", None): + amount_fiat = payment.extra.get("wallet_fiat_amount", None) + currency = payment.extra.get("wallet_fiat_currency", None) + values["fiat_value_fmt"] = f"`{amount_fiat}`*{currency}* / " + + if payment.is_out: + if amount_sats >= settings.lnbits_notification_outgoing_payment_amount_sats: + enqueue_notification(NotificationType.outgoing_payment, values) + else: + if amount_sats >= settings.lnbits_notification_incoming_payment_amount_sats: + enqueue_notification(NotificationType.incoming_payment, values) + + +async def send_payment_push_notification(wallet: Wallet, payment: Payment): + subscriptions = await get_webpush_subscriptions_for_user(wallet.user) + + amount = int(payment.amount / 1000) + + title = f"LNbits: {wallet.name}" + body = f"You just received {amount} sat{'s'[:amount^1]}!" + + if payment.memo: + body += f"\r\n{payment.memo}" + + for subscription in subscriptions: + # todo: review permissions when user-id-only not allowed + # todo: replace all this logic with websockets? + url = f"https://{subscription.host}/wallet?usr={wallet.user}&wal={wallet.id}" + await send_push_notification(subscription, title, body, url) + + +async def send_push_notification(subscription, title, body, url=""): + vapid = Vapid() + try: + logger.debug("sending push notification") + webpush( + json.loads(subscription.data), + json.dumps({"title": title, "body": body, "url": url}), + ( + vapid.from_pem(bytes(settings.lnbits_webpush_privkey, "utf-8")) + if settings.lnbits_webpush_privkey + else None + ), + {"aud": "", "sub": "mailto:alan@lnbits.com"}, + ) + except WebPushException as e: + if e.response and e.response.status_code == HTTPStatus.GONE: + # cleanup unsubscribed or expired push subscriptions + await delete_webpush_subscriptions(subscription.endpoint) + else: + logger.error( + f"failed sending push notification: " + f"{e.response.text if e.response else e}" + ) diff --git a/lnbits/core/services/payments.py b/lnbits/core/services/payments.py index 8b7473f2..5187cd66 100644 --- a/lnbits/core/services/payments.py +++ b/lnbits/core/services/payments.py @@ -1,5 +1,4 @@ import asyncio -import json import time from datetime import datetime, timedelta, timezone from typing import Optional @@ -12,8 +11,6 @@ from loguru import logger from lnbits.core.crud.payments import get_daily_stats from lnbits.core.db import db from lnbits.core.models import PaymentDailyStats, PaymentFilters -from lnbits.core.models.notifications import NotificationType -from lnbits.core.services.notifications import enqueue_notification from lnbits.db import Connection, Filters from lnbits.decorators import check_user_extension_access from lnbits.exceptions import InvoiceError, PaymentError @@ -45,7 +42,7 @@ from ..models import ( PaymentState, Wallet, ) -from .websockets import websocket_manager +from .notifications import send_payment_notification async def pay_invoice( @@ -278,60 +275,6 @@ async def update_wallet_balance( await internal_invoice_queue.put(payment.checking_id) -async def send_payment_notification(wallet: Wallet, payment: Payment): - try: - await send_ws_payment_notification(wallet, payment) - except Exception as e: - logger.error("Error sending websocket payment notification", e) - - try: - send_chat_payment_notification(wallet, payment) - except Exception as e: - logger.error("Error sending chat payment notification", e) - - -async def send_ws_payment_notification(wallet: Wallet, payment: Payment): - # TODO: websocket message should be a clean payment model - # await websocket_manager.send_data(payment.json(), wallet.inkey) - # TODO: figure out why we send the balance with the payment here. - # cleaner would be to have a separate message for the balance - # and send it with the id of the wallet so wallets can subscribe to it - payment_notification = json.dumps( - { - "wallet_balance": wallet.balance, - # use pydantic json serialization to get the correct datetime format - "payment": json.loads(payment.json()), - }, - ) - await websocket_manager.send_data(payment_notification, wallet.inkey) - await websocket_manager.send_data(payment_notification, wallet.adminkey) - - await websocket_manager.send_data( - json.dumps({"pending": payment.pending}), payment.payment_hash - ) - - -def send_chat_payment_notification(wallet: Wallet, payment: Payment): - amount_sats = abs(payment.sat) - values: dict = { - "wallet_id": wallet.id, - "wallet_name": wallet.name, - "amount_sats": amount_sats, - "fiat_value_fmt": "", - } - if payment.extra.get("wallet_fiat_currency", None): - amount_fiat = payment.extra.get("wallet_fiat_amount", None) - currency = payment.extra.get("wallet_fiat_currency", None) - values["fiat_value_fmt"] = f"`{amount_fiat}`*{currency}* / " - - if payment.is_out: - if amount_sats >= settings.lnbits_notification_outgoing_payment_amount_sats: - enqueue_notification(NotificationType.outgoing_payment, values) - else: - if amount_sats >= settings.lnbits_notification_incoming_payment_amount_sats: - enqueue_notification(NotificationType.incoming_payment, values) - - async def check_wallet_limits( wallet_id: str, amount_msat: int, conn: Optional[Connection] = None ): diff --git a/lnbits/core/tasks.py b/lnbits/core/tasks.py index 170e3d08..a324e782 100644 --- a/lnbits/core/tasks.py +++ b/lnbits/core/tasks.py @@ -2,25 +2,19 @@ import asyncio import traceback from typing import Callable, Coroutine -import httpx from loguru import logger from lnbits.core.crud import ( create_audit_entry, get_wallet, - get_webpush_subscriptions_for_user, - mark_webhook_sent, ) from lnbits.core.crud.audit import delete_expired_audit_entries from lnbits.core.crud.payments import get_payments_status_count from lnbits.core.crud.users import get_accounts from lnbits.core.crud.wallets import get_wallets_count -from lnbits.core.models import AuditEntry, Payment +from lnbits.core.models import AuditEntry from lnbits.core.models.extensions import InstallableExtension from lnbits.core.models.notifications import NotificationType -from lnbits.core.services import ( - send_payment_notification, -) from lnbits.core.services.funding_source import ( check_balance_delta_changed, check_server_balance_against_node, @@ -29,11 +23,11 @@ from lnbits.core.services.funding_source import ( from lnbits.core.services.notifications import ( enqueue_notification, process_next_notification, + send_payment_notification, ) from lnbits.db import Filters -from lnbits.helpers import check_callback_url from lnbits.settings import settings -from lnbits.tasks import create_unique_task, send_push_notification +from lnbits.tasks import create_unique_task from lnbits.utils.exchange_rates import btc_rates audit_queue: asyncio.Queue = asyncio.Queue() @@ -106,62 +100,6 @@ async def wait_for_paid_invoices(invoice_paid_queue: asyncio.Queue): wallet = await get_wallet(payment.wallet_id) if wallet: await send_payment_notification(wallet, payment) - # dispatch webhook - if payment.webhook and not payment.webhook_status: - await dispatch_webhook(payment) - # dispatch push notification - await send_payment_push_notification(payment) - - -async def dispatch_webhook(payment: Payment): - """ - Dispatches the webhook to the webhook url. - """ - logger.debug("sending webhook", payment.webhook) - - if not payment.webhook: - return await mark_webhook_sent(payment.payment_hash, -1) - - headers = {"User-Agent": settings.user_agent} - async with httpx.AsyncClient(headers=headers) as client: - data = payment.dict() - try: - check_callback_url(payment.webhook) - r = await client.post(payment.webhook, json=data, timeout=40) - r.raise_for_status() - await mark_webhook_sent(payment.payment_hash, r.status_code) - except httpx.HTTPStatusError as exc: - await mark_webhook_sent(payment.payment_hash, exc.response.status_code) - logger.warning( - f"webhook returned a bad status_code: {exc.response.status_code} " - f"while requesting {exc.request.url!r}." - ) - except httpx.RequestError: - await mark_webhook_sent(payment.payment_hash, -1) - logger.warning(f"Could not send webhook to {payment.webhook}") - - -async def send_payment_push_notification(payment: Payment): - wallet = await get_wallet(payment.wallet_id) - - if wallet: - subscriptions = await get_webpush_subscriptions_for_user(wallet.user) - - amount = int(payment.amount / 1000) - - title = f"LNbits: {wallet.name}" - body = f"You just received {amount} sat{'s'[:amount^1]}!" - - if payment.memo: - body += f"\r\n{payment.memo}" - - for subscription in subscriptions: - # todo: review permissions when user-id-only not allowed - # todo: replace all this logic with websockets? - url = ( - f"https://{subscription.host}/wallet?usr={wallet.user}&wal={wallet.id}" - ) - await send_push_notification(subscription, title, body, url) async def wait_for_audit_data(): diff --git a/lnbits/tasks.py b/lnbits/tasks.py index a43657d0..4d84cae1 100644 --- a/lnbits/tasks.py +++ b/lnbits/tasks.py @@ -1,9 +1,7 @@ import asyncio -import json import time import traceback import uuid -from http import HTTPStatus from typing import ( Callable, Coroutine, @@ -13,11 +11,8 @@ from typing import ( ) from loguru import logger -from py_vapid import Vapid -from pywebpush import WebPushException, webpush from lnbits.core.crud import ( - delete_webpush_subscriptions, get_payments, get_standalone_payment, update_payment, @@ -216,28 +211,3 @@ async def invoice_callback_dispatcher(checking_id: str, is_internal: bool = Fals for name, send_chan in invoice_listeners.items(): logger.trace(f"invoice listeners: sending to `{name}`") await send_chan.put(payment) - - -async def send_push_notification(subscription, title, body, url=""): - vapid = Vapid() - try: - logger.debug("sending push notification") - webpush( - json.loads(subscription.data), - json.dumps({"title": title, "body": body, "url": url}), - ( - vapid.from_pem(bytes(settings.lnbits_webpush_privkey, "utf-8")) - if settings.lnbits_webpush_privkey - else None - ), - {"aud": "", "sub": "mailto:alan@lnbits.com"}, - ) - except WebPushException as e: - if e.response and e.response.status_code == HTTPStatus.GONE: - # cleanup unsubscribed or expired push subscriptions - await delete_webpush_subscriptions(subscription.endpoint) - else: - logger.error( - f"failed sending push notification: " - f"{e.response.text if e.response else e}" - )