test_bitcoin_plugin.py
python
| 1 | """Comprehensive test suite for the Bitcoin domain plugin. |
| 2 | |
| 3 | Tests cover all three protocol levels: |
| 4 | - Core MuseDomainPlugin (snapshot, diff, merge, drift, apply, schema) |
| 5 | - StructuredMergePlugin (OT merge with double-spend detection) |
| 6 | - CRDTPlugin (convergent join — commutativity, associativity, idempotency) |
| 7 | |
| 8 | Tests are organized by capability, each asserting exact behaviour |
| 9 | with realistic Bitcoin fixtures. |
| 10 | """ |
| 11 | |
| 12 | from __future__ import annotations |
| 13 | |
| 14 | import hashlib |
| 15 | import json |
| 16 | import pathlib |
| 17 | import tempfile |
| 18 | |
| 19 | import pytest |
| 20 | |
| 21 | from muse._version import __version__ |
| 22 | from muse.domain import ( |
| 23 | CRDTPlugin, |
| 24 | DeleteOp, |
| 25 | DomainOp, |
| 26 | InsertOp, |
| 27 | MuseDomainPlugin, |
| 28 | MutateOp, |
| 29 | PatchOp, |
| 30 | ReplaceOp, |
| 31 | SnapshotManifest, |
| 32 | StructuredMergePlugin, |
| 33 | ) |
| 34 | from muse.plugins.bitcoin._query import ( |
| 35 | balance_by_script_type, |
| 36 | channel_liquidity_totals, |
| 37 | channel_utilization, |
| 38 | coin_age_blocks, |
| 39 | confirmed_balance_sat, |
| 40 | double_spend_candidates, |
| 41 | fee_surface_str, |
| 42 | format_sat, |
| 43 | latest_fee_estimate, |
| 44 | latest_price, |
| 45 | mempool_summary_line, |
| 46 | strategy_summary_line, |
| 47 | total_balance_sat, |
| 48 | utxo_key, |
| 49 | utxo_summary_line, |
| 50 | ) |
| 51 | from muse.plugins.bitcoin._types import ( |
| 52 | AddressLabelRecord, |
| 53 | AgentStrategyRecord, |
| 54 | CoinCategory, |
| 55 | CoinSelectAlgo, |
| 56 | FeeEstimateRecord, |
| 57 | LightningChannelRecord, |
| 58 | OraclePriceTickRecord, |
| 59 | PendingTxRecord, |
| 60 | RoutingPolicyRecord, |
| 61 | ScriptType, |
| 62 | UTXORecord, |
| 63 | ) |
| 64 | from muse.plugins.bitcoin.plugin import ( |
| 65 | BitcoinPlugin, |
| 66 | _diff_channels, |
| 67 | _diff_labels, |
| 68 | _diff_strategy, |
| 69 | _diff_time_series, |
| 70 | _diff_transactions, |
| 71 | _diff_utxos, |
| 72 | _handler_for_path, |
| 73 | ) |
| 74 | |
| 75 | # --------------------------------------------------------------------------- |
| 76 | # Fixtures |
| 77 | # --------------------------------------------------------------------------- |
| 78 | |
| 79 | |
| 80 | def _make_utxo( |
| 81 | txid: str = "abc" * 21 + "ab", |
| 82 | vout: int = 0, |
| 83 | amount_sat: int = 100_000, |
| 84 | script_type: ScriptType = "p2wpkh", |
| 85 | address: str = "bc1qtest", |
| 86 | confirmations: int = 6, |
| 87 | block_height: int | None = 850_000, |
| 88 | coinbase: bool = False, |
| 89 | label: str | None = None, |
| 90 | ) -> UTXORecord: |
| 91 | return UTXORecord( |
| 92 | txid=txid, |
| 93 | vout=vout, |
| 94 | amount_sat=amount_sat, |
| 95 | script_type=script_type, |
| 96 | address=address, |
| 97 | confirmations=confirmations, |
| 98 | block_height=block_height, |
| 99 | coinbase=coinbase, |
| 100 | label=label, |
| 101 | ) |
| 102 | |
| 103 | |
| 104 | def _make_channel( |
| 105 | channel_id: str = "850000x1x0", |
| 106 | peer_pubkey: str = "0279" + "aa" * 32, |
| 107 | peer_alias: str | None = "ACINQ", |
| 108 | capacity_sat: int = 2_000_000, |
| 109 | local_balance_sat: int = 1_000_000, |
| 110 | remote_balance_sat: int = 900_000, |
| 111 | is_active: bool = True, |
| 112 | is_public: bool = True, |
| 113 | local_reserve_sat: int = 20_000, |
| 114 | remote_reserve_sat: int = 20_000, |
| 115 | unsettled_balance_sat: int = 0, |
| 116 | htlc_count: int = 0, |
| 117 | ) -> LightningChannelRecord: |
| 118 | return LightningChannelRecord( |
| 119 | channel_id=channel_id, |
| 120 | peer_pubkey=peer_pubkey, |
| 121 | peer_alias=peer_alias, |
| 122 | capacity_sat=capacity_sat, |
| 123 | local_balance_sat=local_balance_sat, |
| 124 | remote_balance_sat=remote_balance_sat, |
| 125 | is_active=is_active, |
| 126 | is_public=is_public, |
| 127 | local_reserve_sat=local_reserve_sat, |
| 128 | remote_reserve_sat=remote_reserve_sat, |
| 129 | unsettled_balance_sat=unsettled_balance_sat, |
| 130 | htlc_count=htlc_count, |
| 131 | ) |
| 132 | |
| 133 | |
| 134 | def _make_label( |
| 135 | address: str = "bc1qtest", |
| 136 | label: str = "cold storage", |
| 137 | category: CoinCategory = "income", |
| 138 | created_at: int = 1_700_000_000, |
| 139 | ) -> AddressLabelRecord: |
| 140 | return AddressLabelRecord( |
| 141 | address=address, |
| 142 | label=label, |
| 143 | category=category, |
| 144 | created_at=created_at, |
| 145 | ) |
| 146 | |
| 147 | |
| 148 | def _make_strategy( |
| 149 | name: str = "conservative", |
| 150 | max_fee_rate_sat_vbyte: int = 10, |
| 151 | min_confirmations: int = 6, |
| 152 | utxo_consolidation_threshold: int = 20, |
| 153 | dca_amount_sat: int | None = 500_000, |
| 154 | dca_interval_blocks: int | None = 144, |
| 155 | lightning_rebalance_threshold: float = 0.2, |
| 156 | coin_selection: CoinSelectAlgo = "branch_and_bound", |
| 157 | simulation_mode: bool = False, |
| 158 | ) -> AgentStrategyRecord: |
| 159 | return AgentStrategyRecord( |
| 160 | name=name, |
| 161 | max_fee_rate_sat_vbyte=max_fee_rate_sat_vbyte, |
| 162 | min_confirmations=min_confirmations, |
| 163 | utxo_consolidation_threshold=utxo_consolidation_threshold, |
| 164 | dca_amount_sat=dca_amount_sat, |
| 165 | dca_interval_blocks=dca_interval_blocks, |
| 166 | lightning_rebalance_threshold=lightning_rebalance_threshold, |
| 167 | coin_selection=coin_selection, |
| 168 | simulation_mode=simulation_mode, |
| 169 | ) |
| 170 | |
| 171 | |
| 172 | def _make_price( |
| 173 | timestamp: int = 1_700_000_000, |
| 174 | block_height: int | None = 850_000, |
| 175 | price_usd: float = 62_000.0, |
| 176 | source: str = "coinbase", |
| 177 | ) -> OraclePriceTickRecord: |
| 178 | return OraclePriceTickRecord( |
| 179 | timestamp=timestamp, |
| 180 | block_height=block_height, |
| 181 | price_usd=price_usd, |
| 182 | source=source, |
| 183 | ) |
| 184 | |
| 185 | |
| 186 | def _make_fee( |
| 187 | timestamp: int = 1_700_000_000, |
| 188 | block_height: int | None = 850_000, |
| 189 | t1: int = 30, |
| 190 | t6: int = 15, |
| 191 | t144: int = 3, |
| 192 | ) -> FeeEstimateRecord: |
| 193 | return FeeEstimateRecord( |
| 194 | timestamp=timestamp, |
| 195 | block_height=block_height, |
| 196 | target_1_block_sat_vbyte=t1, |
| 197 | target_6_block_sat_vbyte=t6, |
| 198 | target_144_block_sat_vbyte=t144, |
| 199 | ) |
| 200 | |
| 201 | |
| 202 | _AnyBitcoinRecord = ( |
| 203 | UTXORecord |
| 204 | | LightningChannelRecord |
| 205 | | AddressLabelRecord |
| 206 | | AgentStrategyRecord |
| 207 | | FeeEstimateRecord |
| 208 | | RoutingPolicyRecord |
| 209 | | PendingTxRecord |
| 210 | | OraclePriceTickRecord |
| 211 | ) |
| 212 | |
| 213 | |
| 214 | def _json_bytes(obj: _AnyBitcoinRecord | list[_AnyBitcoinRecord]) -> bytes: |
| 215 | return json.dumps(obj, sort_keys=True).encode() |
| 216 | |
| 217 | |
| 218 | def _sha256(data: bytes) -> str: |
| 219 | return hashlib.sha256(data).hexdigest() |
| 220 | |
| 221 | |
| 222 | def _make_manifest(files: dict[str, bytes]) -> SnapshotManifest: |
| 223 | return SnapshotManifest( |
| 224 | files={path: _sha256(data) for path, data in files.items()}, |
| 225 | domain="bitcoin", |
| 226 | ) |
| 227 | |
| 228 | |
| 229 | # --------------------------------------------------------------------------- |
| 230 | # Protocol conformance |
| 231 | # --------------------------------------------------------------------------- |
| 232 | |
| 233 | |
| 234 | class TestProtocolConformance: |
| 235 | def test_satisfies_muse_domain_plugin(self) -> None: |
| 236 | assert isinstance(BitcoinPlugin(), MuseDomainPlugin) |
| 237 | |
| 238 | def test_satisfies_structured_merge_plugin(self) -> None: |
| 239 | assert isinstance(BitcoinPlugin(), StructuredMergePlugin) |
| 240 | |
| 241 | def test_satisfies_crdt_plugin(self) -> None: |
| 242 | assert isinstance(BitcoinPlugin(), CRDTPlugin) |
| 243 | |
| 244 | |
| 245 | # --------------------------------------------------------------------------- |
| 246 | # snapshot |
| 247 | # --------------------------------------------------------------------------- |
| 248 | |
| 249 | |
| 250 | class TestSnapshot: |
| 251 | def test_snapshot_from_path_hashes_all_json_files(self) -> None: |
| 252 | plugin = BitcoinPlugin() |
| 253 | with tempfile.TemporaryDirectory() as tmp: |
| 254 | root = pathlib.Path(tmp) |
| 255 | (root / "wallet").mkdir() |
| 256 | (root / "wallet" / "utxos.json").write_bytes(b"[]") |
| 257 | (root / "wallet" / "labels.json").write_bytes(b"[]") |
| 258 | |
| 259 | snap = plugin.snapshot(root) |
| 260 | |
| 261 | assert snap["domain"] == "bitcoin" |
| 262 | assert "wallet/utxos.json" in snap["files"] |
| 263 | assert "wallet/labels.json" in snap["files"] |
| 264 | |
| 265 | def test_snapshot_excludes_hidden_files(self) -> None: |
| 266 | plugin = BitcoinPlugin() |
| 267 | with tempfile.TemporaryDirectory() as tmp: |
| 268 | root = pathlib.Path(tmp) |
| 269 | (root / "wallet").mkdir() |
| 270 | (root / "wallet" / "utxos.json").write_bytes(b"[]") |
| 271 | (root / ".secret").write_bytes(b"private key would be here") |
| 272 | |
| 273 | snap = plugin.snapshot(root) |
| 274 | |
| 275 | assert ".secret" not in snap["files"] |
| 276 | assert "wallet/utxos.json" in snap["files"] |
| 277 | |
| 278 | def test_snapshot_passthrough_for_existing_manifest(self) -> None: |
| 279 | plugin = BitcoinPlugin() |
| 280 | manifest = SnapshotManifest( |
| 281 | files={"wallet/utxos.json": "abc" * 21 + "ab"}, |
| 282 | domain="bitcoin", |
| 283 | ) |
| 284 | result = plugin.snapshot(manifest) |
| 285 | assert result is manifest |
| 286 | |
| 287 | def test_snapshot_content_hash_is_sha256(self) -> None: |
| 288 | plugin = BitcoinPlugin() |
| 289 | with tempfile.TemporaryDirectory() as tmp: |
| 290 | root = pathlib.Path(tmp) |
| 291 | (root / "wallet").mkdir() |
| 292 | content = b"[]\n" |
| 293 | (root / "wallet" / "utxos.json").write_bytes(content) |
| 294 | |
| 295 | snap = plugin.snapshot(root) |
| 296 | |
| 297 | expected_hash = _sha256(content) |
| 298 | assert snap["files"]["wallet/utxos.json"] == expected_hash |
| 299 | |
| 300 | def test_snapshot_deterministic(self) -> None: |
| 301 | plugin = BitcoinPlugin() |
| 302 | with tempfile.TemporaryDirectory() as tmp: |
| 303 | root = pathlib.Path(tmp) |
| 304 | (root / "wallet").mkdir() |
| 305 | (root / "wallet" / "utxos.json").write_bytes(b"[]") |
| 306 | |
| 307 | snap1 = plugin.snapshot(root) |
| 308 | snap2 = plugin.snapshot(root) |
| 309 | |
| 310 | assert snap1["files"] == snap2["files"] |
| 311 | |
| 312 | |
| 313 | # --------------------------------------------------------------------------- |
| 314 | # diff — file-level |
| 315 | # --------------------------------------------------------------------------- |
| 316 | |
| 317 | |
| 318 | class TestDiffFileLevel: |
| 319 | def setup_method(self) -> None: |
| 320 | self.plugin = BitcoinPlugin() |
| 321 | |
| 322 | def test_added_file_produces_insert_op(self) -> None: |
| 323 | base = SnapshotManifest(files={}, domain="bitcoin") |
| 324 | target = SnapshotManifest( |
| 325 | files={"wallet/utxos.json": "a" * 64}, |
| 326 | domain="bitcoin", |
| 327 | ) |
| 328 | delta = self.plugin.diff(base, target) |
| 329 | assert len(delta["ops"]) == 1 |
| 330 | assert delta["ops"][0]["op"] == "insert" |
| 331 | assert delta["ops"][0]["address"] == "wallet/utxos.json" |
| 332 | |
| 333 | def test_removed_file_produces_delete_op(self) -> None: |
| 334 | base = SnapshotManifest( |
| 335 | files={"wallet/utxos.json": "a" * 64}, |
| 336 | domain="bitcoin", |
| 337 | ) |
| 338 | target = SnapshotManifest(files={}, domain="bitcoin") |
| 339 | delta = self.plugin.diff(base, target) |
| 340 | assert delta["ops"][0]["op"] == "delete" |
| 341 | |
| 342 | def test_unchanged_file_produces_no_ops(self) -> None: |
| 343 | files = {"wallet/utxos.json": "a" * 64} |
| 344 | base = SnapshotManifest(files=files, domain="bitcoin") |
| 345 | target = SnapshotManifest(files=files, domain="bitcoin") |
| 346 | delta = self.plugin.diff(base, target) |
| 347 | assert len(delta["ops"]) == 0 |
| 348 | assert "clean" in delta["summary"] |
| 349 | |
| 350 | def test_modified_file_without_repo_root_produces_replace_op(self) -> None: |
| 351 | base = SnapshotManifest( |
| 352 | files={"wallet/utxos.json": "a" * 64}, |
| 353 | domain="bitcoin", |
| 354 | ) |
| 355 | target = SnapshotManifest( |
| 356 | files={"wallet/utxos.json": "b" * 64}, |
| 357 | domain="bitcoin", |
| 358 | ) |
| 359 | delta = self.plugin.diff(base, target) |
| 360 | assert delta["ops"][0]["op"] == "replace" |
| 361 | |
| 362 | def test_summary_reflects_op_counts(self) -> None: |
| 363 | base = SnapshotManifest( |
| 364 | files={"wallet/utxos.json": "a" * 64, "wallet/labels.json": "b" * 64}, |
| 365 | domain="bitcoin", |
| 366 | ) |
| 367 | target = SnapshotManifest( |
| 368 | files={"wallet/utxos.json": "a" * 64, "strategy/agent.json": "c" * 64}, |
| 369 | domain="bitcoin", |
| 370 | ) |
| 371 | delta = self.plugin.diff(base, target) |
| 372 | assert "added" in delta["summary"] |
| 373 | assert "removed" in delta["summary"] |
| 374 | |
| 375 | |
| 376 | # --------------------------------------------------------------------------- |
| 377 | # diff — semantic UTXO level |
| 378 | # --------------------------------------------------------------------------- |
| 379 | |
| 380 | |
| 381 | class TestDiffUTXOs: |
| 382 | def test_new_utxo_produces_insert_op(self) -> None: |
| 383 | u = _make_utxo(txid="a" * 64, vout=0, amount_sat=500_000) |
| 384 | old = _json_bytes([]) |
| 385 | new = _json_bytes([u]) |
| 386 | ops = _diff_utxos("wallet/utxos.json", old, new) |
| 387 | assert len(ops) == 1 |
| 388 | assert ops[0]["op"] == "insert" |
| 389 | assert "a" * 64 + ":0" in ops[0]["address"] |
| 390 | assert "0.00500000 BTC" in ops[0]["content_summary"] |
| 391 | |
| 392 | def test_spent_utxo_produces_delete_op(self) -> None: |
| 393 | u = _make_utxo(txid="b" * 64, vout=1, amount_sat=200_000) |
| 394 | old = _json_bytes([u]) |
| 395 | new = _json_bytes([]) |
| 396 | ops = _diff_utxos("wallet/utxos.json", old, new) |
| 397 | assert len(ops) == 1 |
| 398 | assert ops[0]["op"] == "delete" |
| 399 | assert "spent" in ops[0]["content_summary"] |
| 400 | |
| 401 | def test_confirmation_update_produces_mutate_op(self) -> None: |
| 402 | txid = "c" * 64 |
| 403 | u_old = _make_utxo(txid=txid, vout=0, confirmations=1) |
| 404 | u_new = _make_utxo(txid=txid, vout=0, confirmations=6) |
| 405 | old = _json_bytes([u_old]) |
| 406 | new = _json_bytes([u_new]) |
| 407 | ops = _diff_utxos("wallet/utxos.json", old, new) |
| 408 | assert len(ops) == 1 |
| 409 | assert ops[0]["op"] == "mutate" |
| 410 | assert ops[0]["fields"]["confirmations"]["old"] == "1" |
| 411 | assert ops[0]["fields"]["confirmations"]["new"] == "6" |
| 412 | |
| 413 | def test_unchanged_utxos_produce_no_ops(self) -> None: |
| 414 | u = _make_utxo() |
| 415 | data = _json_bytes([u]) |
| 416 | ops = _diff_utxos("wallet/utxos.json", data, data) |
| 417 | assert ops == [] |
| 418 | |
| 419 | def test_multiple_utxos_diff(self) -> None: |
| 420 | u1 = _make_utxo(txid="d" * 64, vout=0, amount_sat=100_000) |
| 421 | u2 = _make_utxo(txid="e" * 64, vout=0, amount_sat=200_000) |
| 422 | u3 = _make_utxo(txid="f" * 64, vout=0, amount_sat=300_000) |
| 423 | old = _json_bytes([u1, u2]) |
| 424 | new = _json_bytes([u2, u3]) |
| 425 | ops = _diff_utxos("wallet/utxos.json", old, new) |
| 426 | op_types = {op["op"] for op in ops} |
| 427 | assert "insert" in op_types |
| 428 | assert "delete" in op_types |
| 429 | |
| 430 | |
| 431 | # --------------------------------------------------------------------------- |
| 432 | # diff — semantic channel level |
| 433 | # --------------------------------------------------------------------------- |
| 434 | |
| 435 | |
| 436 | class TestDiffChannels: |
| 437 | def test_new_channel_produces_insert_op(self) -> None: |
| 438 | ch = _make_channel() |
| 439 | ops = _diff_channels("channels/channels.json", _json_bytes([]), _json_bytes([ch])) |
| 440 | assert len(ops) == 1 |
| 441 | assert ops[0]["op"] == "insert" |
| 442 | assert "opened" in ops[0]["content_summary"] |
| 443 | |
| 444 | def test_closed_channel_produces_delete_op(self) -> None: |
| 445 | ch = _make_channel() |
| 446 | ops = _diff_channels("channels/channels.json", _json_bytes([ch]), _json_bytes([])) |
| 447 | assert len(ops) == 1 |
| 448 | assert ops[0]["op"] == "delete" |
| 449 | |
| 450 | def test_balance_change_produces_mutate_op(self) -> None: |
| 451 | ch_old = _make_channel(local_balance_sat=1_000_000, remote_balance_sat=900_000) |
| 452 | ch_new = _make_channel(local_balance_sat=800_000, remote_balance_sat=1_100_000) |
| 453 | ops = _diff_channels( |
| 454 | "channels/channels.json", |
| 455 | _json_bytes([ch_old]), |
| 456 | _json_bytes([ch_new]), |
| 457 | ) |
| 458 | assert len(ops) == 1 |
| 459 | assert ops[0]["op"] == "mutate" |
| 460 | fields = ops[0]["fields"] |
| 461 | assert "local_balance_sat" in fields |
| 462 | assert "remote_balance_sat" in fields |
| 463 | |
| 464 | |
| 465 | # --------------------------------------------------------------------------- |
| 466 | # diff — strategy field level |
| 467 | # --------------------------------------------------------------------------- |
| 468 | |
| 469 | |
| 470 | class TestDiffStrategy: |
| 471 | def test_fee_rate_change_produces_mutate_op(self) -> None: |
| 472 | old_strat = _make_strategy(max_fee_rate_sat_vbyte=10) |
| 473 | new_strat = _make_strategy(max_fee_rate_sat_vbyte=50) |
| 474 | ops = _diff_strategy("strategy/agent.json", _json_bytes(old_strat), _json_bytes(new_strat)) |
| 475 | assert len(ops) == 1 |
| 476 | assert ops[0]["op"] == "mutate" |
| 477 | assert "max_fee_rate_sat_vbyte" in ops[0]["fields"] |
| 478 | |
| 479 | def test_simulation_mode_toggle_detected(self) -> None: |
| 480 | old_s = _make_strategy(simulation_mode=False) |
| 481 | new_s = _make_strategy(simulation_mode=True) |
| 482 | ops = _diff_strategy("strategy/agent.json", _json_bytes(old_s), _json_bytes(new_s)) |
| 483 | assert ops[0]["op"] == "mutate" |
| 484 | assert ops[0]["fields"]["simulation_mode"]["old"] == "False" |
| 485 | assert ops[0]["fields"]["simulation_mode"]["new"] == "True" |
| 486 | |
| 487 | def test_identical_strategy_no_ops(self) -> None: |
| 488 | s = _make_strategy() |
| 489 | data = _json_bytes(s) |
| 490 | ops = _diff_strategy("strategy/agent.json", data, data) |
| 491 | assert ops == [] |
| 492 | |
| 493 | |
| 494 | # --------------------------------------------------------------------------- |
| 495 | # diff — time series (prices, fees) |
| 496 | # --------------------------------------------------------------------------- |
| 497 | |
| 498 | |
| 499 | class TestDiffTimeSeries: |
| 500 | def test_new_price_tick_produces_insert(self) -> None: |
| 501 | tick = _make_price(timestamp=1_000, price_usd=60_000.0) |
| 502 | ops = _diff_time_series( |
| 503 | "oracles/prices.json", _json_bytes([]), _json_bytes([tick]), "prices" |
| 504 | ) |
| 505 | assert len(ops) == 1 |
| 506 | assert ops[0]["op"] == "insert" |
| 507 | assert "$60,000.00" in ops[0]["content_summary"] |
| 508 | |
| 509 | def test_new_fee_estimate_produces_insert(self) -> None: |
| 510 | fee = _make_fee(timestamp=2_000, t1=25, t6=12, t144=2) |
| 511 | ops = _diff_time_series( |
| 512 | "oracles/fees.json", _json_bytes([]), _json_bytes([fee]), "fees" |
| 513 | ) |
| 514 | assert len(ops) == 1 |
| 515 | assert ops[0]["op"] == "insert" |
| 516 | assert "25" in ops[0]["content_summary"] |
| 517 | |
| 518 | def test_existing_tick_not_duplicated(self) -> None: |
| 519 | tick = _make_price(timestamp=3_000) |
| 520 | data = _json_bytes([tick]) |
| 521 | ops = _diff_time_series("oracles/prices.json", data, data, "prices") |
| 522 | assert ops == [] |
| 523 | |
| 524 | |
| 525 | # --------------------------------------------------------------------------- |
| 526 | # diff with repo_root — produces PatchOp |
| 527 | # --------------------------------------------------------------------------- |
| 528 | |
| 529 | |
| 530 | class TestDiffWithRepoRoot: |
| 531 | def test_modified_utxos_json_produces_patch_op(self) -> None: |
| 532 | plugin = BitcoinPlugin() |
| 533 | u_old = _make_utxo(txid="a" * 64, vout=0, amount_sat=100_000) |
| 534 | u_new = _make_utxo(txid="b" * 64, vout=0, amount_sat=200_000) |
| 535 | old_bytes = _json_bytes([u_old]) |
| 536 | new_bytes = _json_bytes([u_new]) |
| 537 | old_hash = _sha256(old_bytes) |
| 538 | new_hash = _sha256(new_bytes) |
| 539 | |
| 540 | with tempfile.TemporaryDirectory() as tmp: |
| 541 | root = pathlib.Path(tmp) |
| 542 | muse_dir = root / ".muse" / "objects" |
| 543 | muse_dir.mkdir(parents=True) |
| 544 | # Write blobs in sharded layout |
| 545 | (muse_dir / old_hash[:2]).mkdir(exist_ok=True) |
| 546 | (muse_dir / old_hash[:2] / old_hash[2:]).write_bytes(old_bytes) |
| 547 | (muse_dir / new_hash[:2]).mkdir(exist_ok=True) |
| 548 | (muse_dir / new_hash[:2] / new_hash[2:]).write_bytes(new_bytes) |
| 549 | |
| 550 | base = SnapshotManifest( |
| 551 | files={"wallet/utxos.json": old_hash}, domain="bitcoin" |
| 552 | ) |
| 553 | target = SnapshotManifest( |
| 554 | files={"wallet/utxos.json": new_hash}, domain="bitcoin" |
| 555 | ) |
| 556 | delta = plugin.diff(base, target, repo_root=root) |
| 557 | |
| 558 | assert delta["ops"][0]["op"] == "patch" |
| 559 | patch = delta["ops"][0] |
| 560 | assert patch["child_domain"] == "bitcoin.utxos" |
| 561 | child_ops = patch["child_ops"] |
| 562 | op_types = {op["op"] for op in child_ops} |
| 563 | assert "insert" in op_types |
| 564 | assert "delete" in op_types |
| 565 | |
| 566 | def test_fallback_to_replace_when_object_missing(self) -> None: |
| 567 | plugin = BitcoinPlugin() |
| 568 | base = SnapshotManifest( |
| 569 | files={"wallet/utxos.json": "a" * 64}, domain="bitcoin" |
| 570 | ) |
| 571 | target = SnapshotManifest( |
| 572 | files={"wallet/utxos.json": "b" * 64}, domain="bitcoin" |
| 573 | ) |
| 574 | with tempfile.TemporaryDirectory() as tmp: |
| 575 | root = pathlib.Path(tmp) |
| 576 | (root / ".muse" / "objects").mkdir(parents=True) |
| 577 | delta = plugin.diff(base, target, repo_root=root) |
| 578 | |
| 579 | assert delta["ops"][0]["op"] == "replace" |
| 580 | |
| 581 | |
| 582 | # --------------------------------------------------------------------------- |
| 583 | # merge |
| 584 | # --------------------------------------------------------------------------- |
| 585 | |
| 586 | |
| 587 | class TestMerge: |
| 588 | def setup_method(self) -> None: |
| 589 | self.plugin = BitcoinPlugin() |
| 590 | |
| 591 | def test_clean_merge_no_conflicts(self) -> None: |
| 592 | base = SnapshotManifest(files={}, domain="bitcoin") |
| 593 | left = SnapshotManifest( |
| 594 | files={"wallet/utxos.json": "a" * 64}, domain="bitcoin" |
| 595 | ) |
| 596 | right = SnapshotManifest( |
| 597 | files={"channels/channels.json": "b" * 64}, domain="bitcoin" |
| 598 | ) |
| 599 | result = self.plugin.merge(base, left, right) |
| 600 | assert result.is_clean |
| 601 | assert "wallet/utxos.json" in result.merged["files"] |
| 602 | assert "channels/channels.json" in result.merged["files"] |
| 603 | |
| 604 | def test_both_sides_unchanged_no_conflict(self) -> None: |
| 605 | files = {"wallet/utxos.json": "a" * 64} |
| 606 | base = SnapshotManifest(files=files, domain="bitcoin") |
| 607 | left = SnapshotManifest(files=files, domain="bitcoin") |
| 608 | right = SnapshotManifest(files=files, domain="bitcoin") |
| 609 | result = self.plugin.merge(base, left, right) |
| 610 | assert result.is_clean |
| 611 | |
| 612 | def test_only_left_changed_takes_left(self) -> None: |
| 613 | base = SnapshotManifest(files={"f": "a" * 64}, domain="bitcoin") |
| 614 | left = SnapshotManifest(files={"f": "b" * 64}, domain="bitcoin") |
| 615 | right = SnapshotManifest(files={"f": "a" * 64}, domain="bitcoin") |
| 616 | result = self.plugin.merge(base, left, right) |
| 617 | assert result.is_clean |
| 618 | assert result.merged["files"]["f"] == "b" * 64 |
| 619 | |
| 620 | def test_only_right_changed_takes_right(self) -> None: |
| 621 | base = SnapshotManifest(files={"f": "a" * 64}, domain="bitcoin") |
| 622 | left = SnapshotManifest(files={"f": "a" * 64}, domain="bitcoin") |
| 623 | right = SnapshotManifest(files={"f": "c" * 64}, domain="bitcoin") |
| 624 | result = self.plugin.merge(base, left, right) |
| 625 | assert result.is_clean |
| 626 | assert result.merged["files"]["f"] == "c" * 64 |
| 627 | |
| 628 | def test_both_changed_differently_produces_conflict(self) -> None: |
| 629 | base = SnapshotManifest(files={"wallet/utxos.json": "a" * 64}, domain="bitcoin") |
| 630 | left = SnapshotManifest(files={"wallet/utxos.json": "b" * 64}, domain="bitcoin") |
| 631 | right = SnapshotManifest(files={"wallet/utxos.json": "c" * 64}, domain="bitcoin") |
| 632 | result = self.plugin.merge(base, left, right) |
| 633 | assert not result.is_clean |
| 634 | assert "wallet/utxos.json" in result.conflicts |
| 635 | |
| 636 | def test_double_spend_detection_in_merge(self) -> None: |
| 637 | u_base = _make_utxo(txid="d" * 64, vout=0, amount_sat=1_000_000) |
| 638 | # Left spent the base UTXO and got change at a different address |
| 639 | u_left_change = _make_utxo(txid="e" * 64, vout=0, amount_sat=900_000, address="bc1qleft") |
| 640 | # Right spent the SAME base UTXO in a different tx → double-spend |
| 641 | u_right_change = _make_utxo(txid="f" * 64, vout=0, amount_sat=850_000, address="bc1qright") |
| 642 | base_bytes = _json_bytes([u_base]) |
| 643 | left_bytes = _json_bytes([u_left_change]) # left spent base UTXO, got change |
| 644 | right_bytes = _json_bytes([u_right_change]) # right also spent it → double-spend! |
| 645 | |
| 646 | with tempfile.TemporaryDirectory() as tmp: |
| 647 | root = pathlib.Path(tmp) |
| 648 | muse_dir = root / ".muse" / "objects" |
| 649 | muse_dir.mkdir(parents=True) |
| 650 | |
| 651 | for data in (base_bytes, left_bytes, right_bytes): |
| 652 | h = _sha256(data) |
| 653 | shard = muse_dir / h[:2] |
| 654 | shard.mkdir(exist_ok=True) |
| 655 | (shard / h[2:]).write_bytes(data) |
| 656 | |
| 657 | base = SnapshotManifest( |
| 658 | files={"wallet/utxos.json": _sha256(base_bytes)}, domain="bitcoin" |
| 659 | ) |
| 660 | left = SnapshotManifest( |
| 661 | files={"wallet/utxos.json": _sha256(left_bytes)}, domain="bitcoin" |
| 662 | ) |
| 663 | right = SnapshotManifest( |
| 664 | files={"wallet/utxos.json": _sha256(right_bytes)}, domain="bitcoin" |
| 665 | ) |
| 666 | result = self.plugin.merge(base, left, right, repo_root=root) |
| 667 | |
| 668 | # Both sides deleted the same UTXO from the same base → double-spend |
| 669 | double_spend_records = [ |
| 670 | cr for cr in result.conflict_records |
| 671 | if cr.conflict_type == "double_spend" |
| 672 | ] |
| 673 | assert len(double_spend_records) >= 1 |
| 674 | |
| 675 | def test_both_deleted_same_file_is_clean(self) -> None: |
| 676 | base = SnapshotManifest(files={"f": "a" * 64}, domain="bitcoin") |
| 677 | left = SnapshotManifest(files={}, domain="bitcoin") |
| 678 | right = SnapshotManifest(files={}, domain="bitcoin") |
| 679 | result = self.plugin.merge(base, left, right) |
| 680 | assert result.is_clean |
| 681 | assert "f" not in result.merged["files"] |
| 682 | |
| 683 | |
| 684 | # --------------------------------------------------------------------------- |
| 685 | # drift |
| 686 | # --------------------------------------------------------------------------- |
| 687 | |
| 688 | |
| 689 | class TestDrift: |
| 690 | def test_no_drift_when_identical(self) -> None: |
| 691 | plugin = BitcoinPlugin() |
| 692 | snap = SnapshotManifest( |
| 693 | files={"wallet/utxos.json": "a" * 64}, domain="bitcoin" |
| 694 | ) |
| 695 | report = plugin.drift(snap, snap) |
| 696 | assert not report.has_drift |
| 697 | |
| 698 | def test_drift_detected_when_file_added(self) -> None: |
| 699 | plugin = BitcoinPlugin() |
| 700 | committed = SnapshotManifest(files={}, domain="bitcoin") |
| 701 | live = SnapshotManifest( |
| 702 | files={"wallet/utxos.json": "a" * 64}, domain="bitcoin" |
| 703 | ) |
| 704 | report = plugin.drift(committed, live) |
| 705 | assert report.has_drift |
| 706 | assert "added" in report.summary |
| 707 | |
| 708 | def test_drift_from_disk_detected(self) -> None: |
| 709 | plugin = BitcoinPlugin() |
| 710 | with tempfile.TemporaryDirectory() as tmp: |
| 711 | root = pathlib.Path(tmp) |
| 712 | (root / "wallet").mkdir() |
| 713 | (root / "wallet" / "utxos.json").write_bytes(b"[]") |
| 714 | |
| 715 | committed = SnapshotManifest(files={}, domain="bitcoin") |
| 716 | report = plugin.drift(committed, root) |
| 717 | |
| 718 | assert report.has_drift |
| 719 | |
| 720 | |
| 721 | # --------------------------------------------------------------------------- |
| 722 | # apply |
| 723 | # --------------------------------------------------------------------------- |
| 724 | |
| 725 | |
| 726 | class TestApply: |
| 727 | def test_apply_is_passthrough(self) -> None: |
| 728 | plugin = BitcoinPlugin() |
| 729 | delta = plugin.diff( |
| 730 | SnapshotManifest(files={}, domain="bitcoin"), |
| 731 | SnapshotManifest(files={}, domain="bitcoin"), |
| 732 | ) |
| 733 | snap = SnapshotManifest(files={}, domain="bitcoin") |
| 734 | result = plugin.apply(delta, snap) |
| 735 | assert result is snap |
| 736 | |
| 737 | |
| 738 | # --------------------------------------------------------------------------- |
| 739 | # schema |
| 740 | # --------------------------------------------------------------------------- |
| 741 | |
| 742 | |
| 743 | class TestSchema: |
| 744 | def test_schema_returns_domain_schema(self) -> None: |
| 745 | from muse.core.schema import DomainSchema |
| 746 | s = BitcoinPlugin().schema() |
| 747 | assert isinstance(s, dict) |
| 748 | assert s["domain"] == "bitcoin" |
| 749 | assert s["schema_version"] == __version__ |
| 750 | |
| 751 | def test_schema_has_ten_dimensions(self) -> None: |
| 752 | s = BitcoinPlugin().schema() |
| 753 | assert len(s["dimensions"]) == 10 |
| 754 | |
| 755 | def test_schema_merge_mode_is_crdt(self) -> None: |
| 756 | s = BitcoinPlugin().schema() |
| 757 | assert s["merge_mode"] == "crdt" |
| 758 | |
| 759 | def test_dimension_names(self) -> None: |
| 760 | s = BitcoinPlugin().schema() |
| 761 | names = {d["name"] for d in s["dimensions"]} |
| 762 | expected = { |
| 763 | "utxos", "transactions", "labels", "descriptors", |
| 764 | "channels", "strategy", "oracle_prices", "oracle_fees", |
| 765 | "network", "execution", |
| 766 | } |
| 767 | assert names == expected |
| 768 | |
| 769 | def test_strategy_dimension_is_not_independent(self) -> None: |
| 770 | s = BitcoinPlugin().schema() |
| 771 | strat = next(d for d in s["dimensions"] if d["name"] == "strategy") |
| 772 | assert strat["independent_merge"] is False |
| 773 | |
| 774 | |
| 775 | # --------------------------------------------------------------------------- |
| 776 | # OT merge — StructuredMergePlugin |
| 777 | # --------------------------------------------------------------------------- |
| 778 | |
| 779 | |
| 780 | class TestMergeOps: |
| 781 | def setup_method(self) -> None: |
| 782 | self.plugin = BitcoinPlugin() |
| 783 | self.base = SnapshotManifest(files={}, domain="bitcoin") |
| 784 | self.snap = SnapshotManifest(files={}, domain="bitcoin") |
| 785 | |
| 786 | def test_non_conflicting_ops_clean_merge(self) -> None: |
| 787 | ours_ops: list[DomainOp] = [ |
| 788 | InsertOp( |
| 789 | op="insert", |
| 790 | address="wallet/utxos.json::aaa:0", |
| 791 | position=None, |
| 792 | content_id="a" * 64, |
| 793 | content_summary="received UTXO aaa:0", |
| 794 | ) |
| 795 | ] |
| 796 | theirs_ops: list[DomainOp] = [ |
| 797 | InsertOp( |
| 798 | op="insert", |
| 799 | address="wallet/utxos.json::bbb:0", |
| 800 | position=None, |
| 801 | content_id="b" * 64, |
| 802 | content_summary="received UTXO bbb:0", |
| 803 | ) |
| 804 | ] |
| 805 | result = self.plugin.merge_ops( |
| 806 | self.base, self.snap, self.snap, ours_ops, theirs_ops |
| 807 | ) |
| 808 | # Different addresses → no double-spend, clean merge |
| 809 | assert len(result.conflict_records) == 0 |
| 810 | |
| 811 | def test_double_spend_detected_in_merge_ops(self) -> None: |
| 812 | utxo_addr = "wallet/utxos.json::deadbeef:0" |
| 813 | ours_ops: list[DomainOp] = [ |
| 814 | DeleteOp( |
| 815 | op="delete", |
| 816 | address=utxo_addr, |
| 817 | position=None, |
| 818 | content_id="a" * 64, |
| 819 | content_summary="spent UTXO deadbeef:0", |
| 820 | ) |
| 821 | ] |
| 822 | theirs_ops: list[DomainOp] = [ |
| 823 | DeleteOp( |
| 824 | op="delete", |
| 825 | address=utxo_addr, |
| 826 | position=None, |
| 827 | content_id="a" * 64, |
| 828 | content_summary="spent UTXO deadbeef:0", |
| 829 | ) |
| 830 | ] |
| 831 | result = self.plugin.merge_ops( |
| 832 | self.base, self.snap, self.snap, ours_ops, theirs_ops |
| 833 | ) |
| 834 | double_spends = [ |
| 835 | cr for cr in result.conflict_records |
| 836 | if cr.conflict_type == "double_spend" |
| 837 | ] |
| 838 | assert len(double_spends) == 1 |
| 839 | assert "deadbeef:0" in double_spends[0].addresses[0] |
| 840 | |
| 841 | def test_non_utxo_delete_not_flagged_as_double_spend(self) -> None: |
| 842 | addr = "channels/channels.json::850000x1x0" |
| 843 | ours_ops: list[DomainOp] = [ |
| 844 | DeleteOp(op="delete", address=addr, position=None, |
| 845 | content_id="a" * 64, content_summary="channel closed") |
| 846 | ] |
| 847 | theirs_ops: list[DomainOp] = [ |
| 848 | DeleteOp(op="delete", address=addr, position=None, |
| 849 | content_id="a" * 64, content_summary="channel closed") |
| 850 | ] |
| 851 | result = self.plugin.merge_ops( |
| 852 | self.base, self.snap, self.snap, ours_ops, theirs_ops |
| 853 | ) |
| 854 | double_spends = [ |
| 855 | cr for cr in result.conflict_records |
| 856 | if cr.conflict_type == "double_spend" |
| 857 | ] |
| 858 | assert len(double_spends) == 0 |
| 859 | |
| 860 | |
| 861 | # --------------------------------------------------------------------------- |
| 862 | # CRDTPlugin |
| 863 | # --------------------------------------------------------------------------- |
| 864 | |
| 865 | |
| 866 | class TestCRDT: |
| 867 | def setup_method(self) -> None: |
| 868 | self.plugin = BitcoinPlugin() |
| 869 | self.base_snap = SnapshotManifest( |
| 870 | files={"wallet/utxos.json": "a" * 64}, |
| 871 | domain="bitcoin", |
| 872 | ) |
| 873 | |
| 874 | def test_to_crdt_state_preserves_files(self) -> None: |
| 875 | crdt = self.plugin.to_crdt_state(self.base_snap) |
| 876 | assert crdt["domain"] == "bitcoin" |
| 877 | assert crdt["schema_version"] == __version__ |
| 878 | assert "wallet/utxos.json" in crdt["files"] |
| 879 | |
| 880 | def test_from_crdt_state_returns_plain_snapshot(self) -> None: |
| 881 | crdt = self.plugin.to_crdt_state(self.base_snap) |
| 882 | snap = self.plugin.from_crdt_state(crdt) |
| 883 | assert snap["domain"] == "bitcoin" |
| 884 | assert "wallet/utxos.json" in snap["files"] |
| 885 | |
| 886 | def test_join_idempotent(self) -> None: |
| 887 | crdt = self.plugin.to_crdt_state(self.base_snap) |
| 888 | joined = self.plugin.join(crdt, crdt) |
| 889 | result = self.plugin.from_crdt_state(joined) |
| 890 | original = self.plugin.from_crdt_state(crdt) |
| 891 | assert result["files"] == original["files"] |
| 892 | |
| 893 | def test_join_commutative(self) -> None: |
| 894 | snap_a = SnapshotManifest( |
| 895 | files={"wallet/utxos.json": "a" * 64}, domain="bitcoin" |
| 896 | ) |
| 897 | snap_b = SnapshotManifest( |
| 898 | files={"channels/channels.json": "b" * 64}, domain="bitcoin" |
| 899 | ) |
| 900 | a = self.plugin.to_crdt_state(snap_a) |
| 901 | b = self.plugin.to_crdt_state(snap_b) |
| 902 | ab = self.plugin.from_crdt_state(self.plugin.join(a, b)) |
| 903 | ba = self.plugin.from_crdt_state(self.plugin.join(b, a)) |
| 904 | assert ab["files"] == ba["files"] |
| 905 | |
| 906 | def test_join_associative(self) -> None: |
| 907 | snap_a = SnapshotManifest(files={"f1": "a" * 64}, domain="bitcoin") |
| 908 | snap_b = SnapshotManifest(files={"f2": "b" * 64}, domain="bitcoin") |
| 909 | snap_c = SnapshotManifest(files={"f3": "c" * 64}, domain="bitcoin") |
| 910 | a = self.plugin.to_crdt_state(snap_a) |
| 911 | b = self.plugin.to_crdt_state(snap_b) |
| 912 | c = self.plugin.to_crdt_state(snap_c) |
| 913 | ab_c = self.plugin.from_crdt_state( |
| 914 | self.plugin.join(self.plugin.join(a, b), c) |
| 915 | ) |
| 916 | a_bc = self.plugin.from_crdt_state( |
| 917 | self.plugin.join(a, self.plugin.join(b, c)) |
| 918 | ) |
| 919 | assert ab_c["files"] == a_bc["files"] |
| 920 | |
| 921 | def test_join_preserves_both_agents_files(self) -> None: |
| 922 | snap_a = SnapshotManifest( |
| 923 | files={"wallet/utxos.json": "a" * 64}, domain="bitcoin" |
| 924 | ) |
| 925 | snap_b = SnapshotManifest( |
| 926 | files={"channels/channels.json": "b" * 64}, domain="bitcoin" |
| 927 | ) |
| 928 | a = self.plugin.to_crdt_state(snap_a) |
| 929 | b = self.plugin.to_crdt_state(snap_b) |
| 930 | joined = self.plugin.from_crdt_state(self.plugin.join(a, b)) |
| 931 | assert "wallet/utxos.json" in joined["files"] |
| 932 | assert "channels/channels.json" in joined["files"] |
| 933 | |
| 934 | def test_crdt_schema_has_seven_dimensions(self) -> None: |
| 935 | dims = self.plugin.crdt_schema() |
| 936 | assert len(dims) == 7 |
| 937 | |
| 938 | def test_crdt_schema_dimension_types(self) -> None: |
| 939 | dims = self.plugin.crdt_schema() |
| 940 | types = {d["crdt_type"] for d in dims} |
| 941 | assert "aw_map" in types |
| 942 | assert "or_set" in types |
| 943 | |
| 944 | def test_vector_clock_advances_on_join(self) -> None: |
| 945 | snap_a = SnapshotManifest(files={"f": "a" * 64}, domain="bitcoin") |
| 946 | snap_b = SnapshotManifest(files={"f": "b" * 64}, domain="bitcoin") |
| 947 | a = self.plugin.to_crdt_state(snap_a) |
| 948 | b = self.plugin.to_crdt_state(snap_b) |
| 949 | joined = self.plugin.join(a, b) |
| 950 | assert isinstance(joined["vclock"], dict) |
| 951 | |
| 952 | |
| 953 | # --------------------------------------------------------------------------- |
| 954 | # Query analytics |
| 955 | # --------------------------------------------------------------------------- |
| 956 | |
| 957 | |
| 958 | class TestQueryAnalytics: |
| 959 | def test_total_balance_sat(self) -> None: |
| 960 | utxos = [ |
| 961 | _make_utxo(amount_sat=100_000), |
| 962 | _make_utxo(txid="b" * 64, amount_sat=200_000), |
| 963 | ] |
| 964 | assert total_balance_sat(utxos) == 300_000 |
| 965 | |
| 966 | def test_confirmed_balance_excludes_unconfirmed(self) -> None: |
| 967 | utxos = [ |
| 968 | _make_utxo(amount_sat=100_000, confirmations=6), |
| 969 | _make_utxo(txid="b" * 64, amount_sat=50_000, confirmations=0), |
| 970 | ] |
| 971 | assert confirmed_balance_sat(utxos) == 100_000 |
| 972 | |
| 973 | def test_confirmed_balance_excludes_immature_coinbase(self) -> None: |
| 974 | utxos = [ |
| 975 | _make_utxo(amount_sat=625_000_000, confirmations=50, coinbase=True), |
| 976 | _make_utxo(txid="c" * 64, amount_sat=100_000, confirmations=1), |
| 977 | ] |
| 978 | assert confirmed_balance_sat(utxos) == 100_000 |
| 979 | |
| 980 | def test_balance_by_script_type(self) -> None: |
| 981 | utxos = [ |
| 982 | _make_utxo(amount_sat=100_000, script_type="p2wpkh"), |
| 983 | _make_utxo(txid="b" * 64, amount_sat=200_000, script_type="p2tr"), |
| 984 | _make_utxo(txid="c" * 64, amount_sat=50_000, script_type="p2wpkh"), |
| 985 | ] |
| 986 | breakdown = balance_by_script_type(utxos) |
| 987 | assert breakdown["p2wpkh"] == 150_000 |
| 988 | assert breakdown["p2tr"] == 200_000 |
| 989 | |
| 990 | def test_coin_age_blocks(self) -> None: |
| 991 | u = _make_utxo(block_height=800_000) |
| 992 | assert coin_age_blocks(u, 850_000) == 50_000 |
| 993 | |
| 994 | def test_coin_age_blocks_unconfirmed(self) -> None: |
| 995 | u = _make_utxo(block_height=None) |
| 996 | assert coin_age_blocks(u, 850_000) is None |
| 997 | |
| 998 | def test_format_sat_small(self) -> None: |
| 999 | assert format_sat(1_000) == "1,000 sats" |
| 1000 | |
| 1001 | def test_format_sat_large(self) -> None: |
| 1002 | result = format_sat(100_000_000) |
| 1003 | assert "1.00000000 BTC" in result |
| 1004 | |
| 1005 | def test_utxo_key(self) -> None: |
| 1006 | u = _make_utxo(txid="abc", vout=3) |
| 1007 | assert utxo_key(u) == "abc:3" |
| 1008 | |
| 1009 | def test_double_spend_candidates_detects_concurrent_spends(self) -> None: |
| 1010 | base = {"aaa:0", "bbb:1", "ccc:2"} |
| 1011 | our_spent = {"aaa:0", "bbb:1"} |
| 1012 | their_spent = {"aaa:0", "ccc:2"} |
| 1013 | candidates = double_spend_candidates(base, our_spent, their_spent) |
| 1014 | assert candidates == ["aaa:0"] |
| 1015 | |
| 1016 | def test_double_spend_candidates_no_overlap(self) -> None: |
| 1017 | base = {"aaa:0", "bbb:1"} |
| 1018 | our_spent = {"aaa:0"} |
| 1019 | their_spent = {"bbb:1"} |
| 1020 | assert double_spend_candidates(base, our_spent, their_spent) == [] |
| 1021 | |
| 1022 | def test_channel_liquidity_totals(self) -> None: |
| 1023 | ch1 = _make_channel(local_balance_sat=1_000_000, remote_balance_sat=500_000) |
| 1024 | ch2 = _make_channel( |
| 1025 | channel_id="x", local_balance_sat=200_000, remote_balance_sat=800_000 |
| 1026 | ) |
| 1027 | local, remote = channel_liquidity_totals([ch1, ch2]) |
| 1028 | assert local == 1_200_000 |
| 1029 | assert remote == 1_300_000 |
| 1030 | |
| 1031 | def test_channel_utilization(self) -> None: |
| 1032 | ch = _make_channel( |
| 1033 | capacity_sat=2_000_000, |
| 1034 | local_balance_sat=800_000, |
| 1035 | local_reserve_sat=20_000, |
| 1036 | remote_reserve_sat=20_000, |
| 1037 | ) |
| 1038 | util = channel_utilization(ch) |
| 1039 | assert 0.0 <= util <= 1.0 |
| 1040 | |
| 1041 | def test_fee_surface_str(self) -> None: |
| 1042 | est = _make_fee(t1=42, t6=15, t144=3) |
| 1043 | s = fee_surface_str(est) |
| 1044 | assert "42" in s |
| 1045 | assert "15" in s |
| 1046 | assert "3" in s |
| 1047 | assert "sat/vbyte" in s |
| 1048 | |
| 1049 | def test_latest_fee_estimate(self) -> None: |
| 1050 | old = _make_fee(timestamp=1_000) |
| 1051 | new = _make_fee(timestamp=2_000, t1=50) |
| 1052 | result = latest_fee_estimate([old, new]) |
| 1053 | assert result is not None |
| 1054 | assert result["timestamp"] == 2_000 |
| 1055 | |
| 1056 | def test_latest_fee_estimate_empty(self) -> None: |
| 1057 | assert latest_fee_estimate([]) is None |
| 1058 | |
| 1059 | def test_latest_price(self) -> None: |
| 1060 | p1 = _make_price(timestamp=1_000, price_usd=50_000.0) |
| 1061 | p2 = _make_price(timestamp=2_000, price_usd=70_000.0) |
| 1062 | assert latest_price([p1, p2]) == 70_000.0 |
| 1063 | |
| 1064 | def test_strategy_summary_line_simulation_mode(self) -> None: |
| 1065 | s = _make_strategy(simulation_mode=True, name="aggressive") |
| 1066 | line = strategy_summary_line(s) |
| 1067 | assert "SIM" in line |
| 1068 | assert "aggressive" in line |
| 1069 | |
| 1070 | def test_utxo_summary_line(self) -> None: |
| 1071 | utxos = [ |
| 1072 | _make_utxo(amount_sat=100_000, confirmations=6), |
| 1073 | _make_utxo(txid="b" * 64, amount_sat=50_000, confirmations=0), |
| 1074 | ] |
| 1075 | line = utxo_summary_line(utxos) |
| 1076 | assert "2 UTXOs" in line |
| 1077 | |
| 1078 | def test_mempool_summary_empty(self) -> None: |
| 1079 | line = mempool_summary_line([]) |
| 1080 | assert "empty" in line |
| 1081 | |
| 1082 | |
| 1083 | # --------------------------------------------------------------------------- |
| 1084 | # Handler routing |
| 1085 | # --------------------------------------------------------------------------- |
| 1086 | |
| 1087 | |
| 1088 | class TestHandlerRouting: |
| 1089 | def test_utxos_json_routed(self) -> None: |
| 1090 | assert _handler_for_path("wallet/utxos.json") == "utxos" |
| 1091 | |
| 1092 | def test_channels_json_routed(self) -> None: |
| 1093 | assert _handler_for_path("channels/channels.json") == "channels" |
| 1094 | |
| 1095 | def test_agent_json_routed(self) -> None: |
| 1096 | assert _handler_for_path("strategy/agent.json") == "strategy" |
| 1097 | |
| 1098 | def test_prices_json_routed(self) -> None: |
| 1099 | assert _handler_for_path("oracles/prices.json") == "prices" |
| 1100 | |
| 1101 | def test_unknown_file_returns_none(self) -> None: |
| 1102 | assert _handler_for_path("README.md") is None |
| 1103 | assert _handler_for_path("custom/data.bin") is None |
| 1104 | |
| 1105 | |
| 1106 | # --------------------------------------------------------------------------- |
| 1107 | # Registry registration |
| 1108 | # --------------------------------------------------------------------------- |
| 1109 | |
| 1110 | |
| 1111 | class TestRegistry: |
| 1112 | def test_bitcoin_registered_in_plugin_registry(self) -> None: |
| 1113 | from muse.plugins.registry import registered_domains |
| 1114 | assert "bitcoin" in registered_domains() |
| 1115 | |
| 1116 | def test_resolve_bitcoin_plugin(self) -> None: |
| 1117 | import json as _json |
| 1118 | from muse.plugins.registry import resolve_plugin |
| 1119 | |
| 1120 | with tempfile.TemporaryDirectory() as tmp: |
| 1121 | root = pathlib.Path(tmp) |
| 1122 | (root / ".muse").mkdir() |
| 1123 | (root / ".muse" / "repo.json").write_text( |
| 1124 | _json.dumps({"domain": "bitcoin"}) |
| 1125 | ) |
| 1126 | plugin = resolve_plugin(root) |
| 1127 | |
| 1128 | assert isinstance(plugin, BitcoinPlugin) |