Shared mempool.space cache

This commit is contained in:
ben 2022-11-24 19:27:12 +00:00
parent b4c970a005
commit e6a3c63dd5
5 changed files with 663 additions and 404 deletions

View file

@ -1,10 +1,18 @@
from typing import List, Optional, Union
from lnbits.helpers import urlsafe_short_hash
import time
from . import db
from .models import Gerty
from .models import (
Gerty,
Mempool,
Fees_recommended,
Hashrate_1w,
Hashrate_1m,
Statistics,
Difficulty_adjustment,
Tip_height)
async def create_gerty(wallet_id: str, data: Gerty) -> Gerty:
gerty_id = urlsafe_short_hash()
@ -70,3 +78,128 @@ async def get_gertys(wallet_ids: Union[str, List[str]]) -> List[Gerty]:
async def delete_gerty(gerty_id: str) -> None:
await db.execute("DELETE FROM gerty.gertys WHERE id = ?", (gerty_id,))
#############MEMPOOL###########
async def get_fees_recommended(gerty) -> Optional[Fees_recommended]:
row = await db.fetchone("SELECT * FROM gerty.fees_recommended", ())
if int(time.time()) - row.time > 20:
async with httpx.AsyncClient() as client:
response = await client.get(gerty.mempool_endpoint + "/api/v1/fees/recommended")
if response.status_code == 200:
await db.execute(
"""
UPDATE gerty.fees_recommended
SET data = ?, time = ?
""",
(response.json(), int(time.time())),
)
return Fees_recommended(**response) if response else None
else:
return Fees_recommended(**row) if row else None
async def get_hashrate_1w(gerty) -> Optional[Hashrate_1w]:
row = await db.fetchone("SELECT * FROM gerty.hashrate_1w", ())
if int(time.time()) - row.time > 20:
async with httpx.AsyncClient() as client:
response = await client.get(gerty.mempool_endpoint + "/api/v1/mining/hashrate/1w")
if response.status_code == 200:
await db.execute(
"""
UPDATE gerty.hashrate_1w
SET data = ?, time = ?
""",
(response.json(), int(time.time())),
)
return Hashrate_1w(**response) if response else None
else:
return Hashrate_1w(**row) if row else None
async def get_hashrate_1m(gerty) -> Optional[Hashrate_1m]:
row = await db.fetchone("SELECT * FROM gerty.hashrate_1m", ())
if int(time.time()) - row.time > 20:
async with httpx.AsyncClient() as client:
response = await client.get(gerty.mempool_endpoint + "/api/v1/mining/hashrate/1m")
if response.status_code == 200:
await db.execute(
"""
UPDATE gerty.hashrate_1m
SET data = ?, time = ?
""",
(response.json(), int(time.time())),
)
return Hashrate_1m(**response) if response else None
else:
return Hashrate_1m(**row) if row else None
async def get_statistics(gerty) -> Optional[Statistics]:
row = await db.fetchone("SELECT * FROM gerty.statistics", ())
if int(time.time()) - row.time > 20:
async with httpx.AsyncClient() as client:
response = await client.get(gerty.mempool_endpoint + "/api/v1/lightning/statistics/latest")
if response.status_code == 200:
await db.execute(
"""
UPDATE gerty.statistics
SET data = ?, time = ?
""",
(response.json(), int(time.time())),
)
return Statistics(**response) if response else None
else:
return Statistics(**row) if row else None
async def get_difficulty_adjustment(gerty) -> Optional[Difficulty_adjustment]:
row = await db.fetchone("SELECT * FROM gerty.difficulty_adjustment", ())
logger.debug(int(time.time()))
logger.debug(row.time)
logger.debug(int(time.time()) - row.time)
if int(time.time()) - row.time > 20:
async with httpx.AsyncClient() as client:
response = await client.get(gerty.mempool_endpoint + "/api/v1/difficulty-adjustment")
if response.status_code == 200:
await db.execute(
"""
UPDATE gerty.difficulty_adjustment
SET data = ?, time = ?
""",
(response.json(), int(time.time())),
)
return Difficulty_adjustment(**response) if response else None
else:
return Difficulty_adjustment(**row) if row else None
async def get_tip_height() -> Optional[Tip_height]:
row = await db.fetchone("SELECT * FROM gerty.tip_height", ())
if int(time.time()) - row.time > 20:
async with httpx.AsyncClient() as client:
response = await client.get(gerty.mempool_endpoint + "/api/blocks/tip/height")
if response.status_code == 200:
await db.execute(
"""
UPDATE gerty.tip_height
SET data = ?, time = ?
""",
(response.json(), int(time.time())),
)
return Tip_height(**response) if response else None
else:
return Tip_height(**row) if row else None
async def get_mempool() -> Optional[Mempool]:
row = await db.fetchone("SELECT * FROM gerty.mempool", ())
if int(time.time()) - row.time > 20:
async with httpx.AsyncClient() as client:
response = await client.get(gerty.mempool_endpoint + "/api/mempool")
if response.status_code == 200:
await db.execute(
"""
UPDATE gerty.mempool
SET data = ?, time = ?
""",
(response.json(), int(time.time())),
)
return Mempool(**response) if response else None
else:
return Mempool(**row) if row else None

View file

@ -4,6 +4,16 @@ from datetime import datetime, timedelta
import httpx
from loguru import logger
from .crud import (
get_fees_recommended,
get_hashrate_1w,
get_hashrate_1m,
get_statistics,
get_difficulty_adjustment,
get_tip_height,
get_mempool
)
from .number_prefixer import *
@ -65,19 +75,12 @@ def format_number(number, precision=None):
return "{:,}".format(round(number, precision))
async def get_mempool_recommended_fees(gerty):
if isinstance(gerty.mempool_endpoint, str):
async with httpx.AsyncClient() as client:
r = await client.get(gerty.mempool_endpoint + "/api/v1/fees/recommended")
return r.json()
async def get_mining_dashboard(gerty):
areas = []
if isinstance(gerty.mempool_endpoint, str):
async with httpx.AsyncClient() as client:
# current hashrate
r = await client.get(gerty.mempool_endpoint + "/api/v1/mining/hashrate/1w")
r = await get_hashrate_1w(gerty)
data = r.json()
hashrateNow = data["currentHashrate"]
hashrateOneWeekAgo = data["hashrates"][6]["avgHashrate"]
@ -99,9 +102,7 @@ async def get_mining_dashboard(gerty):
)
areas.append(text)
r = await client.get(
gerty.mempool_endpoint + "/api/v1/difficulty-adjustment"
)
r = await get_difficulty_adjustment(gerty)
# timeAvg
text = []
@ -131,7 +132,7 @@ async def get_mining_dashboard(gerty):
)
areas.append(text)
r = await client.get(gerty.mempool_endpoint + "/api/v1/mining/hashrate/1m")
r = await get_hashrate_1m(gerty)
data = r.json()
stat = {}
stat["current"] = data["currentDifficulty"]
@ -141,19 +142,9 @@ async def get_mining_dashboard(gerty):
return areas
async def api_get_lightning_stats(gerty):
stat = {}
if isinstance(gerty.mempool_endpoint, str):
async with httpx.AsyncClient() as client:
r = await client.get(
gerty.mempool_endpoint + "/api/v1/lightning/statistics/latest"
)
data = r.json()
return data
async def get_lightning_stats(gerty):
data = await api_get_lightning_stats(gerty)
data = await get_statistics(gerty)
areas = []
text = []
@ -280,16 +271,385 @@ async def api_get_mining_stat(stat_slug: str, gerty):
stat = ""
if stat_slug == "mining_current_hash_rate":
async with httpx.AsyncClient() as client:
r = await client.get(gerty.mempool_endpoint + "/api/v1/mining/hashrate/1m")
r = await get_hashrate_1m(gerty)
data = r.json()
stat = {}
stat['current'] = data['currentHashrate']
stat['1w'] = data['hashrates'][len(data['hashrates']) - 7]['avgHashrate']
elif stat_slug == "mining_current_difficulty":
async with httpx.AsyncClient() as client:
r = await client.get(gerty.mempool_endpoint + "/api/v1/mining/hashrate/1m")
r = await get_hashrate_1m(gerty)
data = r.json()
stat = {}
stat['current'] = data['currentDifficulty']
stat['previous'] = data['difficulty'][len(data['difficulty']) - 2]['difficulty']
return stat
return stat
###########################################
# Get a screen slug by its position in the screens_list
def get_screen_slug_by_index(index: int, screens_list):
logger.debug("Index: {0}".format(index))
logger.debug("len(screens_list) - 1: {0} ".format(len(screens_list) - 1))
if index <= len(screens_list) - 1:
return list(screens_list)[index - 1]
else:
return None
# Get a list of text items for the screen number
async def get_screen_data(screen_num: int, screens_list: dict, gerty):
screen_slug = get_screen_slug_by_index(screen_num, screens_list)
# first get the relevant slug from the display_preferences
logger.debug("screen_slug")
logger.debug(screen_slug)
areas = []
title = ""
if screen_slug == "dashboard":
title = gerty.name
areas = await get_dashboard(gerty)
if screen_slug == "lnbits_wallets_balance":
wallets = await get_lnbits_wallet_balances(gerty)
text = []
for wallet in wallets:
text.append(get_text_item_dict(text="{0}'s Wallet".format(wallet['name']), font_size=20,gerty_type=gerty.type))
text.append(get_text_item_dict(text="{0} sats".format(format_number(wallet['balance'])), font_size=40,gerty_type=gerty.type))
areas.append(text)
elif screen_slug == "fun_satoshi_quotes":
areas.append(await get_satoshi_quotes(gerty))
elif screen_slug == "fun_exchange_market_rate":
areas.append(await get_exchange_rate(gerty))
elif screen_slug == "onchain_difficulty_epoch_progress":
areas.append(await get_onchain_stat(screen_slug, gerty))
elif screen_slug == "onchain_block_height":
logger.debug("iam block height")
text = []
text.append(get_text_item_dict(text=format_number(await get_tip_height(gerty)), font_size=80, gerty_type=gerty.type))
areas.append(text)
elif screen_slug == "onchain_difficulty_retarget_date":
areas.append(await get_onchain_stat(screen_slug, gerty))
elif screen_slug == "onchain_difficulty_blocks_remaining":
areas.append(await get_onchain_stat(screen_slug, gerty))
elif screen_slug == "onchain_difficulty_epoch_time_remaining":
areas.append(await get_onchain_stat(screen_slug, gerty))
elif screen_slug == "dashboard_onchain":
title = "Onchain Data"
areas = await get_onchain_dashboard(gerty)
elif screen_slug == "mempool_recommended_fees":
areas.append(await get_mempool_stat(screen_slug, gerty))
elif screen_slug == "mempool_tx_count":
areas.append(await get_mempool_stat(screen_slug, gerty))
elif screen_slug == "mining_current_hash_rate":
areas.append(await get_mining_stat(screen_slug, gerty))
elif screen_slug == "mining_current_difficulty":
areas.append(await get_mining_stat(screen_slug, gerty))
elif screen_slug == "dashboard_mining":
title = "Mining Data"
areas = await get_mining_dashboard(gerty)
elif screen_slug == "lightning_dashboard":
title = "Lightning Network"
areas = await get_lightning_stats(gerty)
data = {}
data["title"] = title
data["areas"] = areas
return data
# Get the dashboard screen
async def get_dashboard(gerty):
areas = []
# XC rate
text = []
amount = await satoshis_amount_as_fiat(100000000, gerty.exchange)
text.append(get_text_item_dict(text=format_number(amount), font_size=40,gerty_type=gerty.type))
text.append(get_text_item_dict(text="BTC{0} price".format(gerty.exchange), font_size=15,gerty_type=gerty.type))
areas.append(text)
# balance
text = []
wallets = await get_lnbits_wallet_balances(gerty)
text = []
for wallet in wallets:
text.append(get_text_item_dict(text="{0}".format(wallet["name"]), font_size=15,gerty_type=gerty.type))
text.append(
get_text_item_dict(text="{0} sats".format(format_number(wallet["balance"])), font_size=20,gerty_type=gerty.type)
)
areas.append(text)
# Mempool fees
text = []
text.append(get_text_item_dict(text=format_number(await get_tip_height(gerty)), font_size=40,gerty_type=gerty.type))
text.append(get_text_item_dict(text="Current block height", font_size=15,gerty_type=gerty.type))
areas.append(text)
# difficulty adjustment time
text = []
text.append(
get_text_item_dict(
text=await get_time_remaining_next_difficulty_adjustment(gerty), font_size=15,gerty_type=gerty.type
)
)
text.append(get_text_item_dict(text="until next difficulty adjustment", font_size=12,gerty_type=gerty.type))
areas.append(text)
return areas
async def get_lnbits_wallet_balances(gerty):
# Get Wallet info
wallets = []
if gerty.lnbits_wallets != "":
for lnbits_wallet in json.loads(gerty.lnbits_wallets):
wallet = await get_wallet_for_key(key=lnbits_wallet)
logger.debug(wallet.name)
if wallet:
wallets.append(
{
"name": wallet.name,
"balance": wallet.balance_msat / 1000,
"inkey": wallet.inkey,
}
)
return wallets
async def get_placeholder_text():
return [
get_text_item_dict(text="Some placeholder text", x_pos=15, y_pos=10, font_size=50,gerty_type=gerty.type),
get_text_item_dict(text="Some placeholder text", x_pos=15, y_pos=10, font_size=50,gerty_type=gerty.type),
]
async def get_satoshi_quotes(gerty):
# Get Satoshi quotes
text = []
quote = await api_gerty_satoshi()
if quote:
if quote["text"]:
text.append(get_text_item_dict(text=quote["text"], font_size=15,gerty_type=gerty.type))
if quote["date"]:
text.append(
get_text_item_dict(text="Satoshi Nakamoto - {0}".format(quote["date"]), font_size=15,gerty_type=gerty.type)
)
return text
# Get Exchange Value
async def get_exchange_rate(gerty):
text = []
if gerty.exchange != "":
try:
amount = await satoshis_amount_as_fiat(100000000, gerty.exchange)
if amount:
price = format_number(amount)
text.append(
get_text_item_dict(
text="Current {0}/BTC price".format(gerty.exchange), font_size=15,gerty_type=gerty.type
)
)
text.append(get_text_item_dict(text=price, font_size=80,gerty_type=gerty.type))
except:
pass
return text
async def get_onchain_stat(stat_slug: str, gerty):
text = []
if (
stat_slug == "onchain_difficulty_epoch_progress" or
stat_slug == "onchain_difficulty_retarget_date" or
stat_slug == "onchain_difficulty_blocks_remaining" or
stat_slug == "onchain_difficulty_epoch_time_remaining"
):
async with httpx.AsyncClient() as client:
r = await get_difficulty_adjustment(gerty)
if stat_slug == "onchain_difficulty_epoch_progress":
stat = round(r.json()['progressPercent'])
text.append(get_text_item_dict(text="Progress through current difficulty epoch", font_size=15,gerty_type=gerty.type))
text.append(get_text_item_dict(text="{0}%".format(stat), font_size=80,gerty_type=gerty.type))
elif stat_slug == "onchain_difficulty_retarget_date":
stat = r.json()['estimatedRetargetDate']
dt = datetime.fromtimestamp(stat / 1000).strftime("%e %b %Y at %H:%M")
text.append(get_text_item_dict(text="Date of next difficulty adjustment", font_size=15,gerty_type=gerty.type))
text.append(get_text_item_dict(text=dt, font_size=40,gerty_type=gerty.type))
elif stat_slug == "onchain_difficulty_blocks_remaining":
stat = r.json()['remainingBlocks']
text.append(get_text_item_dict(text="Blocks until next difficulty adjustment", font_size=15,gerty_type=gerty.type))
text.append(get_text_item_dict(text="{0}".format(format_number(stat)), font_size=80,gerty_type=gerty.type))
elif stat_slug == "onchain_difficulty_epoch_time_remaining":
stat = r.json()['remainingTime']
text.append(get_text_item_dict(text="Time until next difficulty adjustment", font_size=15,gerty_type=gerty.type))
text.append(get_text_item_dict(text=get_time_remaining(stat / 1000, 4), font_size=20,gerty_type=gerty.type))
return text
async def get_onchain_dashboard(gerty):
areas = []
if isinstance(gerty.mempool_endpoint, str):
async with httpx.AsyncClient() as client:
r = await get_difficulty_adjustment(gerty)
text = []
stat = round(r.json()["progressPercent"])
text.append(get_text_item_dict(text="Progress through epoch", font_size=12,gerty_type=gerty.type))
text.append(get_text_item_dict(text="{0}%".format(stat), font_size=60,gerty_type=gerty.type))
areas.append(text)
text = []
stat = r.json()["estimatedRetargetDate"]
dt = datetime.fromtimestamp(stat / 1000).strftime("%e %b %Y at %H:%M")
text.append(get_text_item_dict(text="Date of next adjustment", font_size=12,gerty_type=gerty.type))
text.append(get_text_item_dict(text=dt, font_size=20,gerty_type=gerty.type))
areas.append(text)
text = []
stat = r.json()["remainingBlocks"]
text.append(get_text_item_dict(text="Blocks until adjustment", font_size=12,gerty_type=gerty.type))
text.append(get_text_item_dict(text="{0}".format(format_number(stat)), font_size=60,gerty_type=gerty.type))
areas.append(text)
text = []
stat = r.json()["remainingTime"]
text.append(get_text_item_dict(text="Time until adjustment", font_size=12,gerty_type=gerty.type))
text.append(get_text_item_dict(text=get_time_remaining(stat / 1000, 4), font_size=20,gerty_type=gerty.type))
areas.append(text)
return areas
async def get_time_remaining_next_difficulty_adjustment(gerty):
if isinstance(gerty.mempool_endpoint, str):
async with httpx.AsyncClient() as client:
r = await get_difficulty_adjustment(gerty)
stat = r.json()["remainingTime"]
time = get_time_remaining(stat / 1000, 3)
return time
async def get_mempool_stat(stat_slug: str, gerty):
text = []
if isinstance(gerty.mempool_endpoint, str):
async with httpx.AsyncClient() as client:
if stat_slug == "mempool_tx_count":
r = get_mempool(gerty)
if stat_slug == "mempool_tx_count":
stat = round(r.json()["count"])
text.append(get_text_item_dict(text="Transactions in the mempool", font_size=15,gerty_type=gerty.type))
text.append(
get_text_item_dict(text="{0}".format(format_number(stat)), font_size=80,gerty_type=gerty.type)
)
elif stat_slug == "mempool_recommended_fees":
y_offset = 60
fees = await get_fees_recommended()
pos_y = 80 + y_offset
text.append(get_text_item_dict("mempool.space", 40, 160, pos_y, gerty.type))
pos_y = 180 + y_offset
text.append(get_text_item_dict("Recommended Tx Fees", 20, 240, pos_y, gerty.type))
pos_y = 280 + y_offset
text.append(
get_text_item_dict("{0}".format("None"), 15, 30, pos_y, gerty.type)
)
text.append(
get_text_item_dict("{0}".format("Low"), 15, 235, pos_y, gerty.type)
)
text.append(
get_text_item_dict("{0}".format("Medium"), 15, 460, pos_y, gerty.type)
)
text.append(
get_text_item_dict("{0}".format("High"), 15, 750, pos_y, gerty.type)
)
pos_y = 340 + y_offset
font_size = 15
fee_append = "/vB"
fee_rate = fees["economyFee"]
text.append(
get_text_item_dict(
text="{0} {1}{2}".format(
format_number(fee_rate),
("sat" if fee_rate == 1 else "sats"),
fee_append,
),
font_size=font_size,
x_pos=30,
y_pos=pos_y,
gerty_type=gerty.type
)
)
fee_rate = fees["hourFee"]
text.append(
get_text_item_dict(
text="{0} {1}{2}".format(
format_number(fee_rate),
("sat" if fee_rate == 1 else "sats"),
fee_append,
),
font_size=font_size,
x_pos=235,
y_pos=pos_y,
gerty_type=gerty.type
)
)
fee_rate = fees["halfHourFee"]
text.append(
get_text_item_dict(
text="{0} {1}{2}".format(
format_number(fee_rate),
("sat" if fee_rate == 1 else "sats"),
fee_append,
),
font_size=font_size,
x_pos=460,
y_pos=pos_y,
gerty_type=gerty.type
)
)
fee_rate = fees["fastestFee"]
text.append(
get_text_item_dict(
text="{0} {1}{2}".format(
format_number(fee_rate),
("sat" if fee_rate == 1 else "sats"),
fee_append,
),
font_size=font_size,
x_pos=750,
y_pos=pos_y,
gerty_type=gerty.type
)
)
return text
def get_date_suffix(dayNumber):
if 4 <= dayNumber <= 20 or 24 <= dayNumber <= 30:
return "th"
else:
return ["st", "nd", "rd"][dayNumber % 10 - 1]
def get_time_remaining(seconds, granularity=2):
intervals = (
# ('weeks', 604800), # 60 * 60 * 24 * 7
('days', 86400), # 60 * 60 * 24
('hours', 3600), # 60 * 60
('minutes', 60),
('seconds', 1),
)
result = []
for name, count in intervals:
value = seconds // count
if value:
seconds -= value * count
if value == 1:
name = name.rstrip('s')
result.append("{} {}".format(round(value), name))
return ', '.join(result[:granularity])

View file

@ -28,4 +28,68 @@ async def m003_add_gerty_model_col(db):
"""
support for Gerty model col
"""
await db.execute("ALTER TABLE gerty.gertys ADD COLUMN type TEXT;")
await db.execute("ALTER TABLE gerty.gertys ADD COLUMN type TEXT;")
#########MEMPOOL MIGRATIONS########
async def m004_initial(db):
"""
Initial Gertys table.
"""
await db.execute(
"""
CREATE TABLE gerty.fees_recommended (
data TEXT NOT NULL,
time TIMESTAMP
);
"""
)
await db.execute(
"""
CREATE TABLE gerty.hashrate_1w (
data TEXT NOT NULL,
time TIMESTAMP
);
"""
)
await db.execute(
"""
CREATE TABLE gerty.hashrate_1m (
data TEXT NOT NULL,
time TIMESTAMP
);
"""
)
await db.execute(
"""
CREATE TABLE gerty.statistics (
data TEXT NOT NULL,
time TIMESTAMP
);
"""
)
await db.execute(
"""
CREATE TABLE gerty.difficulty_adjustment (
data TEXT NOT NULL,
time TIMESTAMP
);
"""
)
await db.execute(
"""
CREATE TABLE gerty.tip_height (
data TEXT NOT NULL,
time TIMESTAMP
);
"""
)
await db.execute(
"""
CREATE TABLE gerty.mempool (
data TEXT NOT NULL,
time TIMESTAMP
);
"""
)

View file

@ -8,7 +8,6 @@ from pydantic import BaseModel
class Gerty(BaseModel):
id: str = Query(None)
name: str
wallet: str
refresh_time: int = Query(None)
utc_offset: int = Query(None)
type: str
@ -24,3 +23,34 @@ class Gerty(BaseModel):
@classmethod
def from_row(cls, row: Row) -> "Gerty":
return cls(**dict(row))
#########MEMPOOL MODELS###########
class Fees_recommended(BaseModel):
data: str = Query(None)
time: int = Query(None)
class Hashrate_1w(BaseModel):
data: str = Query(None)
time: int = Query(None)
class Hashrate_1m(BaseModel):
data: str = Query(None)
time: int = Query(None)
class Statistics(BaseModel):
data: str = Query(None)
time: int = Query(None)
class Difficulty_adjustment(BaseModel):
data: str = Query(None)
time: int = Query(None)
class Tip_height(BaseModel):
data: str = Query(None)
time: int = Query(None)
class Mempool(BaseModel):
data: str = Query(None)
time: int = Query(None)

View file

@ -22,7 +22,20 @@ from lnbits.utils.exchange_rates import satoshis_amount_as_fiat
from ...settings import LNBITS_PATH
from . import gerty_ext
from .crud import create_gerty, delete_gerty, get_gerty, get_gertys, update_gerty
from .crud import (
create_gerty,
delete_gerty,
get_gerty,
get_gertys,
update_gerty,
get_fees_recommended,
get_hashrate_1w,
get_hashrate_1m,
get_statistics,
get_difficulty_adjustment,
get_tip_height,
get_mempool
)
from .helpers import *
from .models import Gerty
@ -84,9 +97,6 @@ async def api_gerty_delete(
raise HTTPException(status_code=HTTPStatus.NO_CONTENT)
#######################
@gerty_ext.get("/api/v1/gerty/satoshiquote", status_code=HTTPStatus.OK)
async def api_gerty_satoshi():
maxQuoteLength = 186
@ -101,7 +111,7 @@ async def api_gerty_satoshi():
return quote
@gerty_ext.get("/api/v1/gerty/{gerty_id}/{p}")
@gerty_ext.get("/api/v1/gerty/pages/{gerty_id}/{p}")
async def api_gerty_json(gerty_id: str, p: int = None): # page number
gerty = await get_gerty(gerty_id)
@ -150,379 +160,41 @@ async def api_gerty_json(gerty_id: str, p: int = None): # page number
},
}
###########CACHED MEMPOOL##############
# Get a screen slug by its position in the screens_list
def get_screen_slug_by_index(index: int, screens_list):
logger.debug("Index: {0}".format(index))
logger.debug("len(screens_list) - 1: {0} ".format(len(screens_list) - 1))
if index <= len(screens_list) - 1:
return list(screens_list)[index - 1]
else:
return None
@gerty_ext.get("/api/v1/gerty/fees-recommended/{gerty_id}")
async def api_gerty_get_fees_recommended(gerty_id):
logger.debug("gerty_id")
gerty = await get_gerty(gerty_id)
logger.debug(gerty)
return get_fees_recommended(gerty)
@gerty_ext.get("/api/v1/gerty/hashrate-1w/{gerty_id}")
async def api_gerty_get_hashrate_1w(gerty_id):
gerty = await get_gerty(gerty_id)
return get_hashrate_1w(gerty)
# Get a list of text items for the screen number
async def get_screen_data(screen_num: int, screens_list: dict, gerty):
screen_slug = get_screen_slug_by_index(screen_num, screens_list)
# first get the relevant slug from the display_preferences
logger.debug("screen_slug")
logger.debug(screen_slug)
areas = []
title = ""
@gerty_ext.get("/api/v1/gerty/hashrate-1m/{gerty_id}")
async def api_gerty_get_hashrate_1m(gerty_id):
gerty = await get_gerty(gerty_id)
return get_hashrate_1m(gerty)
if screen_slug == "dashboard":
title = gerty.name
areas = await get_dashboard(gerty)
if screen_slug == "lnbits_wallets_balance":
wallets = await get_lnbits_wallet_balances(gerty)
text = []
for wallet in wallets:
text.append(get_text_item_dict(text="{0}'s Wallet".format(wallet['name']), font_size=20,gerty_type=gerty.type))
text.append(get_text_item_dict(text="{0} sats".format(format_number(wallet['balance'])), font_size=40,gerty_type=gerty.type))
areas.append(text)
elif screen_slug == "fun_satoshi_quotes":
areas.append(await get_satoshi_quotes(gerty))
elif screen_slug == "fun_exchange_market_rate":
areas.append(await get_exchange_rate(gerty))
elif screen_slug == "onchain_difficulty_epoch_progress":
areas.append(await get_onchain_stat(screen_slug, gerty))
elif screen_slug == "onchain_block_height":
logger.debug("iam block height")
text = []
text.append(get_text_item_dict(text=format_number(await get_block_height(gerty)), font_size=80, gerty_type=gerty.type))
areas.append(text)
elif screen_slug == "onchain_difficulty_retarget_date":
areas.append(await get_onchain_stat(screen_slug, gerty))
elif screen_slug == "onchain_difficulty_blocks_remaining":
areas.append(await get_onchain_stat(screen_slug, gerty))
elif screen_slug == "onchain_difficulty_epoch_time_remaining":
areas.append(await get_onchain_stat(screen_slug, gerty))
elif screen_slug == "dashboard_onchain":
title = "Onchain Data"
areas = await get_onchain_dashboard(gerty)
elif screen_slug == "mempool_recommended_fees":
areas.append(await get_mempool_stat(screen_slug, gerty))
elif screen_slug == "mempool_tx_count":
areas.append(await get_mempool_stat(screen_slug, gerty))
elif screen_slug == "mining_current_hash_rate":
areas.append(await get_mining_stat(screen_slug, gerty))
elif screen_slug == "mining_current_difficulty":
areas.append(await get_mining_stat(screen_slug, gerty))
elif screen_slug == "dashboard_mining":
title = "Mining Data"
areas = await get_mining_dashboard(gerty)
elif screen_slug == "lightning_dashboard":
title = "Lightning Network"
areas = await get_lightning_stats(gerty)
@gerty_ext.get("/api/v1/gerty/statistics/{gerty_id}")
async def api_gerty_get_statistics(gerty_id):
gerty = await get_gerty(gerty_id)
return get_statistics(gerty)
data = {}
data["title"] = title
data["areas"] = areas
@gerty_ext.get("/api/v1/gerty/difficulty-adjustment/{gerty_id}")
async def api_gerty_get_difficulty_adjustment(gerty_id):
gerty = await get_gerty(gerty_id)
return get_difficulty_adjustment(gerty)
return data
@gerty_ext.get("/api/v1/gerty/tip-height/{gerty_id}")
async def api_gerty_get_tip_height(gerty_id):
gerty = await get_gerty(gerty_id)
return get_tip_height(gerty)
# Get the dashboard screen
async def get_dashboard(gerty):
areas = []
# XC rate
text = []
amount = await satoshis_amount_as_fiat(100000000, gerty.exchange)
text.append(get_text_item_dict(text=format_number(amount), font_size=40,gerty_type=gerty.type))
text.append(get_text_item_dict(text="BTC{0} price".format(gerty.exchange), font_size=15,gerty_type=gerty.type))
areas.append(text)
# balance
text = []
wallets = await get_lnbits_wallet_balances(gerty)
text = []
for wallet in wallets:
text.append(get_text_item_dict(text="{0}".format(wallet["name"]), font_size=15,gerty_type=gerty.type))
text.append(
get_text_item_dict(text="{0} sats".format(format_number(wallet["balance"])), font_size=20,gerty_type=gerty.type)
)
areas.append(text)
# Mempool fees
text = []
text.append(get_text_item_dict(text=format_number(await get_block_height(gerty)), font_size=40,gerty_type=gerty.type))
text.append(get_text_item_dict(text="Current block height", font_size=15,gerty_type=gerty.type))
areas.append(text)
# difficulty adjustment time
text = []
text.append(
get_text_item_dict(
text=await get_time_remaining_next_difficulty_adjustment(gerty), font_size=15,gerty_type=gerty.type
)
)
text.append(get_text_item_dict(text="until next difficulty adjustment", font_size=12,gerty_type=gerty.type))
areas.append(text)
return areas
async def get_lnbits_wallet_balances(gerty):
# Get Wallet info
wallets = []
if gerty.lnbits_wallets != "":
for lnbits_wallet in json.loads(gerty.lnbits_wallets):
wallet = await get_wallet_for_key(key=lnbits_wallet)
logger.debug(wallet.name)
if wallet:
wallets.append(
{
"name": wallet.name,
"balance": wallet.balance_msat / 1000,
"inkey": wallet.inkey,
}
)
return wallets
async def get_placeholder_text():
return [
get_text_item_dict(text="Some placeholder text", x_pos=15, y_pos=10, font_size=50,gerty_type=gerty.type),
get_text_item_dict(text="Some placeholder text", x_pos=15, y_pos=10, font_size=50,gerty_type=gerty.type),
]
async def get_satoshi_quotes(gerty):
# Get Satoshi quotes
text = []
quote = await api_gerty_satoshi()
if quote:
if quote["text"]:
text.append(get_text_item_dict(text=quote["text"], font_size=15,gerty_type=gerty.type))
if quote["date"]:
text.append(
get_text_item_dict(text="Satoshi Nakamoto - {0}".format(quote["date"]), font_size=15,gerty_type=gerty.type)
)
return text
# Get Exchange Value
async def get_exchange_rate(gerty):
text = []
if gerty.exchange != "":
try:
amount = await satoshis_amount_as_fiat(100000000, gerty.exchange)
if amount:
price = format_number(amount)
text.append(
get_text_item_dict(
text="Current {0}/BTC price".format(gerty.exchange), font_size=15,gerty_type=gerty.type
)
)
text.append(get_text_item_dict(text=price, font_size=80,gerty_type=gerty.type))
except:
pass
return text
async def get_onchain_stat(stat_slug: str, gerty):
text = []
if (
stat_slug == "onchain_difficulty_epoch_progress" or
stat_slug == "onchain_difficulty_retarget_date" or
stat_slug == "onchain_difficulty_blocks_remaining" or
stat_slug == "onchain_difficulty_epoch_time_remaining"
):
async with httpx.AsyncClient() as client:
r = await client.get(gerty.mempool_endpoint + "/api/v1/difficulty-adjustment")
if stat_slug == "onchain_difficulty_epoch_progress":
stat = round(r.json()['progressPercent'])
text.append(get_text_item_dict(text="Progress through current difficulty epoch", font_size=15,gerty_type=gerty.type))
text.append(get_text_item_dict(text="{0}%".format(stat), font_size=80,gerty_type=gerty.type))
elif stat_slug == "onchain_difficulty_retarget_date":
stat = r.json()['estimatedRetargetDate']
dt = datetime.fromtimestamp(stat / 1000).strftime("%e %b %Y at %H:%M")
text.append(get_text_item_dict(text="Date of next difficulty adjustment", font_size=15,gerty_type=gerty.type))
text.append(get_text_item_dict(text=dt, font_size=40,gerty_type=gerty.type))
elif stat_slug == "onchain_difficulty_blocks_remaining":
stat = r.json()['remainingBlocks']
text.append(get_text_item_dict(text="Blocks until next difficulty adjustment", font_size=15,gerty_type=gerty.type))
text.append(get_text_item_dict(text="{0}".format(format_number(stat)), font_size=80,gerty_type=gerty.type))
elif stat_slug == "onchain_difficulty_epoch_time_remaining":
stat = r.json()['remainingTime']
text.append(get_text_item_dict(text="Time until next difficulty adjustment", font_size=15,gerty_type=gerty.type))
text.append(get_text_item_dict(text=get_time_remaining(stat / 1000, 4), font_size=20,gerty_type=gerty.type))
return text
async def get_onchain_dashboard(gerty):
areas = []
if isinstance(gerty.mempool_endpoint, str):
async with httpx.AsyncClient() as client:
r = await client.get(
gerty.mempool_endpoint + "/api/v1/difficulty-adjustment"
)
text = []
stat = round(r.json()["progressPercent"])
text.append(get_text_item_dict(text="Progress through epoch", font_size=12,gerty_type=gerty.type))
text.append(get_text_item_dict(text="{0}%".format(stat), font_size=60,gerty_type=gerty.type))
areas.append(text)
text = []
stat = r.json()["estimatedRetargetDate"]
dt = datetime.fromtimestamp(stat / 1000).strftime("%e %b %Y at %H:%M")
text.append(get_text_item_dict(text="Date of next adjustment", font_size=12,gerty_type=gerty.type))
text.append(get_text_item_dict(text=dt, font_size=20,gerty_type=gerty.type))
areas.append(text)
text = []
stat = r.json()["remainingBlocks"]
text.append(get_text_item_dict(text="Blocks until adjustment", font_size=12,gerty_type=gerty.type))
text.append(get_text_item_dict(text="{0}".format(format_number(stat)), font_size=60,gerty_type=gerty.type))
areas.append(text)
text = []
stat = r.json()["remainingTime"]
text.append(get_text_item_dict(text="Time until adjustment", font_size=12,gerty_type=gerty.type))
text.append(get_text_item_dict(text=get_time_remaining(stat / 1000, 4), font_size=20,gerty_type=gerty.type))
areas.append(text)
return areas
async def get_time_remaining_next_difficulty_adjustment(gerty):
if isinstance(gerty.mempool_endpoint, str):
async with httpx.AsyncClient() as client:
r = await client.get(
gerty.mempool_endpoint + "/api/v1/difficulty-adjustment"
)
stat = r.json()["remainingTime"]
time = get_time_remaining(stat / 1000, 3)
return time
async def get_block_height(gerty):
if isinstance(gerty.mempool_endpoint, str):
async with httpx.AsyncClient() as client:
r = await client.get(gerty.mempool_endpoint + "/api/blocks/tip/height")
return r.json()
async def get_mempool_stat(stat_slug: str, gerty):
text = []
if isinstance(gerty.mempool_endpoint, str):
async with httpx.AsyncClient() as client:
if stat_slug == "mempool_tx_count":
r = await client.get(gerty.mempool_endpoint + "/api/mempool")
if stat_slug == "mempool_tx_count":
stat = round(r.json()["count"])
text.append(get_text_item_dict(text="Transactions in the mempool", font_size=15,gerty_type=gerty.type))
text.append(
get_text_item_dict(text="{0}".format(format_number(stat)), font_size=80,gerty_type=gerty.type)
)
elif stat_slug == "mempool_recommended_fees":
y_offset = 60
fees = await get_mempool_recommended_fees(gerty)
pos_y = 80 + y_offset
text.append(get_text_item_dict("mempool.space", 40, 160, pos_y, gerty.type))
pos_y = 180 + y_offset
text.append(get_text_item_dict("Recommended Tx Fees", 20, 240, pos_y, gerty.type))
pos_y = 280 + y_offset
text.append(
get_text_item_dict("{0}".format("None"), 15, 30, pos_y, gerty.type)
)
text.append(
get_text_item_dict("{0}".format("Low"), 15, 235, pos_y, gerty.type)
)
text.append(
get_text_item_dict("{0}".format("Medium"), 15, 460, pos_y, gerty.type)
)
text.append(
get_text_item_dict("{0}".format("High"), 15, 750, pos_y, gerty.type)
)
pos_y = 340 + y_offset
font_size = 15
fee_append = "/vB"
fee_rate = fees["economyFee"]
text.append(
get_text_item_dict(
text="{0} {1}{2}".format(
format_number(fee_rate),
("sat" if fee_rate == 1 else "sats"),
fee_append,
),
font_size=font_size,
x_pos=30,
y_pos=pos_y,
gerty_type=gerty.type
)
)
fee_rate = fees["hourFee"]
text.append(
get_text_item_dict(
text="{0} {1}{2}".format(
format_number(fee_rate),
("sat" if fee_rate == 1 else "sats"),
fee_append,
),
font_size=font_size,
x_pos=235,
y_pos=pos_y,
gerty_type=gerty.type
)
)
fee_rate = fees["halfHourFee"]
text.append(
get_text_item_dict(
text="{0} {1}{2}".format(
format_number(fee_rate),
("sat" if fee_rate == 1 else "sats"),
fee_append,
),
font_size=font_size,
x_pos=460,
y_pos=pos_y,
gerty_type=gerty.type
)
)
fee_rate = fees["fastestFee"]
text.append(
get_text_item_dict(
text="{0} {1}{2}".format(
format_number(fee_rate),
("sat" if fee_rate == 1 else "sats"),
fee_append,
),
font_size=font_size,
x_pos=750,
y_pos=pos_y,
gerty_type=gerty.type
)
)
return text
def get_date_suffix(dayNumber):
if 4 <= dayNumber <= 20 or 24 <= dayNumber <= 30:
return "th"
else:
return ["st", "nd", "rd"][dayNumber % 10 - 1]
def get_time_remaining(seconds, granularity=2):
intervals = (
# ('weeks', 604800), # 60 * 60 * 24 * 7
('days', 86400), # 60 * 60 * 24
('hours', 3600), # 60 * 60
('minutes', 60),
('seconds', 1),
)
result = []
for name, count in intervals:
value = seconds // count
if value:
seconds -= value * count
if value == 1:
name = name.rstrip('s')
result.append("{} {}".format(round(value), name))
return ', '.join(result[:granularity])
@gerty_ext.get("/api/v1/gerty/mempool/{gerty_id}")
async def api_gerty_get_mempool(gerty_id):
gerty = await get_gerty(gerty_id)
return get_mempool(gerty)