add Nostr publishing and bidirectional sync

- nostr/ vendors NostrEvent + the nostrclient WebSocket bridge from
  the events extension, retagged [TASKS] / subscription-id "tasks-*".
- nostr_publisher builds kind 31922 with the `event-type: task` tag
  (per aiolabs/webapp#25 — disambiguates from kind-31922 activities on
  shared relays), kind 31925 with task-status / occurrence /
  completed_at, and kind 5 deletions for both.
- nostr_hooks bridges task/completion mutations to the publisher and
  persists the resulting nostr_event_id back onto the local row.
- nostr_sync subscribes to {31922, 31925, 5/#k} and filters 31922
  client-side on `event-type: task` because most relays don't index
  custom single-letter tags.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Padreug 2026-05-13 11:38:42 +02:00
commit 24acbe6674
6 changed files with 755 additions and 0 deletions

291
nostr_sync.py Normal file
View file

@ -0,0 +1,291 @@
"""
Bidirectional Nostr sync for the tasks extension.
Subscribes to NIP-52 kind 31922 (filtered to `event-type: task` client-side
because not all relays index our custom tag), kind 31925 completions, and
kind 5 deletions targeting either. Upserts everything into the local DB
so federated tasks published by other LNbits instances or webapp clients
show up in `tasks.tasks` / `tasks.completions`.
"""
import asyncio
import json
from datetime import datetime, timezone
from loguru import logger
from .crud import (
create_completion,
db,
delete_completion,
delete_task,
delete_task_completions,
get_completion_by_id,
get_task_by_address,
update_task,
)
from .models import (
CreateTaskCompletion,
Participant,
Recurrence,
Task,
)
from .nostr.nostr_client import NostrClient
def _is_task_event(event_data: dict) -> bool:
"""Filter 31922 events to ones that opt in to the task convention.
See aiolabs/webapp#25: kind 31922 is shared with activities so we
must whitelist `event-type: task` to avoid swallowing community
activity events."""
for tag in event_data.get("tags", []):
if len(tag) >= 2 and tag[0] == "event-type" and tag[1] == "task":
return True
return False
async def _handle_task_event(nostr_client: NostrClient, event_data: dict) -> None:
event_id = event_data.get("id", "")
if nostr_client.is_duplicate_event(event_id):
return
if not _is_task_event(event_data):
return
raw_tags = event_data.get("tags", [])
single_tags = {t[0]: t[1] for t in raw_tags if len(t) >= 2}
tag_lists: dict[str, list[list[str]]] = {}
for t in raw_tags:
if len(t) >= 1:
tag_lists.setdefault(t[0], []).append(t)
d_tag = single_tags.get("d")
if not d_tag:
return
start = single_tags.get("start")
if not start:
return
title = single_tags.get("title", "Untitled Task")
end = single_tags.get("end")
description = event_data.get("content", "")
location = single_tags.get("location")
status = single_tags.get("status", "pending")
event_type = single_tags.get("event-type", "task")
categories = [t[1] for t in tag_lists.get("t", []) if len(t) >= 2]
participants: list[Participant] = []
for t in tag_lists.get("p", []):
if len(t) < 2:
continue
ptype = t[3] if len(t) >= 4 else None
participants.append(Participant(pubkey=t[1], type=ptype))
recurrence = None
rec_freq = single_tags.get("recurrence")
if rec_freq in ("daily", "weekly"):
recurrence = Recurrence(
frequency=rec_freq,
day_of_week=single_tags.get("recurrence-day"),
end_date=single_tags.get("recurrence-end"),
)
incoming_created_at = event_data.get("created_at", 0)
pubkey = event_data.get("pubkey", "")
existing = await get_task_by_address(pubkey, d_tag)
if existing:
if (
existing.nostr_event_created_at
and incoming_created_at <= existing.nostr_event_created_at
):
return
existing.title = title
existing.start_date = start
existing.end_date = end
existing.description = description
existing.location = location
existing.status = status
existing.event_type = event_type
existing.participants = participants
existing.categories = categories
existing.recurrence = recurrence
existing.nostr_event_id = event_id
existing.nostr_event_created_at = incoming_created_at
await update_task(existing)
logger.info(f"[TASKS] Updated task from Nostr: {title}")
return
# New task discovered from Nostr — wallet stays empty (we don't own it).
new_task = Task(
id=d_tag,
wallet="",
pubkey=pubkey,
d_tag=d_tag,
title=title,
start_date=start,
end_date=end,
description=description,
location=location,
status=status,
event_type=event_type,
participants=participants,
categories=categories,
recurrence=recurrence,
nostr_event_id=event_id,
nostr_event_created_at=incoming_created_at,
time=datetime.now(timezone.utc),
)
try:
# Bypass db.insert so we can route the JSON-encoded payload through
# the same serializer crud.create_task uses.
from .crud import _serialize_task_jsonb
await db.execute(
"""
INSERT INTO tasks.tasks (
id, wallet, pubkey, d_tag, title, start_date, end_date,
description, location, status, event_type, participants,
categories, recurrence, nostr_event_id,
nostr_event_created_at, time
) VALUES (
:id, :wallet, :pubkey, :d_tag, :title, :start_date,
:end_date, :description, :location, :status, :event_type,
:participants, :categories, :recurrence, :nostr_event_id,
:nostr_event_created_at, :time
)
""",
_serialize_task_jsonb(new_task.dict()),
)
logger.info(f"[TASKS] Discovered task from Nostr: {title}")
except Exception as e:
logger.debug(f"[TASKS] Skipped duplicate task insert: {e}")
async def _handle_completion_event(
nostr_client: NostrClient, event_data: dict
) -> None:
event_id = event_data.get("id", "")
if nostr_client.is_duplicate_event(event_id):
return
raw_tags = event_data.get("tags", [])
single_tags = {t[0]: t[1] for t in raw_tags if len(t) >= 2}
task_address = single_tags.get("a")
if not task_address or not task_address.startswith("31922:"):
return
task_status = single_tags.get("task-status")
if task_status not in (
"claimed", "in-progress", "completed", "blocked", "cancelled"
):
# Fall back to NIP-52 `status` semantics if a plain RSVP arrives
# claiming/accepting the task. Treat anything else as a claim.
task_status = "claimed"
completed_at_str = single_tags.get("completed_at")
completed_at = int(completed_at_str) if completed_at_str else None
occurrence = single_tags.get("occurrence")
pubkey = event_data.get("pubkey", "")
incoming_created_at = event_data.get("created_at", 0)
# If we already have a newer completion for this triple, skip.
existing_dup = await get_completion_by_id(event_id)
if existing_dup:
return
await create_completion(
pubkey,
CreateTaskCompletion(
task_address=task_address,
task_status=task_status,
occurrence=occurrence,
completed_at=completed_at,
notes=event_data.get("content", ""),
),
nostr_event_id=event_id,
nostr_created_at=incoming_created_at,
)
async def _handle_deletion_event(event_data: dict) -> None:
"""Route NIP-09 deletions by their `k` tag to the right resource."""
raw_tags = event_data.get("tags", [])
k_tag = next(
(t[1] for t in raw_tags if len(t) >= 2 and t[0] == "k"),
None,
)
author = event_data.get("pubkey", "")
if k_tag == "31922":
# Delete by address: only honor when the deleter is the task's author.
for t in raw_tags:
if len(t) >= 2 and t[0] == "a" and t[1].startswith("31922:"):
_, addr_pubkey, d_tag = t[1].split(":", 2)
if addr_pubkey != author:
continue
existing = await get_task_by_address(addr_pubkey, d_tag)
if existing:
await delete_task(existing.id)
await delete_task_completions(existing.address)
logger.info(f"[TASKS] Deleted task from Nostr: {d_tag}")
elif k_tag == "31925":
for t in raw_tags:
if len(t) >= 2 and t[0] == "e":
comp = await get_completion_by_id(t[1])
if comp and comp.pubkey == author:
await delete_completion(comp.id)
async def process_nostr_message(nostr_client: NostrClient, message: str) -> None:
try:
data = json.loads(message)
except json.JSONDecodeError:
return
if not isinstance(data, list) or len(data) < 2:
return
msg_type = data[0]
if msg_type == "EVENT" and len(data) >= 3:
event_data = data[2]
kind = event_data.get("kind")
if kind == 31922:
await _handle_task_event(nostr_client, event_data)
elif kind == 31925:
await _handle_completion_event(nostr_client, event_data)
elif kind == 5:
await _handle_deletion_event(event_data)
elif msg_type == "EOSE":
logger.debug("[TASKS] End of stored events from relay")
elif msg_type == "NOTICE":
logger.info(f"[TASKS] Relay notice: {data[1]}")
async def wait_for_nostr_events(nostr_client: NostrClient) -> None:
"""Background task: subscribe and feed inbound events through the
handlers above. We can't use the `#event-type=task` filter at the
relay level because most relays don't index custom single-letter
tags, so we accept all 31922 and filter in _handle_task_event."""
logger.info("[TASKS] Starting Nostr event sync...")
while True:
try:
await nostr_client.subscribe(
[
{"kinds": [31922]},
{"kinds": [31925]},
{"kinds": [5], "#k": ["31922", "31925"]},
]
)
while True:
message = await nostr_client.get_event()
await process_nostr_message(nostr_client, message)
except ValueError:
logger.warning("[TASKS] Nostr connection lost, resubscribing...")
await asyncio.sleep(10)
except Exception as e:
logger.error(f"[TASKS] Nostr sync error: {e}")
await asyncio.sleep(30)