- nostr/ vendors NostrEvent + the nostrclient WebSocket bridge from the events extension, retagged [TASKS] / subscription-id "tasks-*". - nostr_publisher builds kind 31922 with the `event-type: task` tag (per aiolabs/webapp#25 — disambiguates from kind-31922 activities on shared relays), kind 31925 with task-status / occurrence / completed_at, and kind 5 deletions for both. - nostr_hooks bridges task/completion mutations to the publisher and persists the resulting nostr_event_id back onto the local row. - nostr_sync subscribes to {31922, 31925, 5/#k} and filters 31922 client-side on `event-type: task` because most relays don't index custom single-letter tags. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
291 lines
9.8 KiB
Python
291 lines
9.8 KiB
Python
"""
Bidirectional Nostr sync for the tasks extension.

Subscribes to NIP-52 kind 31922 (filtered to `event-type: task` client-side
because not all relays index our custom tag), kind 31925 completions, and
kind 5 deletions targeting either. Upserts everything into the local DB
so federated tasks published by other LNbits instances or webapp clients
show up in `tasks.tasks` / `tasks.completions`.
"""
|
|
|
|
import asyncio
|
|
import json
|
|
from datetime import datetime, timezone
|
|
|
|
from loguru import logger
|
|
|
|
from .crud import (
|
|
create_completion,
|
|
db,
|
|
delete_completion,
|
|
delete_task,
|
|
delete_task_completions,
|
|
get_completion_by_id,
|
|
get_task_by_address,
|
|
update_task,
|
|
)
|
|
from .models import (
|
|
CreateTaskCompletion,
|
|
Participant,
|
|
Recurrence,
|
|
Task,
|
|
)
|
|
from .nostr.nostr_client import NostrClient
|
|
|
|
|
|
def _is_task_event(event_data: dict) -> bool:
    """Return True when a kind-31922 event opts in to the task convention.

    See aiolabs/webapp#25: kind 31922 is shared with activities, so only
    events carrying an ``["event-type", "task", ...]`` tag are accepted;
    everything else is rejected to avoid swallowing community activity
    events.

    Events arrive from remote relays, so the tag list is treated as
    untrusted: a missing or ``null`` ``tags`` value and tag entries that are
    not lists/tuples are ignored instead of raising.

    Args:
        event_data: Decoded Nostr event object.

    Returns:
        True if at least one well-formed tag is ``event-type`` = ``task``.
    """
    tags = event_data.get("tags")
    if not isinstance(tags, (list, tuple)):
        return False
    return any(
        isinstance(tag, (list, tuple))
        and len(tag) >= 2
        and tag[0] == "event-type"
        and tag[1] == "task"
        for tag in tags
    )
|
|
|
|
|
|
async def _handle_task_event(nostr_client: NostrClient, event_data: dict) -> None:
    """Upsert a task described by an inbound kind-31922 event.

    The event is dropped when the client reports it as a duplicate, when it
    lacks the ``event-type: task`` tag, or when it has no ``d`` or ``start``
    tag. Otherwise the task addressed by ``(pubkey, d)`` is updated if the
    incoming ``created_at`` is newer than the stored one, or inserted as a
    new row with an empty wallet if no such task exists locally.

    Tags arrive from remote relays, so only list/tuple entries made up
    entirely of strings are considered; anything else is ignored rather
    than allowed to raise into the sync loop.

    Args:
        nostr_client: Client used for duplicate-event detection.
        event_data: Decoded Nostr event object.
    """
    event_id = event_data.get("id", "")
    if nostr_client.is_duplicate_event(event_id):
        return
    if not _is_task_event(event_data):
        return

    tags_field = event_data.get("tags")
    raw_tags = [
        t
        for t in (tags_field if isinstance(tags_field, (list, tuple)) else ())
        if isinstance(t, (list, tuple))
        and t
        and all(isinstance(part, str) for part in t)
    ]
    # For repeated tag names the last occurrence wins.
    single_tags = {t[0]: t[1] for t in raw_tags if len(t) >= 2}

    # `d` (address identifier) and `start` are mandatory for a usable task.
    d_tag = single_tags.get("d")
    if not d_tag:
        return
    start = single_tags.get("start")
    if not start:
        return

    title = single_tags.get("title", "Untitled Task")
    end = single_tags.get("end")
    description = event_data.get("content", "")
    location = single_tags.get("location")
    status = single_tags.get("status", "pending")
    event_type = single_tags.get("event-type", "task")

    categories = [t[1] for t in raw_tags if t[0] == "t" and len(t) >= 2]
    # The participant type is read from the fourth element of a `p` tag
    # when present.
    participants: list[Participant] = [
        Participant(pubkey=t[1], type=t[3] if len(t) >= 4 else None)
        for t in raw_tags
        if t[0] == "p" and len(t) >= 2
    ]

    recurrence = None
    rec_freq = single_tags.get("recurrence")
    if rec_freq in ("daily", "weekly"):
        recurrence = Recurrence(
            frequency=rec_freq,
            day_of_week=single_tags.get("recurrence-day"),
            end_date=single_tags.get("recurrence-end"),
        )

    incoming_created_at = event_data.get("created_at", 0)
    if not isinstance(incoming_created_at, int):
        # A non-integer timestamp cannot be ordered against the stored one;
        # treat it like a missing timestamp so it never overrides real data.
        incoming_created_at = 0
    pubkey = event_data.get("pubkey", "")

    existing = await get_task_by_address(pubkey, d_tag)
    if existing:
        # Replaceable-event semantics: ignore anything not strictly newer
        # than the version already stored.
        if (
            existing.nostr_event_created_at
            and incoming_created_at <= existing.nostr_event_created_at
        ):
            return
        existing.title = title
        existing.start_date = start
        existing.end_date = end
        existing.description = description
        existing.location = location
        existing.status = status
        existing.event_type = event_type
        existing.participants = participants
        existing.categories = categories
        existing.recurrence = recurrence
        existing.nostr_event_id = event_id
        existing.nostr_event_created_at = incoming_created_at
        await update_task(existing)
        logger.info(f"[TASKS] Updated task from Nostr: {title}")
        return

    # New task discovered from Nostr — wallet stays empty (we don't own it).
    new_task = Task(
        id=d_tag,
        wallet="",
        pubkey=pubkey,
        d_tag=d_tag,
        title=title,
        start_date=start,
        end_date=end,
        description=description,
        location=location,
        status=status,
        event_type=event_type,
        participants=participants,
        categories=categories,
        recurrence=recurrence,
        nostr_event_id=event_id,
        nostr_event_created_at=incoming_created_at,
        time=datetime.now(timezone.utc),
    )
    try:
        # Bypass db.insert so we can route the JSON-encoded payload through
        # the same serializer crud.create_task uses.
        from .crud import _serialize_task_jsonb

        await db.execute(
            """
            INSERT INTO tasks.tasks (
                id, wallet, pubkey, d_tag, title, start_date, end_date,
                description, location, status, event_type, participants,
                categories, recurrence, nostr_event_id,
                nostr_event_created_at, time
            ) VALUES (
                :id, :wallet, :pubkey, :d_tag, :title, :start_date,
                :end_date, :description, :location, :status, :event_type,
                :participants, :categories, :recurrence, :nostr_event_id,
                :nostr_event_created_at, :time
            )
            """,
            _serialize_task_jsonb(new_task.dict()),
        )
        logger.info(f"[TASKS] Discovered task from Nostr: {title}")
    except Exception as e:
        # Best-effort insert: any failure is logged and the event skipped.
        # NOTE(review): this also hides non-duplicate DB errors at debug
        # level — confirm that is intended.
        logger.debug(f"[TASKS] Skipped duplicate task insert: {e}")
|
|
|
|
|
|
async def _handle_completion_event(
    nostr_client: NostrClient, event_data: dict
) -> None:
    """Store an inbound kind-31925 completion/claim event locally.

    The event is dropped when the client reports it as a duplicate, when
    its ``a`` tag is missing or does not reference a kind-31922 address, or
    when a completion with the same event id is already stored.

    A malformed ``completed_at`` tag is ignored (stored as ``None``) instead
    of raising ``ValueError``: the sync loop interprets ``ValueError`` as a
    lost connection, so letting it escape would force a needless
    resubscribe for every such event.

    Args:
        nostr_client: Client used for duplicate-event detection.
        event_data: Decoded Nostr event object.
    """
    event_id = event_data.get("id", "")
    if nostr_client.is_duplicate_event(event_id):
        return

    # Tags come from remote relays: only well-formed string pairs are used.
    tags_field = event_data.get("tags")
    single_tags = {
        t[0]: t[1]
        for t in (tags_field if isinstance(tags_field, (list, tuple)) else ())
        if isinstance(t, (list, tuple))
        and len(t) >= 2
        and isinstance(t[0], str)
        and isinstance(t[1], str)
    }

    task_address = single_tags.get("a")
    if not task_address or not task_address.startswith("31922:"):
        return

    task_status = single_tags.get("task-status")
    if task_status not in (
        "claimed", "in-progress", "completed", "blocked", "cancelled"
    ):
        # Fall back to NIP-52 `status` semantics if a plain RSVP arrives
        # claiming/accepting the task. Treat anything else as a claim.
        task_status = "claimed"

    completed_at = None
    completed_at_str = single_tags.get("completed_at")
    if completed_at_str:
        try:
            completed_at = int(completed_at_str)
        except ValueError:
            logger.debug(
                f"[TASKS] Ignoring malformed completed_at on event {event_id}"
            )
    occurrence = single_tags.get("occurrence")

    pubkey = event_data.get("pubkey", "")
    incoming_created_at = event_data.get("created_at", 0)

    # Skip events that are already stored; the lookup is by event id only,
    # no per-(task, pubkey, occurrence) recency comparison happens here.
    existing_dup = await get_completion_by_id(event_id)
    if existing_dup:
        return

    await create_completion(
        pubkey,
        CreateTaskCompletion(
            task_address=task_address,
            task_status=task_status,
            occurrence=occurrence,
            completed_at=completed_at,
            notes=event_data.get("content", ""),
        ),
        nostr_event_id=event_id,
        nostr_created_at=incoming_created_at,
    )
|
|
|
|
|
|
async def _handle_deletion_event(event_data: dict) -> None:
    """Route NIP-09 deletions by their `k` tag(s) to the right resource.

    * ``k`` = ``31922``: every ``a`` tag of the form
      ``31922:<pubkey>:<d>`` whose pubkey equals the deletion's author
      removes the matching local task and its completions. Addresses that
      do not split into three parts are skipped, and a deletion older than
      the stored version of the task is ignored (NIP-09 only covers
      versions up to the deletion's ``created_at``).
    * ``k`` = ``31925``: every ``e`` tag removes the referenced completion
      when it was authored by the deletion's author.

    All ``k`` tags are honoured, so one deletion event may target both
    kinds. Malformed tags are ignored instead of raising.

    Args:
        event_data: Decoded kind-5 Nostr event object.
    """
    tags_field = event_data.get("tags")
    raw_tags = [
        t
        for t in (tags_field if isinstance(tags_field, (list, tuple)) else ())
        if isinstance(t, (list, tuple))
        and len(t) >= 2
        and isinstance(t[0], str)
        and isinstance(t[1], str)
    ]
    target_kinds = {t[1] for t in raw_tags if t[0] == "k"}
    author = event_data.get("pubkey", "")
    deleted_at = event_data.get("created_at")

    if "31922" in target_kinds:
        # Delete by address: only honor when the deleter is the task's author.
        for t in raw_tags:
            if t[0] != "a" or not t[1].startswith("31922:"):
                continue
            parts = t[1].split(":", 2)
            if len(parts) != 3:
                # e.g. "31922:<pubkey>" with no d-tag; nothing to resolve.
                continue
            _, addr_pubkey, d_tag = parts
            if addr_pubkey != author:
                continue
            existing = await get_task_by_address(addr_pubkey, d_tag)
            if not existing:
                continue
            if (
                isinstance(deleted_at, int)
                and existing.nostr_event_created_at
                and deleted_at < existing.nostr_event_created_at
            ):
                # The stored version was published after this deletion
                # request, so the request does not apply to it.
                continue
            await delete_task(existing.id)
            await delete_task_completions(existing.address)
            logger.info(f"[TASKS] Deleted task from Nostr: {d_tag}")

    if "31925" in target_kinds:
        for t in raw_tags:
            if t[0] != "e":
                continue
            comp = await get_completion_by_id(t[1])
            if comp and comp.pubkey == author:
                await delete_completion(comp.id)
|
|
|
|
|
|
async def process_nostr_message(nostr_client: NostrClient, message: str) -> None:
    """Decode one relay frame and dispatch it to the matching handler.

    Frames that are not valid JSON, are not a list, or have fewer than two
    elements are ignored. ``EVENT`` frames with a dict payload are routed
    by ``kind`` (31922 task, 31925 completion, 5 deletion); ``EOSE`` and
    ``NOTICE`` frames are only logged.

    A failure while handling a single event is logged and swallowed here.
    The caller treats ``ValueError`` as a lost connection and other
    exceptions as a sync error with a back-off, so letting one bad event
    escape would interrupt the whole subscription.

    Args:
        nostr_client: Client passed through to the event handlers.
        message: Raw JSON text of one relay message.
    """
    try:
        data = json.loads(message)
    except json.JSONDecodeError:
        return
    if not isinstance(data, list) or len(data) < 2:
        return

    msg_type = data[0]
    if msg_type == "EVENT" and len(data) >= 3:
        event_data = data[2]
        if not isinstance(event_data, dict):
            # Relay sent something that is not an event object.
            return
        kind = event_data.get("kind")
        try:
            if kind == 31922:
                await _handle_task_event(nostr_client, event_data)
            elif kind == 31925:
                await _handle_completion_event(nostr_client, event_data)
            elif kind == 5:
                await _handle_deletion_event(event_data)
        except Exception as exc:
            event_id = event_data.get("id", "")
            logger.error(
                f"[TASKS] Failed to process kind {kind} event {event_id}: {exc}"
            )
    elif msg_type == "EOSE":
        logger.debug("[TASKS] End of stored events from relay")
    elif msg_type == "NOTICE":
        logger.info(f"[TASKS] Relay notice: {data[1]}")
|
|
|
|
|
|
async def wait_for_nostr_events(nostr_client: NostrClient) -> None:
    """Background task that keeps the tasks subscription alive.

    Subscribes to kind 31922, kind 31925 and kind-5 deletions whose `k` tag
    is 31922 or 31925, then feeds every inbound message through
    `process_nostr_message`. The `event-type: task` restriction is not part
    of the relay filter (the module notes that relays cannot be relied on to
    index that custom tag), so all 31922 events are accepted here and
    filtered in `_handle_task_event`.

    Never returns. A `ValueError` is logged as a lost connection and
    followed by a 10 second pause; any other exception is logged as a sync
    error and followed by a 30 second pause. Either way the subscription is
    then re-issued.
    """
    logger.info("[TASKS] Starting Nostr event sync...")

    while True:
        try:
            task_filter = {"kinds": [31922]}
            completion_filter = {"kinds": [31925]}
            deletion_filter = {"kinds": [5], "#k": ["31922", "31925"]}
            await nostr_client.subscribe(
                [task_filter, completion_filter, deletion_filter]
            )
            # Drain the subscription until the client raises.
            while True:
                inbound = await nostr_client.get_event()
                await process_nostr_message(nostr_client, inbound)
        except ValueError:
            logger.warning("[TASKS] Nostr connection lost, resubscribing...")
            await asyncio.sleep(10)
        except Exception as err:
            logger.error(f"[TASKS] Nostr sync error: {err}")
            await asyncio.sleep(30)
|