test_bitcoin_analytics.py
python
| 1 | """Comprehensive tests for the Bitcoin domain semantic porcelain layer. |
| 2 | |
| 3 | Covers every public function in ``_analytics.py`` and the new query helpers |
| 4 | added to ``_query.py``. Tests are organized by capability and assert exact |
| 5 | behaviour with realistic Bitcoin fixtures. |
| 6 | """ |
| 7 | |
| 8 | from __future__ import annotations |
| 9 | |
| 10 | import pytest |
| 11 | |
| 12 | from muse.plugins.bitcoin._analytics import ( |
| 13 | ChannelHealthReport, |
| 14 | CoinSelectionResult, |
| 15 | ConsolidationPlan, |
| 16 | FeeRecommendation, |
| 17 | PortfolioPnL, |
| 18 | PortfolioSnapshot, |
| 19 | RebalanceCandidate, |
| 20 | UTXOLifecycle, |
| 21 | WalletSummary, |
| 22 | channel_health_report, |
| 23 | consolidation_plan, |
| 24 | fee_window, |
| 25 | portfolio_pnl, |
| 26 | portfolio_snapshot, |
| 27 | rebalance_candidates, |
| 28 | select_coins, |
| 29 | utxo_lifecycle, |
| 30 | wallet_summary, |
| 31 | ) |
| 32 | from muse.plugins.bitcoin._query import ( |
| 33 | balance_by_category, |
| 34 | dust_threshold_sat, |
| 35 | effective_value_sat, |
| 36 | estimated_input_vbytes, |
| 37 | is_dust, |
| 38 | ) |
| 39 | from muse.plugins.bitcoin._types import ( |
| 40 | AddressLabelRecord, |
| 41 | AgentStrategyRecord, |
| 42 | CoinCategory, |
| 43 | CoinSelectAlgo, |
| 44 | FeeEstimateRecord, |
| 45 | LightningChannelRecord, |
| 46 | OraclePriceTickRecord, |
| 47 | PendingTxRecord, |
| 48 | RoutingPolicyRecord, |
| 49 | ScriptType, |
| 50 | UTXORecord, |
| 51 | ) |
| 52 | |
| 53 | # --------------------------------------------------------------------------- |
| 54 | # Shared fixtures |
| 55 | # --------------------------------------------------------------------------- |
| 56 | |
| 57 | |
def _utxo(
    txid: str = "a" * 64,
    vout: int = 0,
    amount_sat: int = 1_000_000,
    script_type: ScriptType = "p2wpkh",
    address: str = "bc1qtest",
    confirmations: int = 6,
    block_height: int | None = 850_000,
    coinbase: bool = False,
    label: str | None = None,
) -> UTXORecord:
    """Build a ``UTXORecord`` fixture; any field may be overridden by keyword.

    With no arguments the record is a 1,000,000-sat ``p2wpkh`` output at
    address ``bc1qtest`` with six confirmations, block height 850,000,
    not coinbase and without a label.
    """
    record = UTXORecord(
        txid=txid,
        vout=vout,
        amount_sat=amount_sat,
        script_type=script_type,
        address=address,
        confirmations=confirmations,
        block_height=block_height,
        coinbase=coinbase,
        label=label,
    )
    return record
| 80 | |
| 81 | |
def _channel(
    channel_id: str = "850000x1x0",
    capacity_sat: int = 2_000_000,
    local_balance_sat: int = 1_000_000,
    remote_balance_sat: int = 900_000,
    is_active: bool = True,
    local_reserve_sat: int = 20_000,
    remote_reserve_sat: int = 20_000,
    htlc_count: int = 0,
    peer_alias: str | None = "ACINQ",
) -> LightningChannelRecord:
    """Build a ``LightningChannelRecord`` fixture.

    The peer pubkey, ``is_public=True`` and ``unsettled_balance_sat=0`` are
    fixed; every other field comes from the keyword arguments.
    """
    # Same fixed placeholder pubkey for every fixture channel.
    peer_pubkey = "0279" + "aa" * 32
    channel = LightningChannelRecord(
        channel_id=channel_id,
        peer_pubkey=peer_pubkey,
        peer_alias=peer_alias,
        capacity_sat=capacity_sat,
        local_balance_sat=local_balance_sat,
        remote_balance_sat=remote_balance_sat,
        is_active=is_active,
        is_public=True,
        local_reserve_sat=local_reserve_sat,
        remote_reserve_sat=remote_reserve_sat,
        unsettled_balance_sat=0,
        htlc_count=htlc_count,
    )
    return channel
| 107 | |
| 108 | |
def _label(
    address: str = "bc1qtest",
    label: str = "cold storage",
    category: CoinCategory = "income",
) -> AddressLabelRecord:
    """Build an ``AddressLabelRecord`` fixture with a fixed ``created_at``."""
    created_at = 1_700_000_000
    record = AddressLabelRecord(
        address=address,
        label=label,
        category=category,
        created_at=created_at,
    )
    return record
| 120 | |
| 121 | |
def _strategy(
    name: str = "conservative",
    max_fee: int = 10,
    rebalance_threshold: float = 0.2,
    simulation_mode: bool = False,
    dca_amount_sat: int | None = 500_000,
) -> AgentStrategyRecord:
    """Build an ``AgentStrategyRecord`` fixture.

    ``max_fee`` is stored as ``max_fee_rate_sat_vbyte`` and
    ``rebalance_threshold`` as ``lightning_rebalance_threshold``. The
    remaining fields (six confirmations, consolidation threshold 20,
    144-block DCA interval, ``branch_and_bound`` coin selection) are fixed.
    """
    strategy = AgentStrategyRecord(
        name=name,
        max_fee_rate_sat_vbyte=max_fee,
        min_confirmations=6,
        utxo_consolidation_threshold=20,
        dca_amount_sat=dca_amount_sat,
        dca_interval_blocks=144,
        lightning_rebalance_threshold=rebalance_threshold,
        coin_selection="branch_and_bound",
        simulation_mode=simulation_mode,
    )
    return strategy
| 140 | |
| 141 | |
def _price(usd: float = 62_000.0, ts: int = 1_700_000_000) -> OraclePriceTickRecord:
    """Build an ``OraclePriceTickRecord`` at block 850,000 sourced from ``coinbase``."""
    tick = OraclePriceTickRecord(
        timestamp=ts,
        block_height=850_000,
        price_usd=usd,
        source="coinbase",
    )
    return tick
| 149 | |
| 150 | |
def _fee(
    t1: int = 30,
    t6: int = 15,
    t144: int = 3,
    ts: int = 1_700_000_000,
) -> FeeEstimateRecord:
    """Build a ``FeeEstimateRecord`` at block 850,000.

    ``t1``, ``t6`` and ``t144`` populate the 1-, 6- and 144-block target
    fields respectively; ``ts`` is the record timestamp.
    """
    estimate = FeeEstimateRecord(
        timestamp=ts,
        block_height=850_000,
        target_1_block_sat_vbyte=t1,
        target_6_block_sat_vbyte=t6,
        target_144_block_sat_vbyte=t144,
    )
    return estimate
| 159 | |
| 160 | |
def _routing() -> RoutingPolicyRecord:
    """Build the single fixed ``RoutingPolicyRecord`` used by the channel tests."""
    policy = RoutingPolicyRecord(
        channel_id="850000x1x0",
        base_fee_msat=1_000,
        fee_rate_ppm=500,
        min_htlc_msat=1_000,
        max_htlc_msat=1_000_000_000,
        time_lock_delta=40,
    )
    return policy
| 170 | |
| 171 | |
| 172 | # --------------------------------------------------------------------------- |
| 173 | # _query.py: new economics helpers |
| 174 | # --------------------------------------------------------------------------- |
| 175 | |
| 176 | |
class TestEstimatedInputVbytes:
    """Pin the input sizes ``estimated_input_vbytes`` returns per script type."""

    def test_p2wpkh_is_41(self) -> None:
        size = estimated_input_vbytes("p2wpkh")
        assert size == 41

    def test_p2pkh_is_148(self) -> None:
        size = estimated_input_vbytes("p2pkh")
        assert size == 148

    def test_p2tr_is_58(self) -> None:
        size = estimated_input_vbytes("p2tr")
        assert size == 58

    def test_unknown_uses_conservative_fallback(self) -> None:
        # Script types without a dedicated entry are expected to report 100.
        size = estimated_input_vbytes("unknown")
        assert size == 100

    def test_p2sh_is_91(self) -> None:
        size = estimated_input_vbytes("p2sh")
        assert size == 91
| 192 | |
| 193 | |
class TestEffectiveValueSat:
    """Check ``effective_value_sat``: amount minus the cost of spending the input."""

    def test_positive_for_large_utxo(self) -> None:
        coin = _utxo(amount_sat=1_000_000, script_type="p2wpkh")
        # 41 vbytes at 10 sat/vbyte are deducted from the face value.
        expected = 1_000_000 - 41 * 10
        assert effective_value_sat(coin, 10) == expected

    def test_negative_for_tiny_utxo_at_high_fee(self) -> None:
        coin = _utxo(amount_sat=100, script_type="p2wpkh")
        value = effective_value_sat(coin, 10)
        assert value < 0

    def test_zero_fee_rate_returns_full_amount(self) -> None:
        coin = _utxo(amount_sat=50_000)
        value = effective_value_sat(coin, 0)
        assert value == 50_000

    def test_legacy_input_costs_more(self) -> None:
        segwit_coin = _utxo(amount_sat=100_000, script_type="p2wpkh")
        legacy_coin = _utxo(amount_sat=100_000, script_type="p2pkh")
        segwit_value = effective_value_sat(segwit_coin, 10)
        legacy_value = effective_value_sat(legacy_coin, 10)
        assert segwit_value > legacy_value
| 212 | |
| 213 | |
class TestDustThreshold:
    """Check how ``dust_threshold_sat`` scales with script type and fee rate."""

    def test_p2wpkh_at_10_sat_vbyte(self) -> None:
        # Expected value is 3 x 41 vbytes x 10 sat/vbyte.
        threshold = dust_threshold_sat("p2wpkh", 10)
        assert threshold == 1_230

    def test_p2pkh_is_larger(self) -> None:
        legacy = dust_threshold_sat("p2pkh", 10)
        segwit = dust_threshold_sat("p2wpkh", 10)
        assert legacy > segwit

    def test_higher_fee_raises_threshold(self) -> None:
        at_high_fee = dust_threshold_sat("p2wpkh", 50)
        at_low_fee = dust_threshold_sat("p2wpkh", 10)
        assert at_high_fee > at_low_fee
| 224 | |
| 225 | |
class TestIsDust:
    """Check the ``is_dust`` predicate at the extremes."""

    def test_tiny_utxo_is_dust(self) -> None:
        tiny = _utxo(amount_sat=200, script_type="p2wpkh")
        assert is_dust(tiny, 10)

    def test_large_utxo_is_not_dust(self) -> None:
        large = _utxo(amount_sat=1_000_000, script_type="p2wpkh")
        assert not is_dust(large, 10)

    def test_zero_fee_rate_never_dust(self) -> None:
        # Even a one-satoshi output is not dust when the fee rate is zero.
        one_sat = _utxo(amount_sat=1)
        assert not is_dust(one_sat, 0)
| 238 | |
| 239 | |
class TestBalanceByCategory:
    """Check that ``balance_by_category`` groups UTXO amounts by label category."""

    def test_groups_by_label_category(self) -> None:
        income_coin = _utxo(amount_sat=500_000, address="addr1")
        exchange_coin = _utxo(amount_sat=300_000, address="addr2")
        unlabelled_coin = _utxo(amount_sat=200_000, address="addr3")
        labels = [
            _label(address="addr1", category="income"),
            _label(address="addr2", category="exchange"),
        ]
        totals = balance_by_category([income_coin, exchange_coin, unlabelled_coin], labels)
        assert totals["income"] == 500_000
        assert totals["exchange"] == 300_000
        # addr3 has no label, so its amount lands under "unknown".
        assert totals["unknown"] == 200_000

    def test_empty_labels_all_unknown(self) -> None:
        coins = [_utxo(amount_sat=100_000)]
        totals = balance_by_category(coins, [])
        assert totals == {"unknown": 100_000}

    def test_empty_utxos_returns_empty(self) -> None:
        totals = balance_by_category([], [_label()])
        assert totals == {}
| 261 | |
| 262 | |
| 263 | # --------------------------------------------------------------------------- |
| 264 | # Coin selection |
| 265 | # --------------------------------------------------------------------------- |
| 266 | |
| 267 | |
class TestSelectCoins:
    """Exercise ``select_coins`` across its algorithms and failure paths."""

    def test_largest_first_selects_minimum_inputs(self) -> None:
        pool = [
            _utxo(txid="a" * 64, vout=0, amount_sat=2_000_000),
            _utxo(txid="b" * 64, vout=0, amount_sat=500_000),
            _utxo(txid="c" * 64, vout=0, amount_sat=100_000),
        ]
        outcome = select_coins(pool, 400_000, fee_rate_sat_vbyte=1, algorithm="largest_first")
        assert outcome["success"] is True
        # The 2M coin should be the only input needed.
        picked = outcome["selected"]
        assert len(picked) == 1
        assert picked[0]["amount_sat"] == 2_000_000

    def test_smallest_first_uses_small_utxos(self) -> None:
        pool = [
            _utxo(txid="a" * 64, vout=0, amount_sat=2_000_000),
            _utxo(txid="b" * 64, vout=0, amount_sat=500_000),
            _utxo(txid="c" * 64, vout=0, amount_sat=200_000),
        ]
        outcome = select_coins(pool, 400_000, fee_rate_sat_vbyte=1, algorithm="smallest_first")
        assert outcome["success"] is True
        # Selection should begin with the 200k coin.
        picked_amounts = {coin["amount_sat"] for coin in outcome["selected"]}
        assert 200_000 in picked_amounts

    def test_insufficient_funds_returns_failure(self) -> None:
        small = _utxo(amount_sat=10_000)
        outcome = select_coins([small], target_sat=1_000_000, fee_rate_sat_vbyte=1)
        assert outcome["success"] is False
        assert outcome["failure_reason"] is not None
        assert "insufficient" in outcome["failure_reason"].lower()

    def test_all_dust_returns_failure(self) -> None:
        dust = _utxo(amount_sat=100, script_type="p2wpkh")
        outcome = select_coins([dust], target_sat=50, fee_rate_sat_vbyte=10)
        assert outcome["success"] is False
        reason = outcome["failure_reason"] or ""
        assert "dust" in reason

    def test_zero_target_returns_failure(self) -> None:
        coin = _utxo(amount_sat=1_000_000)
        outcome = select_coins([coin], target_sat=0, fee_rate_sat_vbyte=1)
        assert outcome["success"] is False

    def test_bnb_finds_exact_match(self) -> None:
        # The target is set to the coin's own effective value at 1 sat/vbyte
        # (1_000_000 - 41 = 999_959), so a single input matches exactly.
        coin = _utxo(amount_sat=1_000_000, script_type="p2wpkh")
        exact_target = effective_value_sat(coin, 1)
        outcome = select_coins(
            [coin],
            target_sat=exact_target,
            fee_rate_sat_vbyte=1,
            algorithm="branch_and_bound",
        )
        assert outcome["success"] is True
        # An exact match leaves either no change or a zero waste score.
        assert outcome["change_sat"] == 0 or outcome["waste_score"] == 0

    def test_bnb_falls_back_to_largest_when_no_exact_match(self) -> None:
        pool = [
            _utxo(txid="a" * 64, vout=0, amount_sat=300_000),
            _utxo(txid="b" * 64, vout=0, amount_sat=400_000),
        ]
        outcome = select_coins(
            pool,
            target_sat=500_000,
            fee_rate_sat_vbyte=1,
            algorithm="branch_and_bound",
        )
        assert outcome["success"] is True

    def test_random_returns_valid_selection(self) -> None:
        pool = [_utxo(txid=letter * 64, vout=0, amount_sat=500_000) for letter in "abcde"]
        outcome = select_coins(pool, target_sat=300_000, fee_rate_sat_vbyte=1, algorithm="random")
        assert outcome["success"] is True
        assert outcome["total_input_sat"] >= outcome["target_sat"]

    def test_result_covers_target_plus_fee(self) -> None:
        pool = [
            _utxo(txid=letter * 64, vout=index, amount_sat=200_000)
            for index, letter in enumerate("abcde")
        ]
        outcome = select_coins(pool, target_sat=350_000, fee_rate_sat_vbyte=10, algorithm="largest_first")
        assert outcome["success"] is True
        # Inputs must pay for both the payment and the fee.
        required = outcome["target_sat"] + outcome["fee_sat"]
        assert outcome["total_input_sat"] >= required

    def test_dust_utxos_excluded_from_selection(self) -> None:
        dust = _utxo(txid="a" * 64, vout=0, amount_sat=100)
        large = _utxo(txid="b" * 64, vout=0, amount_sat=1_000_000)
        outcome = select_coins([dust, large], target_sat=50_000, fee_rate_sat_vbyte=10)
        assert outcome["success"] is True
        picked_txids = {coin["txid"] for coin in outcome["selected"]}
        assert "a" * 64 not in picked_txids  # the dust coin must not be spent

    def test_fee_sat_is_positive(self) -> None:
        coin = _utxo(amount_sat=1_000_000)
        outcome = select_coins([coin], target_sat=500_000, fee_rate_sat_vbyte=10)
        assert outcome["success"] is True
        assert outcome["fee_sat"] > 0

    def test_algorithm_recorded_in_result(self) -> None:
        coin = _utxo(amount_sat=1_000_000)
        outcome = select_coins([coin], target_sat=100_000, fee_rate_sat_vbyte=1, algorithm="largest_first")
        assert outcome["algorithm"] == "largest_first"
| 364 | |
| 365 | |
| 366 | # --------------------------------------------------------------------------- |
| 367 | # UTXO lifecycle |
| 368 | # --------------------------------------------------------------------------- |
| 369 | |
| 370 | |
class TestUTXOLifecycle:
    """Check the per-UTXO report produced by ``utxo_lifecycle``."""

    def test_confirmed_mature_utxo(self) -> None:
        coin = _utxo(amount_sat=500_000, confirmations=100, block_height=849_000, coinbase=False)
        info = utxo_lifecycle(coin, labels=[], fee_rate_sat_vbyte=10, current_height=850_000)
        assert info["is_mature"] is True
        assert info["is_spendable"] is True
        assert info["is_coinbase"] is False
        # 850_000 - 849_000
        assert info["age_blocks"] == 1_000

    def test_immature_coinbase_not_spendable(self) -> None:
        coin = _utxo(amount_sat=625_000_000, confirmations=50, coinbase=True)
        info = utxo_lifecycle(coin, labels=[], fee_rate_sat_vbyte=10)
        assert info["is_coinbase"] is True
        assert info["is_mature"] is False
        assert info["is_spendable"] is False

    def test_mature_coinbase_is_spendable(self) -> None:
        coin = _utxo(amount_sat=625_000_000, confirmations=101, coinbase=True)
        info = utxo_lifecycle(coin, labels=[], fee_rate_sat_vbyte=10)
        assert info["is_mature"] is True
        assert info["is_spendable"] is True

    def test_dust_detection(self) -> None:
        coin = _utxo(amount_sat=200, script_type="p2wpkh")
        info = utxo_lifecycle(coin, labels=[], fee_rate_sat_vbyte=10)
        assert info["is_dust"] is True
        assert info["effective_value_sat"] < 0

    def test_label_annotation(self) -> None:
        coin = _utxo(address="bc1qcold")
        treasury = _label(address="bc1qcold", label="treasury", category="income")
        info = utxo_lifecycle(coin, labels=[treasury], fee_rate_sat_vbyte=10)
        assert info["label"] == "treasury"
        assert info["category"] == "income"

    def test_unlabelled_utxo_category_is_unknown(self) -> None:
        coin = _utxo(address="bc1qunknown")
        info = utxo_lifecycle(coin, labels=[], fee_rate_sat_vbyte=10)
        assert info["category"] == "unknown"
        assert info["label"] is None

    def test_unconfirmed_not_spendable(self) -> None:
        coin = _utxo(confirmations=0, block_height=None)
        info = utxo_lifecycle(coin, labels=[], fee_rate_sat_vbyte=10)
        assert info["is_spendable"] is False

    def test_no_current_height_age_is_none(self) -> None:
        coin = _utxo(block_height=840_000)
        info = utxo_lifecycle(coin, labels=[], fee_rate_sat_vbyte=10, current_height=None)
        assert info["age_blocks"] is None

    def test_key_format(self) -> None:
        # The key is "<txid>:<vout>".
        coin = _utxo(txid="b" * 64, vout=3)
        info = utxo_lifecycle(coin, labels=[], fee_rate_sat_vbyte=10)
        expected_key = "b" * 64 + ":3"
        assert info["key"] == expected_key

    def test_estimated_spend_fee_is_positive(self) -> None:
        coin = _utxo(amount_sat=1_000_000)
        info = utxo_lifecycle(coin, labels=[], fee_rate_sat_vbyte=10)
        assert info["estimated_spend_fee_sat"] > 0
| 431 | |
| 432 | |
| 433 | # --------------------------------------------------------------------------- |
| 434 | # Wallet summary |
| 435 | # --------------------------------------------------------------------------- |
| 436 | |
| 437 | |
class TestWalletSummary:
    """Check the aggregate figures reported by ``wallet_summary``."""

    def _make_default(self) -> WalletSummary:
        """Summarise a wallet with one confirmed, one unconfirmed and one immature coinbase UTXO."""
        confirmed = _utxo(txid="a" * 64, amount_sat=1_000_000, confirmations=6, address="bc1q1")
        unconfirmed = _utxo(txid="b" * 64, amount_sat=500_000, confirmations=0, address="bc1q2")
        immature = _utxo(
            txid="c" * 64,
            amount_sat=625_000_000,
            confirmations=50,
            coinbase=True,
            address="bc1q3",
        )
        return wallet_summary(
            utxos=[confirmed, unconfirmed, immature],
            labels=[_label(address="bc1q1", category="income")],
            channels=[_channel()],
            strategy=_strategy(),
            prices=[_price(usd=60_000.0)],
            mempool=[],
            fee_rate_sat_vbyte=10,
            current_height=850_000,
        )

    def test_total_sat_includes_all_utxos(self) -> None:
        summary = self._make_default()
        expected_total = 1_000_000 + 500_000 + 625_000_000
        assert summary["total_sat"] == expected_total

    def test_unconfirmed_separated(self) -> None:
        summary = self._make_default()
        assert summary["unconfirmed_sat"] == 500_000

    def test_immature_coinbase_tracked(self) -> None:
        summary = self._make_default()
        assert summary["immature_coinbase_sat"] == 625_000_000

    def test_spendable_excludes_immature_and_unconfirmed(self) -> None:
        summary = self._make_default()
        # Only the confirmed, non-coinbase UTXO counts as spendable.
        assert summary["spendable_sat"] == 1_000_000

    def test_lightning_balance_included(self) -> None:
        summary = self._make_default()
        assert summary["in_lightning_sat"] == 1_000_000

    def test_usd_value_computed(self) -> None:
        summary = self._make_default()
        assert summary["total_usd"] is not None
        assert summary["total_usd"] > 0

    def test_no_price_data_gives_none_usd(self) -> None:
        summary = wallet_summary(
            utxos=[_utxo(amount_sat=1_000_000)],
            labels=[],
            channels=[],
            strategy=_strategy(),
            prices=[],
            mempool=[],
        )
        assert summary["total_usd"] is None

    def test_dust_count(self) -> None:
        dust_coin = _utxo(txid="d" * 64, amount_sat=200, script_type="p2wpkh")
        large_coin = _utxo(txid="e" * 64, amount_sat=1_000_000)
        summary = wallet_summary(
            utxos=[dust_coin, large_coin],
            labels=[],
            channels=[],
            strategy=_strategy(),
            prices=[],
            mempool=[],
            fee_rate_sat_vbyte=10,
        )
        assert summary["dust_utxo_count"] == 1

    def test_channel_counts(self) -> None:
        summary = self._make_default()
        assert summary["channel_count"] == 1
        assert summary["active_channel_count"] == 1

    def test_simulation_mode_propagated(self) -> None:
        simulated = _strategy(simulation_mode=True)
        summary = wallet_summary(
            utxos=[],
            labels=[],
            channels=[],
            strategy=simulated,
            prices=[],
            mempool=[],
        )
        assert summary["simulation_mode"] is True

    def test_script_type_breakdown_present(self) -> None:
        breakdown = self._make_default()["script_type_breakdown"]
        assert "p2wpkh" in breakdown

    def test_category_breakdown_present(self) -> None:
        breakdown = self._make_default()["category_breakdown"]
        assert "income" in breakdown
| 531 | |
| 532 | |
| 533 | # --------------------------------------------------------------------------- |
| 534 | # Portfolio analytics |
| 535 | # --------------------------------------------------------------------------- |
| 536 | |
| 537 | |
class TestPortfolioSnapshot:
    """Check the totals and metadata recorded by ``portfolio_snapshot``."""

    def test_totals_correct(self) -> None:
        on_chain = [_utxo(amount_sat=1_000_000)]
        lightning = [_channel(local_balance_sat=500_000)]
        snapshot = portfolio_snapshot(on_chain, lightning, [_price(usd=60_000.0)])
        assert snapshot["on_chain_sat"] == 1_000_000
        assert snapshot["lightning_sat"] == 500_000
        assert snapshot["total_sat"] == 1_500_000

    def test_usd_computed(self) -> None:
        # Exactly one bitcoin at $60,000 should be worth $60,000 (within $1).
        one_btc = [_utxo(amount_sat=_SATS_PER_BTC)]
        snapshot = portfolio_snapshot(one_btc, [], [_price(usd=60_000.0)])
        assert snapshot["total_usd"] is not None
        difference = (snapshot["total_usd"] or 0) - 60_000.0
        assert abs(difference) < 1.0

    def test_no_price_gives_none_usd(self) -> None:
        snapshot = portfolio_snapshot([_utxo()], [], [])
        assert snapshot["total_usd"] is None

    def test_height_and_timestamp_stored(self) -> None:
        snapshot = portfolio_snapshot(
            [],
            [],
            [],
            current_height=850_000,
            timestamp=1_700_000_000,
        )
        assert snapshot["block_height"] == 850_000
        assert snapshot["timestamp"] == 1_700_000_000
| 561 | |
| 562 | |
| 563 | _SATS_PER_BTC = 100_000_000 |
| 564 | |
| 565 | |
class TestPortfolioPnL:
    """Check the deltas ``portfolio_pnl`` derives from two portfolio snapshots."""

    def _snap(self, total: int, price: float | None = None) -> PortfolioSnapshot:
        """Build an on-chain-only ``PortfolioSnapshot`` holding ``total`` satoshis.

        ``total_usd`` is derived from ``price`` when one is given and is
        ``None`` otherwise.
        """
        # Compare against None explicitly: a truthiness check would treat a
        # price of 0.0 as "no price" and leave total_usd inconsistent with
        # price_usd.
        usd = None if price is None else (total / _SATS_PER_BTC) * price
        return PortfolioSnapshot(
            block_height=850_000,
            timestamp=1_700_000_000,
            on_chain_sat=total,
            lightning_sat=0,
            total_sat=total,
            price_usd=price,
            total_usd=usd,
        )

    def test_positive_pnl(self) -> None:
        base = self._snap(1_000_000)
        current = self._snap(1_500_000)
        pnl = portfolio_pnl(base, current)
        assert pnl["sat_delta"] == 500_000
        assert pnl["pct_change"] is not None
        assert pnl["pct_change"] > 0

    def test_negative_pnl(self) -> None:
        base = self._snap(2_000_000)
        current = self._snap(1_500_000)
        pnl = portfolio_pnl(base, current)
        assert pnl["sat_delta"] == -500_000

    def test_fees_paid_affects_net_delta(self) -> None:
        base = self._snap(1_000_000)
        current = self._snap(900_000)
        pnl = portfolio_pnl(base, current, estimated_fees_paid_sat=50_000)
        # net = -100_000 + 50_000 = -50_000 (fees were a cost, so they are added back)
        assert pnl["net_sat_delta"] == -100_000 + 50_000

    def test_usd_delta_when_prices_available(self) -> None:
        base = self._snap(1_000_000, 60_000.0)
        current = self._snap(1_000_000, 70_000.0)
        pnl = portfolio_pnl(base, current)
        assert pnl["usd_delta"] is not None
        # The USD value rose even though the satoshi balance is unchanged.
        assert (pnl["usd_delta"] or 0) > 0

    def test_usd_delta_none_without_prices(self) -> None:
        base = self._snap(1_000_000, None)
        current = self._snap(1_500_000, None)
        pnl = portfolio_pnl(base, current)
        assert pnl["usd_delta"] is None

    def test_zero_change(self) -> None:
        snap = self._snap(1_000_000)
        pnl = portfolio_pnl(snap, snap)
        assert pnl["sat_delta"] == 0
        assert pnl["pct_change"] == 0.0
| 618 | |
| 619 | |
| 620 | # --------------------------------------------------------------------------- |
| 621 | # Lightning analytics |
| 622 | # --------------------------------------------------------------------------- |
| 623 | |
| 624 | |
class TestRebalanceCandidates:
    """Check which channels ``rebalance_candidates`` flags, and how.

    Tests that inspect the first candidate assert that the result is
    non-empty first, so a regression surfaces as an assertion failure
    rather than an ``IndexError``.
    """

    def test_balanced_channel_not_a_candidate(self) -> None:
        ch = _channel(local_balance_sat=960_000, remote_balance_sat=960_000, capacity_sat=2_000_000)
        result = rebalance_candidates([ch], _strategy(rebalance_threshold=0.2))
        assert result == []

    def test_low_local_balance_pull_in(self) -> None:
        # local/(capacity-reserves) < 0.2 → pull_in
        ch = _channel(
            local_balance_sat=10_000,
            remote_balance_sat=1_950_000,
            capacity_sat=2_000_000,
            local_reserve_sat=20_000,
            remote_reserve_sat=20_000,
        )
        result = rebalance_candidates([ch], _strategy(rebalance_threshold=0.2))
        assert len(result) == 1
        assert result[0]["direction"] == "pull_in"

    def test_high_local_balance_push_out(self) -> None:
        # local/(capacity-reserves) > 0.8 → push_out
        ch = _channel(
            local_balance_sat=1_900_000,
            remote_balance_sat=50_000,
            capacity_sat=2_000_000,
            local_reserve_sat=20_000,
            remote_reserve_sat=20_000,
        )
        result = rebalance_candidates([ch], _strategy(rebalance_threshold=0.2))
        assert len(result) == 1
        assert result[0]["direction"] == "push_out"

    def test_inactive_channel_excluded(self) -> None:
        ch = _channel(local_balance_sat=0, is_active=False)
        result = rebalance_candidates([ch], _strategy())
        assert result == []

    def test_critical_urgency_for_extreme_imbalance(self) -> None:
        ch = _channel(
            local_balance_sat=1_000,
            remote_balance_sat=1_960_000,
            capacity_sat=2_000_000,
            local_reserve_sat=20_000,
            remote_reserve_sat=20_000,
        )
        result = rebalance_candidates([ch], _strategy(rebalance_threshold=0.2))
        assert result, "expected the drained channel to be reported as a candidate"
        assert result[0]["urgency"] == "critical"

    def test_multiple_channels_sorted_by_urgency(self) -> None:
        # Slightly imbalanced channel
        ch_medium = _channel(
            channel_id="1",
            local_balance_sat=200_000,
            remote_balance_sat=1_750_000,
            capacity_sat=2_000_000,
            local_reserve_sat=20_000,
            remote_reserve_sat=20_000,
        )
        # Severely imbalanced channel
        ch_critical = _channel(
            channel_id="2",
            local_balance_sat=1_000,
            remote_balance_sat=1_960_000,
            capacity_sat=2_000_000,
            local_reserve_sat=20_000,
            remote_reserve_sat=20_000,
        )
        result = rebalance_candidates([ch_medium, ch_critical], _strategy(rebalance_threshold=0.2))
        assert result, "expected at least one rebalance candidate"
        assert result[0]["channel_id"] == "2"  # critical first

    def test_suggested_amount_is_positive(self) -> None:
        ch = _channel(
            local_balance_sat=10_000,
            remote_balance_sat=1_950_000,
            capacity_sat=2_000_000,
            local_reserve_sat=20_000,
            remote_reserve_sat=20_000,
        )
        result = rebalance_candidates([ch], _strategy(rebalance_threshold=0.2))
        assert result, "expected the low-balance channel to be reported as a candidate"
        assert result[0]["suggested_amount_sat"] > 0
| 705 | |
| 706 | |
class TestChannelHealthReport:
    """Check the node-level summary produced by ``channel_health_report``."""

    def test_empty_channels_perfect_score(self) -> None:
        summary = channel_health_report([], [], _strategy())
        assert summary["health_score"] == 1.0
        assert summary["total_channels"] == 0

    def test_all_balanced_active_channels(self) -> None:
        balanced = [
            _channel(channel_id=cid, local_balance_sat=960_000, remote_balance_sat=960_000)
            for cid in ("1", "2")
        ]
        summary = channel_health_report(balanced, [_routing()], _strategy())
        assert summary["health_score"] >= 0.9
        assert "Excellent" in summary["assessment"]

    def test_inactive_channel_lowers_score(self) -> None:
        online = _channel(channel_id="1")
        offline = _channel(channel_id="2", is_active=False)
        summary = channel_health_report([online, offline], [], _strategy())
        assert summary["inactive_channels"] == 1
        assert summary["health_score"] < 1.0

    def test_imbalanced_channels_lower_score(self) -> None:
        lopsided = _channel(
            local_balance_sat=10_000,
            remote_balance_sat=1_950_000,
            capacity_sat=2_000_000,
            local_reserve_sat=20_000,
            remote_reserve_sat=20_000,
        )
        summary = channel_health_report([lopsided], [], _strategy())
        assert summary["imbalanced_count"] >= 1
        assert summary["health_score"] < 1.0

    def test_rebalance_candidates_in_report(self) -> None:
        drained = _channel(
            local_balance_sat=1_000,
            remote_balance_sat=1_960_000,
            capacity_sat=2_000_000,
            local_reserve_sat=20_000,
            remote_reserve_sat=20_000,
        )
        summary = channel_health_report([drained], [], _strategy())
        candidates = summary["rebalance_candidates"]
        assert len(candidates) >= 1

    def test_capacity_totals(self) -> None:
        small = _channel(channel_id="1", capacity_sat=1_000_000)
        big = _channel(channel_id="2", capacity_sat=2_000_000)
        summary = channel_health_report([small, big], [], _strategy())
        assert summary["total_capacity_sat"] == 3_000_000

    def test_htlc_count_tracked(self) -> None:
        # The channel's three HTLCs are expected to surface as htlc_stuck_count.
        busy = _channel(htlc_count=3)
        summary = channel_health_report([busy], [], _strategy())
        assert summary["htlc_stuck_count"] == 3
| 762 | |
| 763 | |
| 764 | # --------------------------------------------------------------------------- |
| 765 | # Fee window analytics |
| 766 | # --------------------------------------------------------------------------- |
| 767 | |
| 768 | |
class TestFeeWindow:
    """Tests for ``fee_window``: recommendation, percentile and history stats."""

    def test_empty_history_defaults_to_send_now(self) -> None:
        """With no history the result is ``send_now`` at 1 sat/vbyte."""
        result = fee_window([], target_blocks=6)
        assert result["recommendation"] == "send_now"
        assert result["current_sat_vbyte"] == 1

    def test_low_fee_sends_now(self) -> None:
        """A latest estimate far below the history yields ``send_now``."""
        # Create history with mostly high fees, current is low
        history = [_fee(t6=100, ts=1_000_000 + i) for i in range(20)]
        history.append(_fee(t6=5, ts=2_000_000))  # current: very low
        result = fee_window(history, target_blocks=6)
        assert result["recommendation"] == "send_now"
        assert result["percentile"] <= 0.25

    def test_high_fee_recommends_wait(self) -> None:
        """A latest estimate far above the history yields ``wait``."""
        # Create history with mostly low fees, current is high
        history = [_fee(t6=5, ts=1_000_000 + i) for i in range(20)]
        history.append(_fee(t6=100, ts=2_000_000))  # current: very high
        result = fee_window(history, target_blocks=6)
        assert result["recommendation"] == "wait"
        assert result["percentile"] >= 0.75

    def test_stuck_tx_recommends_rbf(self) -> None:
        """Passing a pending txid with ``target_blocks=1`` yields ``rbf_now``."""
        result = fee_window(
            [_fee()],
            target_blocks=1,
            pending_txids=["deadbeef"],
        )
        assert result["recommendation"] == "rbf_now"

    def test_historical_stats_present(self) -> None:
        """Min, max and median are reported for a 10/20/30 history."""
        history = [
            _fee(t6=10, ts=1_000),
            _fee(t6=20, ts=2_000),
            _fee(t6=30, ts=3_000),
        ]
        result = fee_window(history, target_blocks=6)
        assert result["historical_min_sat_vbyte"] == 10
        assert result["historical_max_sat_vbyte"] == 30
        assert result["historical_median_sat_vbyte"] == 20

    def test_target_blocks_1_uses_1_block_rate(self) -> None:
        """``target_blocks=1`` reads the ``t1`` rate, not ``t6``."""
        result = fee_window([_fee(t1=50, t6=20)], target_blocks=1)
        assert result["current_sat_vbyte"] == 50

    def test_target_blocks_144_uses_144_block_rate(self) -> None:
        """``target_blocks=144`` reads the ``t144`` rate, not ``t6``."""
        result = fee_window([_fee(t6=20, t144=3)], target_blocks=144)
        assert result["current_sat_vbyte"] == 3

    def test_optimal_wait_blocks_set_when_waiting(self) -> None:
        """A ``wait`` recommendation carries a positive ``optimal_wait_blocks``."""
        history = [_fee(t6=5, ts=1_000_000 + i) for i in range(20)]
        history.append(_fee(t6=100, ts=2_000_000))
        result = fee_window(history, target_blocks=6)
        # Assert the precondition instead of guarding on it: an ``if`` here
        # would let the test pass without checking anything. The same history
        # is pinned to "wait" by test_high_fee_recommends_wait.
        assert result["recommendation"] == "wait"
        assert result["optimal_wait_blocks"] is not None
        assert result["optimal_wait_blocks"] > 0

    def test_send_now_has_none_wait_blocks(self) -> None:
        """A ``send_now`` recommendation leaves ``optimal_wait_blocks`` as None."""
        history = [_fee(t6=100, ts=1_000_000 + i) for i in range(20)]
        history.append(_fee(t6=5, ts=2_000_000))  # current is low
        result = fee_window(history, target_blocks=6)
        # Asserted outright so the test cannot pass vacuously; the same
        # history is pinned to "send_now" by test_low_fee_sends_now.
        assert result["recommendation"] == "send_now"
        assert result["optimal_wait_blocks"] is None
| 832 | |
| 833 | |
| 834 | # --------------------------------------------------------------------------- |
| 835 | # Consolidation planner |
| 836 | # --------------------------------------------------------------------------- |
| 837 | |
| 838 | |
| 839 | class TestConsolidationPlan: |
def test_single_utxo_not_recommended(self) -> None:
    """A lone UTXO gives a plan that is not recommended and has no inputs."""
    lone_coin = _utxo(amount_sat=1_000_000)
    outcome = consolidation_plan([lone_coin], fee_rate_sat_vbyte=10)
    assert outcome["recommended"] is False
    assert outcome["input_count"] == 0
| 844 | |
def test_many_small_utxos_recommended_at_low_fee(self) -> None:
    """Ten 50k-sat UTXOs at 1 sat/vbyte over 20 spends are recommended."""
    small_coins = [
        _utxo(txid=letter * 64, vout=position, amount_sat=50_000, confirmations=6)
        for position, letter in enumerate("abcdefghij")
    ]
    outcome = consolidation_plan(
        small_coins,
        fee_rate_sat_vbyte=1,
        savings_horizon_spends=20,
    )
    assert outcome["recommended"] is True
    assert outcome["input_count"] > 0
    assert outcome["estimated_fee_sat"] > 0
| 855 | |
def test_dust_utxos_excluded(self) -> None:
    """A 200-sat UTXO is not among the UTXOs chosen for consolidation."""
    dust_txid = "a" * 64
    coins = [
        _utxo(txid=dust_txid, vout=0, amount_sat=200, confirmations=6),
        _utxo(txid="b" * 64, vout=0, amount_sat=1_000_000, confirmations=6),
    ]
    outcome = consolidation_plan(coins, fee_rate_sat_vbyte=10)
    chosen_txids = set()
    for chosen in outcome["utxos_to_consolidate"]:
        chosen_txids.add(chosen["txid"])
    assert dust_txid not in chosen_txids
| 862 | |
def test_immature_coinbase_excluded(self) -> None:
    """A coinbase UTXO with 50 confirmations is not chosen for consolidation."""
    coinbase_txid = "a" * 64
    coins = [
        _utxo(
            txid=coinbase_txid,
            vout=0,
            amount_sat=625_000_000,
            coinbase=True,
            confirmations=50,
        ),
        _utxo(txid="b" * 64, vout=0, amount_sat=50_000, confirmations=6),
    ]
    outcome = consolidation_plan(coins, fee_rate_sat_vbyte=1)
    chosen_txids = set()
    for chosen in outcome["utxos_to_consolidate"]:
        chosen_txids.add(chosen["txid"])
    assert coinbase_txid not in chosen_txids
| 869 | |
def test_output_count_is_one(self) -> None:
    """Whenever the plan selects inputs, it reports exactly one output."""
    coins = [
        _utxo(txid=letter * 64, vout=position, amount_sat=50_000, confirmations=6)
        for position, letter in enumerate("abcde")
    ]
    outcome = consolidation_plan(coins, fee_rate_sat_vbyte=1)
    has_inputs = outcome["input_count"] > 0
    if not has_inputs:
        # Nothing was selected, so there is no output count to check.
        return
    assert outcome["output_count"] == 1
| 875 | |
def test_break_even_fee_rate_positive_when_savings_possible(self) -> None:
    """Whenever the plan selects inputs, ``break_even_fee_rate`` is >= 0."""
    coins = [
        _utxo(txid=letter * 64, vout=position, amount_sat=50_000, confirmations=6)
        for position, letter in enumerate("abcde")
    ]
    outcome = consolidation_plan(
        coins,
        fee_rate_sat_vbyte=1,
        savings_horizon_spends=10,
    )
    has_inputs = outcome["input_count"] > 0
    # Only checked when inputs were selected; an empty plan passes trivially.
    assert not has_inputs or outcome["break_even_fee_rate"] >= 0
| 881 | |
def test_reason_string_present(self) -> None:
    """The plan always carries a non-empty ``reason``."""
    coins = [
        _utxo(txid=letter * 64, vout=position, amount_sat=50_000, confirmations=6)
        for position, letter in enumerate("abcde")
    ]
    reason = consolidation_plan(coins, fee_rate_sat_vbyte=1)["reason"]
    assert len(reason) > 0
| 886 | |
def test_max_inputs_respected(self) -> None:
    """With 30 candidate UTXOs and ``max_inputs=10``, at most 10 are selected."""
    # Zero-padded hex keeps every txid unique and exactly 64 hex characters;
    # appending a decimal index to 63 characters overflows to 65 once i >= 10.
    utxos = [
        _utxo(txid=f"{i:064x}", vout=0, amount_sat=10_000, confirmations=6)
        for i in range(30)
    ]
    plan = consolidation_plan(utxos, fee_rate_sat_vbyte=1, max_inputs=10)
    assert plan["input_count"] <= 10
| 894 | |
def test_high_fee_rate_may_not_recommend(self) -> None:
    """Smoke test: two UTXOs at 200 sat/vbyte still give a boolean verdict."""
    # Only two UTXOs, a very high fee rate and a short savings horizon.
    utxos = [
        _utxo(txid="a" * 64, vout=0, amount_sat=50_000, confirmations=6),
        _utxo(txid="b" * 64, vout=1, amount_sat=50_000, confirmations=6),
    ]
    plan = consolidation_plan(
        utxos,
        fee_rate_sat_vbyte=200,
        savings_horizon_spends=2,
    )
    # Consolidation may or may not be recommended here, so the outcome is
    # not pinned; the test only checks that a well-formed result comes back.
    verdict = plan["recommended"]
    assert isinstance(verdict, bool)