gabriel / muse public
_query.py python
616 lines 21.3 KB
15cf97e9 feat(bitcoin): add semantic porcelain layer — 19 Bitcoin-idiomatic CLI … Gabriel Cardona <gabriel@tellurstori.com> 3d ago
1 """Bitcoin domain query and analysis functions.
2
3 This module provides purely functional, read-only analytics over Bitcoin state
4 TypedDicts. No I/O, no side effects. Functions here are used by ``diff()``,
5 ``merge()``, and external callers (e.g. a ``muse query`` command) to derive
6 meaning from the raw state.
7
8 All balance values are in satoshis throughout — never floating-point BTC —
9 to avoid rounding errors in financial arithmetic.
10 """
11
12 from __future__ import annotations
13
14 import logging
15
16 from muse.plugins.bitcoin._types import (
17 AddressLabelRecord,
18 AgentStrategyRecord,
19 FeeEstimateRecord,
20 LightningChannelRecord,
21 OraclePriceTickRecord,
22 PendingTxRecord,
23 ScriptType,
24 UTXORecord,
25 )
26
logger = logging.getLogger(__name__)

# Satoshis per bitcoin. Every amount in this module is an integer satoshi count.
_SATS_PER_BTC: int = 100_000_000

# ---------------------------------------------------------------------------
# Input size constants (vbytes per UTXO spent, used for fee estimation)
# Typical virtual sizes (BIP 141: weight / 4, rounded up) for the dominant
# spend path of each script type, *including* the discounted witness data.
# P2PKH/P2SH are legacy; P2WPKH/P2WSH are segwit v0; P2TR is taproot key-path.
# ---------------------------------------------------------------------------

_INPUT_VBYTES: dict[ScriptType, int] = {
    "p2pkh": 148,     # legacy — largest inputs
    "p2sh": 91,       # P2SH-P2WPKH wrapped segwit
    # 41 non-witness bytes (outpoint 36 + empty scriptSig 1 + sequence 4)
    # plus ~108 witness weight units (signature + pubkey) / 4 = 27 vbytes.
    "p2wpkh": 68,     # native segwit v0 — most common
    "p2wsh": 105,     # native segwit v0 multisig (avg 2-of-3)
    "p2tr": 58,       # taproot key-path spend
    "op_return": 41,  # unspendable — estimate only
    "unknown": 100,   # conservative fallback
}

# Cost to create + eventually spend a P2WPKH change output (in vbytes).
# Used by coin selection to compute waste score.
_CHANGE_OUTPUT_VBYTES: int = 31  # P2WPKH output: 8 value + 1 length + 22 script
_CHANGE_SPEND_VBYTES: int = 68   # spending that P2WPKH output later (see above)
_CHANGE_COST_VBYTES: int = _CHANGE_OUTPUT_VBYTES + _CHANGE_SPEND_VBYTES  # 99
52
53
54 # ---------------------------------------------------------------------------
55 # UTXO analytics
56 # ---------------------------------------------------------------------------
57
58
def utxo_key(utxo: UTXORecord) -> str:
    """Build the canonical ``"{txid}:{vout}"`` identity string for *utxo*."""
    txid, vout = utxo["txid"], utxo["vout"]
    return f"{txid}:{vout}"
62
63
def total_balance_sat(utxos: list[UTXORecord]) -> int:
    """Return the combined ``amount_sat`` of every UTXO (``0`` for an empty list)."""
    balance = 0
    for coin in utxos:
        balance += coin["amount_sat"]
    return balance
67
68
def confirmed_balance_sat(utxos: list[UTXORecord], min_confirmations: int = 1) -> int:
    """Sum the amounts of UTXOs that count as confirmed.

    A UTXO is counted when it has at least *min_confirmations* confirmations.
    Coinbase outputs with fewer than 100 confirmations are always skipped,
    whatever value *min_confirmations* has.

    Args:
        utxos: UTXO set to total.
        min_confirmations: Minimum confirmation depth for a UTXO to count.

    Returns:
        Total satoshis across the qualifying UTXOs.
    """

    def _counts(coin: UTXORecord) -> bool:
        depth = coin["confirmations"]
        if depth < min_confirmations:
            return False
        # Immature coinbase outputs never count, regardless of the threshold.
        return not (coin["coinbase"] and depth < 100)

    return sum(coin["amount_sat"] for coin in utxos if _counts(coin))
84
85
def balance_by_script_type(utxos: list[UTXORecord]) -> dict[str, int]:
    """Group UTXO value by script type.

    Returns:
        ``{script_type: total_sats}``, with keys in order of first appearance.
    """
    totals: dict[str, int] = {}
    for coin in utxos:
        kind = coin["script_type"]
        sats = coin["amount_sat"]
        if kind in totals:
            totals[kind] += sats
        else:
            totals[kind] = 0 + sats
    return totals
93
94
def coin_age_blocks(utxo: UTXORecord, current_height: int) -> int | None:
    """Return the UTXO's age in blocks relative to *current_height*.

    Returns:
        ``None`` when the UTXO has no ``block_height`` (unconfirmed);
        otherwise ``current_height - block_height``, floored at ``0``.
    """
    mined_at = utxo["block_height"]
    if mined_at is None:
        return None
    age = current_height - mined_at
    # A recorded height above the supplied tip is treated as zero age.
    return age if age > 0 else 0
101
102
def format_sat(amount_sat: int) -> str:
    """Format satoshis as a human-readable string.

    Values whose magnitude is below 100 000 sats are displayed as ``"N sats"``
    (with thousands separators). Larger magnitudes are displayed as
    ``"X.XXXXXXXX BTC"``.

    The BTC form is produced with integer arithmetic only, so the result is
    exact to the satoshi for any magnitude; no value ever passes through a
    float.

    Args:
        amount_sat: Amount in satoshis; may be negative.

    Returns:
        The formatted amount, e.g. ``"12,345 sats"`` or ``"0.34500000 BTC"``.
    """
    if abs(amount_sat) < 100_000:
        return f"{amount_sat:,} sats"
    # divmod on the magnitude keeps the fractional part non-negative; the sign
    # is re-attached separately so negative amounts render as "-0.00150000".
    sign = "-" if amount_sat < 0 else ""
    whole_btc, frac_sat = divmod(abs(amount_sat), _SATS_PER_BTC)
    return f"{sign}{whole_btc}.{frac_sat:08d} BTC"
113
114
def utxo_summary_line(utxos: list[UTXORecord], current_height: int | None = None) -> str:
    """One-line human-readable summary of the UTXO set.

    Example:
        ``"12 UTXOs | 0.34500000 BTC confirmed | 3 unconfirmed (25,000 sats) | oldest 4200 blocks"``

    The "unconfirmed" segment appears only when at least one UTXO has zero
    confirmations, and its amount is the value of exactly those UTXOs. The
    "oldest" segment appears only when *current_height* is given and some
    UTXO has a non-zero age.

    Args:
        utxos: UTXO set to summarise.
        current_height: Chain tip height used for coin ages; ``None`` omits
            the age segment.

    Returns:
        The summary segments joined with ``" | "``.
    """
    confirmed = confirmed_balance_sat(utxos)
    # Count and amount are derived from the same zero-confirmation subset, so
    # immature coinbase value is never reported as "unconfirmed".
    unconfirmed = [u for u in utxos if u["confirmations"] == 0]
    parts = [
        f"{len(utxos)} UTXOs",
        f"{format_sat(confirmed)} confirmed",
    ]
    if unconfirmed:
        unconf_sat = total_balance_sat(unconfirmed)
        parts.append(f"{len(unconfirmed)} unconfirmed ({format_sat(unconf_sat)})")
    if current_height is not None:
        # Unconfirmed UTXOs have no age (None) and are treated as age 0.
        oldest = max(
            (coin_age_blocks(u, current_height) or 0 for u in utxos),
            default=0,
        )
        if oldest:
            parts.append(f"oldest {oldest} blocks")
    return " | ".join(parts)
138
139
def estimated_input_vbytes(script_type: ScriptType) -> int:
    """Look up the estimated virtual size of one input of *script_type*.

    The figure comes from the module's ``_INPUT_VBYTES`` table and feeds fee
    estimation, effective-value calculation, and the dust threshold.

    Args:
        script_type: Script type of the output being spent.

    Returns:
        The table entry for *script_type*, or the ``"unknown"`` entry when
        the type is not in the table.
    """
    if script_type in _INPUT_VBYTES:
        return _INPUT_VBYTES[script_type]
    return _INPUT_VBYTES["unknown"]
154
155
def effective_value_sat(utxo: UTXORecord, fee_rate_sat_vbyte: int) -> int:
    """Return what *utxo* is worth after paying to spend it.

    Effective value = ``amount_sat − (input_vbytes × fee_rate)``.

    A result of zero or less means spending the coin costs at least as much
    as the coin holds at this fee rate.

    Args:
        utxo: The UTXO to evaluate.
        fee_rate_sat_vbyte: Fee rate in satoshis per virtual byte.

    Returns:
        Net value in satoshis; may be negative.
    """
    input_size = estimated_input_vbytes(utxo["script_type"])
    cost_to_spend = input_size * fee_rate_sat_vbyte
    return utxo["amount_sat"] - cost_to_spend
174
175
def dust_threshold_sat(script_type: ScriptType, fee_rate_sat_vbyte: int) -> int:
    """Return the dust threshold for *script_type* at *fee_rate_sat_vbyte*.

    The threshold is three times the estimated fee for spending one input of
    this script type. Below it, spending the coin would consume more than a
    third of its value.

    Args:
        script_type: Script type of the UTXO to evaluate.
        fee_rate_sat_vbyte: Fee rate in satoshis per virtual byte.

    Returns:
        Smallest amount in satoshis that is not considered dust.
    """
    cost_to_spend = fee_rate_sat_vbyte * estimated_input_vbytes(script_type)
    return cost_to_spend * 3
192
193
def is_dust(utxo: UTXORecord, fee_rate_sat_vbyte: int) -> bool:
    """Tell whether *utxo* falls below the dust threshold for its script type.

    Args:
        utxo: The UTXO to evaluate.
        fee_rate_sat_vbyte: Fee rate in satoshis per virtual byte.

    Returns:
        ``True`` when ``amount_sat`` is strictly less than
        ``dust_threshold_sat`` for the UTXO's script type; an amount exactly
        at the threshold is not dust.
    """
    amount = utxo["amount_sat"]
    threshold = dust_threshold_sat(utxo["script_type"], fee_rate_sat_vbyte)
    return amount < threshold
209
210
211 # ---------------------------------------------------------------------------
212 # Halving mechanics
213 # ---------------------------------------------------------------------------
214
# Blocks per halving epoch; halving_epoch() divides heights by this value.
_HALVING_INTERVAL: int = 210_000
# Block subsidy during epoch 0; current_subsidy_sat() right-shifts it per epoch.
_INITIAL_SUBSIDY_SAT: int = 5_000_000_000  # 50 BTC in satoshis
# One-year horizon in blocks, used by the HODL analytics as the year boundary.
_BLOCKS_PER_YEAR: int = 52_560  # ~6 blocks/hour × 24 × 365
# NOTE(review): not referenced anywhere in the visible part of this module.
_BLOCKS_PER_10_MIN: int = 1  # 1 block ≈ 10 minutes
219
220
def halving_epoch(height: int) -> int:
    """Return the zero-based halving epoch containing block *height*.

    Epoch 0 covers heights below ``_HALVING_INTERVAL``; each further interval
    advances the epoch by one.
    """
    epoch, _ = divmod(height, _HALVING_INTERVAL)
    return epoch
224
225
def current_subsidy_sat(height: int) -> int:
    """Return the block subsidy, in satoshis, paid at block *height*.

    The initial subsidy is right-shifted once per completed halving epoch.
    From epoch 33 onward the result is ``0``.
    """
    halvings = halving_epoch(height)
    return _INITIAL_SUBSIDY_SAT >> halvings if halvings < 33 else 0
236
237
def blocks_until_halving(height: int) -> int:
    """Return how many blocks separate *height* from the next halving height.

    The result is floored at ``0``.
    """
    boundary = (halving_epoch(height) + 1) * _HALVING_INTERVAL
    remaining = boundary - height
    return remaining if remaining > 0 else 0
242
243
def next_halving_height(height: int) -> int:
    """Return the first block height of the epoch after the one holding *height*."""
    following_epoch = halving_epoch(height) + 1
    return following_epoch * _HALVING_INTERVAL
247
248
def estimated_days_until_halving(height: int) -> float:
    """Estimate the calendar days remaining until the next halving.

    Uses a flat ten minutes per block, so the figure is only approximate.
    """
    minutes_left = blocks_until_halving(height) * 10
    minutes_per_day = 60 * 24
    return minutes_left / minutes_per_day
256
257
258 # ---------------------------------------------------------------------------
259 # Whale tier classification
260 # ---------------------------------------------------------------------------
261
# Wallet-size tiers as (label, minimum balance in satoshis).
# Ordered from largest to smallest: whale_tier() returns the first entry whose
# threshold the balance meets, so the order is significant.
_WHALE_TIERS: list[tuple[str, int]] = [
    ("Humpback", 100_000 * _SATS_PER_BTC),  # ≥ 100 000 BTC
    ("Whale", 1_000 * _SATS_PER_BTC),  # ≥ 1 000 BTC
    ("Shark", 100 * _SATS_PER_BTC),  # ≥ 100 BTC
    ("Dolphin", 10 * _SATS_PER_BTC),  # ≥ 10 BTC
    ("Fish", 1 * _SATS_PER_BTC),  # ≥ 1 BTC
    ("Crab", 1_000_000),  # ≥ 0.01 BTC
    ("Shrimp", 100_000),  # ≥ 0.001 BTC
    ("Plankton", 0),  # < 0.001 BTC (catch-all floor)
]
273
274
def whale_tier(total_sat: int) -> str:
    """Classify a wallet holding *total_sat* satoshis into an ecosystem tier.

    Tiers (ordered largest → smallest):
        Humpback ≥ 100 000 BTC · Whale ≥ 1 000 BTC · Shark ≥ 100 BTC ·
        Dolphin ≥ 10 BTC · Fish ≥ 1 BTC · Crab ≥ 0.01 BTC ·
        Shrimp ≥ 0.001 BTC · Plankton < 0.001 BTC

    Returns:
        The label of the first tier in ``_WHALE_TIERS`` whose threshold is
        met, or ``"Plankton"`` when none is (e.g. a negative balance).
    """
    matching = (label for label, floor in _WHALE_TIERS if total_sat >= floor)
    return next(matching, "Plankton")
287
288
def next_tier_threshold_sat(total_sat: int) -> int | None:
    """Return the balance threshold of the next whale tier above *total_sat*.

    The "next tier" is the one with the smallest threshold that the balance
    has not yet reached. For example, a 0.5 BTC wallet (Crab) gets the Fish
    threshold of 1 BTC, not the Humpback threshold.

    Args:
        total_sat: Wallet balance in satoshis. Negative balances are treated
            as zero, matching ``whale_tier``'s Plankton fallback.

    Returns:
        The next tier's threshold in satoshis (an absolute balance, not the
        difference still needed), or ``None`` when the balance already meets
        the highest tier (Humpback).
    """
    balance = max(total_sat, 0)
    # _WHALE_TIERS is stored largest-first, so take the minimum of the
    # still-unreached thresholds rather than the first one encountered.
    unreached = [threshold for _, threshold in _WHALE_TIERS if threshold > balance]
    return min(unreached) if unreached else None
295
296
297 # ---------------------------------------------------------------------------
298 # HODL analytics
299 # ---------------------------------------------------------------------------
300
301
def hodl_score(utxos: list[UTXORecord], current_height: int) -> float:
    """Compute the value-weighted average coin age, in blocks.

    Each confirmed UTXO contributes its age multiplied by its amount, so
    large coins dominate the average. UTXOs without a ``block_height`` or
    with fewer than one confirmation are ignored.

    Args:
        utxos: UTXO set to analyse.
        current_height: Current chain tip height.

    Returns:
        Weighted-average age in blocks, or ``0.0`` when no confirmed value
        is present.
    """
    weighted_age = 0.0
    weight = 0
    for coin in utxos:
        mined_at = coin["block_height"]
        if mined_at is None or coin["confirmations"] < 1:
            continue
        sats = coin["amount_sat"]
        # Ages are floored at zero, mirroring coin_age_blocks().
        weighted_age += max(0, current_height - mined_at) * sats
        weight += sats
    return weighted_age / weight if weight else 0.0
327
328
def diamond_hands_sat(utxos: list[UTXORecord], current_height: int) -> int:
    """Total the satoshis in UTXOs aged at least one year (~52 560 blocks).

    A UTXO qualifies when it has a ``block_height`` and
    ``current_height - block_height`` is at least ``_BLOCKS_PER_YEAR``.
    The ``confirmations`` field is not consulted.

    Args:
        utxos: UTXO set to analyse.
        current_height: Current chain tip height.

    Returns:
        Combined amount of the qualifying UTXOs, in satoshis.
    """
    long_held = 0
    for coin in utxos:
        mined_at = coin["block_height"]
        if mined_at is None:
            continue
        if current_height - mined_at >= _BLOCKS_PER_YEAR:
            long_held += coin["amount_sat"]
    return long_held
348
349
def short_term_sat(utxos: list[UTXORecord], current_height: int) -> int:
    """Total the satoshis in confirmed UTXOs younger than one year.

    A UTXO qualifies when it has a ``block_height``, its age is below
    ``_BLOCKS_PER_YEAR`` blocks, and it has at least one confirmation.
    """
    recent = 0
    for coin in utxos:
        mined_at = coin["block_height"]
        if mined_at is None:
            continue
        if current_height - mined_at >= _BLOCKS_PER_YEAR:
            continue
        if coin["confirmations"] >= 1:
            recent += coin["amount_sat"]
    return recent
359
360
def hodl_grade(score: float) -> str:
    """Map a HODL score (weighted-average coin age in blocks) to a letter grade.

    Grade thresholds (1 year ≈ 52 560 blocks):
        S ≥ 3 years · A ≥ 1 year · B ≥ 6 months · C ≥ 3 months · D ≥ 1 month · F < 1 month
    """
    year = _BLOCKS_PER_YEAR
    # Highest bar first: the first threshold the score clears decides the grade.
    ladder = (
        ("S", year * 3),
        ("A", year),
        ("B", year // 2),
        ("C", year // 4),
        ("D", year // 12),
    )
    for grade, floor in ladder:
        if score >= floor:
            return grade
    return "F"
378
379
380 # ---------------------------------------------------------------------------
381 # Privacy analytics
382 # ---------------------------------------------------------------------------
383
384
def address_reuse_count(utxos: list[UTXORecord]) -> int:
    """Count the distinct addresses that hold more than one UTXO in *utxos*.

    Each reused address counts once, however many UTXOs share it. A result
    of zero means no address appears twice.
    """
    seen: set[str] = set()
    reused: set[str] = set()
    for coin in utxos:
        address = coin["address"]
        if address in seen:
            reused.add(address)
        else:
            seen.add(address)
    return len(reused)
394
395
def script_type_diversity(utxos: list[UTXORecord]) -> float:
    """Measure how evenly the UTXOs are spread across script types.

    The measure is the Shannon entropy, in bits, of the script-type
    distribution, counted per UTXO rather than per satoshi. With N equally
    common types the value is log2(N).

    Returns:
        Entropy rounded to four decimal places; ``0.0`` for an empty wallet
        or one that uses a single script type.
    """
    import math

    population = len(utxos)
    if population == 0:
        return 0.0

    tally: dict[str, int] = {}
    for coin in utxos:
        kind = coin["script_type"]
        tally[kind] = tally.get(kind, 0) + 1

    bits = 0.0
    for occurrences in tally.values():
        share = occurrences / population
        bits -= share * math.log2(share)
    return round(bits, 4)
417
418
def taproot_adoption_pct(utxos: list[UTXORecord]) -> float:
    """Return the percentage of wallet value (by sats) held in P2TR outputs.

    Returns:
        A value from ``0.0`` to ``100.0``; ``0.0`` when the total balance is
        zero.
    """
    wallet_total = total_balance_sat(utxos)
    if wallet_total == 0:
        return 0.0
    taproot_total = 0
    for coin in utxos:
        if coin["script_type"] == "p2tr":
            taproot_total += coin["amount_sat"]
    return (taproot_total / wallet_total) * 100.0
430
431
def balance_by_category(
    utxos: list[UTXORecord],
    labels: list[AddressLabelRecord],
) -> dict[str, int]:
    """Group UTXO value by the category of each UTXO's address label.

    When several labels name the same address, the last one in *labels*
    wins. UTXOs at unlabelled addresses are collected under ``"unknown"``.

    Args:
        utxos: UTXO set to categorize.
        labels: Address label records supplying the category per address.

    Returns:
        ``{category: total_sats}``, with keys in order of first appearance.
    """
    category_of: dict[str, str] = {}
    for label in labels:
        category_of[label["address"]] = label["category"]

    totals: dict[str, int] = {}
    for coin in utxos:
        bucket = category_of.get(coin["address"], "unknown")
        sats = coin["amount_sat"]
        if bucket in totals:
            totals[bucket] += sats
        else:
            totals[bucket] = 0 + sats
    return totals
453
454
def double_spend_candidates(
    base_utxo_keys: set[str],
    our_spent: set[str],
    their_spent: set[str],
) -> list[str]:
    """Find UTXOs from the common ancestor that both branches spent.

    A key is reported when it is present in all three sets: it existed at
    the branch point and was deleted on both the "ours" and the "theirs"
    side. This is a strategy-level conflict, since only one of the two
    spends could ever succeed.

    Args:
        base_utxo_keys: UTXO keys (``"{txid}:{vout}"``) present in the common
            ancestor snapshot.
        our_spent: UTXO keys deleted on our branch since the ancestor.
        their_spent: UTXO keys deleted on their branch since the ancestor.

    Returns:
        The conflicting keys in ascending sorted order.
    """
    spent_by_both = our_spent & their_spent
    contested = list(spent_by_both & base_utxo_keys)
    contested.sort()
    return contested
480
481
482 # ---------------------------------------------------------------------------
483 # Lightning channel analytics
484 # ---------------------------------------------------------------------------
485
486
def channel_liquidity_totals(
    channels: list[LightningChannelRecord],
) -> tuple[int, int]:
    """Add up local and remote balances over every channel.

    Returns:
        ``(total_local_sat, total_remote_sat)``; ``(0, 0)`` for no channels.
    """
    local_total = 0
    remote_total = 0
    for channel in channels:
        local_total += channel["local_balance_sat"]
        remote_total += channel["remote_balance_sat"]
    return local_total, remote_total
494
495
def channel_utilization(channel: LightningChannelRecord) -> float:
    """Local balance as a fraction of usable channel capacity [0.0, 1.0].

    Usable capacity is ``capacity_sat`` minus both reserve amounts. The raw
    ratio can fall outside the unit interval, because the local balance may
    exceed the usable capacity (e.g. when it approaches the full channel
    capacity). The result is therefore clamped so the documented range
    always holds.

    Args:
        channel: The Lightning channel record to evaluate.

    Returns:
        Utilization clamped to ``[0.0, 1.0]``; ``0.0`` if the channel has no
        usable capacity (fully reserved).
    """
    usable = (
        channel["capacity_sat"]
        - channel["local_reserve_sat"]
        - channel["remote_reserve_sat"]
    )
    if usable <= 0:
        return 0.0
    ratio = channel["local_balance_sat"] / usable
    # Clamp: the numerator is not bounded by the reserve-adjusted denominator.
    return min(1.0, max(0.0, ratio))
510
511
def channel_summary_line(channels: list[LightningChannelRecord]) -> str:
    """Render all Lightning channels as a single summary line.

    Example: ``"5 channels | 0.02100000 BTC local | 0.01800000 BTC remote | 3 active"``
    """
    local_total, remote_total = channel_liquidity_totals(channels)
    active_count = len([ch for ch in channels if ch["is_active"]])
    segments = [
        f"{len(channels)} channels",
        f"{format_sat(local_total)} local",
        f"{format_sat(remote_total)} remote",
        f"{active_count} active",
    ]
    return " | ".join(segments)
525
526
527 # ---------------------------------------------------------------------------
528 # Fee oracle analytics
529 # ---------------------------------------------------------------------------
530
531
def fee_surface_str(estimate: FeeEstimateRecord) -> str:
    """Render the 1-, 6- and 144-block fee targets as one compact string.

    Example: ``"1blk: 42 | 6blk: 15 | 144blk: 3 sat/vbyte"``
    """
    next_block = estimate["target_1_block_sat_vbyte"]
    six_blocks = estimate["target_6_block_sat_vbyte"]
    one_day = estimate["target_144_block_sat_vbyte"]
    return f"1blk: {next_block} | 6blk: {six_blocks} | 144blk: {one_day} sat/vbyte"
543
544
def latest_fee_estimate(
    estimates: list[FeeEstimateRecord],
) -> FeeEstimateRecord | None:
    """Pick the fee estimate with the greatest ``timestamp``.

    Returns:
        The newest record (the earliest one in the list on a timestamp tie),
        or ``None`` when *estimates* is empty.
    """
    newest: FeeEstimateRecord | None = None
    for candidate in estimates:
        # Strict comparison keeps the first record when timestamps tie.
        if newest is None or candidate["timestamp"] > newest["timestamp"]:
            newest = candidate
    return newest
552
553
554 # ---------------------------------------------------------------------------
555 # Price oracle analytics
556 # ---------------------------------------------------------------------------
557
558
def price_at_height(
    prices: list[OraclePriceTickRecord],
    height: int,
) -> float | None:
    """Return the ``price_usd`` of the tick whose block height is nearest *height*.

    Ticks without a ``block_height`` are ignored. When two ticks are equally
    near, the one appearing first in *prices* is used.

    Returns:
        The nearest tick's price, or ``None`` if no tick carries a height.
    """
    nearest = None
    nearest_gap = None
    for tick in prices:
        tick_height = tick["block_height"]
        if tick_height is None:
            continue
        gap = abs(tick_height - height)
        if nearest_gap is None or gap < nearest_gap:
            nearest, nearest_gap = tick, gap
    return None if nearest is None else nearest["price_usd"]
569
570
def latest_price(prices: list[OraclePriceTickRecord]) -> float | None:
    """Return the ``price_usd`` of the tick with the greatest ``timestamp``.

    Returns:
        The newest price (from the earliest such tick on a timestamp tie),
        or ``None`` when *prices* is empty.
    """
    newest = None
    for tick in prices:
        if newest is None or tick["timestamp"] > newest["timestamp"]:
            newest = tick
    return newest["price_usd"] if newest is not None else None
576
577
578 # ---------------------------------------------------------------------------
579 # Mempool analytics
580 # ---------------------------------------------------------------------------
581
582
def mempool_summary_line(mempool: list[PendingTxRecord]) -> str:
    """Render the local mempool view as a single summary line.

    Example: ``"7 pending | 0.00150000 BTC | avg 23 sat/vbyte | 3 RBF"``

    The average fee rate is a plain mean over transactions, not weighted by
    size. An empty mempool yields ``"mempool empty"``.
    """
    if not mempool:
        return "mempool empty"
    pending = len(mempool)
    value_sat = sum(tx["amount_sat"] for tx in mempool)
    mean_rate = sum(tx["fee_rate_sat_vbyte"] for tx in mempool) / pending
    replaceable = len([tx for tx in mempool if tx["rbf_eligible"]])
    segments = [
        f"{pending} pending",
        f"{format_sat(value_sat)}",
        f"avg {mean_rate:.0f} sat/vbyte",
        f"{replaceable} RBF",
    ]
    return " | ".join(segments)
599
600
601 # ---------------------------------------------------------------------------
602 # Strategy analytics
603 # ---------------------------------------------------------------------------
604
605
def strategy_summary_line(strategy: AgentStrategyRecord) -> str:
    """Render the active agent strategy as a single summary line.

    The line always starts with the strategy name and ends with the maximum
    fee rate. ``SIM`` is inserted when simulation mode is on, and a ``DCA=``
    segment when a DCA amount is configured.
    """
    name_part = f"strategy={strategy['name']!r}"
    sim_part = ["SIM"] if strategy["simulation_mode"] else []
    dca_part = []
    if strategy["dca_amount_sat"] is not None:
        dca_part = [f"DCA={format_sat(strategy['dca_amount_sat'])}"]
    fee_part = f"max_fee={strategy['max_fee_rate_sat_vbyte']} sat/vbyte"
    return " | ".join([name_part, *sim_part, *dca_part, fee_part])
615
616