test_op_log.py
python
| 1 | """Tests for muse.core.op_log — OpEntry, OpLogCheckpoint, OpLog.""" |
| 2 | |
| 3 | import pathlib |
| 4 | |
| 5 | import pytest |
| 6 | |
| 7 | from muse.core.op_log import ( |
| 8 | OpEntry, |
| 9 | OpLog, |
| 10 | list_sessions, |
| 11 | make_op_entry, |
| 12 | ) |
| 13 | from muse.domain import InsertOp |
| 14 | |
| 15 | |
| 16 | # --------------------------------------------------------------------------- |
| 17 | # make_op_entry factory |
| 18 | # --------------------------------------------------------------------------- |
| 19 | |
| 20 | |
| 21 | class TestMakeOpEntry: |
| 22 | def test_all_required_fields_present(self) -> None: |
| 23 | op = InsertOp( |
| 24 | op="insert", |
| 25 | address="note:0", |
| 26 | position=0, |
| 27 | content_id="abc123", |
| 28 | content_summary="C4", |
| 29 | ) |
| 30 | entry = make_op_entry( |
| 31 | actor_id="agent-x", |
| 32 | domain="midi", |
| 33 | domain_op=op, |
| 34 | lamport_ts=1, |
| 35 | ) |
| 36 | assert entry["actor_id"] == "agent-x" |
| 37 | assert entry["domain"] == "midi" |
| 38 | assert entry["lamport_ts"] == 1 |
| 39 | assert entry["parent_op_ids"] == [] |
| 40 | assert entry["intent_id"] == "" |
| 41 | assert entry["reservation_id"] == "" |
| 42 | assert len(entry["op_id"]) == 36 # UUID4 |
| 43 | |
| 44 | def test_parent_op_ids_are_copied(self) -> None: |
| 45 | op = InsertOp(op="insert", address="note:0", position=0, content_id="x", content_summary="") |
| 46 | parent_ids = ["aaa", "bbb"] |
| 47 | entry = make_op_entry("a", "midi", op, 1, parent_op_ids=parent_ids) |
| 48 | assert entry["parent_op_ids"] == ["aaa", "bbb"] |
| 49 | # Mutating the original should not affect the entry. |
| 50 | parent_ids.append("ccc") |
| 51 | assert entry["parent_op_ids"] == ["aaa", "bbb"] |
| 52 | |
| 53 | def test_op_ids_are_unique(self) -> None: |
| 54 | op = InsertOp(op="insert", address="note:0", position=0, content_id="x", content_summary="") |
| 55 | ids = {make_op_entry("a", "midi", op, i)["op_id"] for i in range(20)} |
| 56 | assert len(ids) == 20 |
| 57 | |
| 58 | |
| 59 | # --------------------------------------------------------------------------- |
| 60 | # OpLog.append and read_all |
| 61 | # --------------------------------------------------------------------------- |
| 62 | |
| 63 | |
| 64 | class TestOpLogAppendRead: |
| 65 | def test_append_and_read_all_roundtrip(self, tmp_path: pathlib.Path) -> None: |
| 66 | log = OpLog(tmp_path, "session-1") |
| 67 | op = InsertOp(op="insert", address="note:0", position=0, content_id="c1", content_summary="C4") |
| 68 | e1 = make_op_entry("agent-a", "midi", op, 1) |
| 69 | e2 = make_op_entry("agent-a", "midi", op, 2) |
| 70 | log.append(e1) |
| 71 | log.append(e2) |
| 72 | entries = log.read_all() |
| 73 | assert len(entries) == 2 |
| 74 | assert entries[0]["op_id"] == e1["op_id"] |
| 75 | assert entries[1]["op_id"] == e2["op_id"] |
| 76 | |
| 77 | def test_empty_log_returns_empty_list(self, tmp_path: pathlib.Path) -> None: |
| 78 | log = OpLog(tmp_path, "empty-session") |
| 79 | assert log.read_all() == [] |
| 80 | |
| 81 | def test_append_creates_directory(self, tmp_path: pathlib.Path) -> None: |
| 82 | log = OpLog(tmp_path, "new-session") |
| 83 | op = InsertOp(op="insert", address="note:0", position=0, content_id="c1", content_summary="") |
| 84 | log.append(make_op_entry("a", "midi", op, 1)) |
| 85 | assert (tmp_path / ".muse" / "op_log" / "new-session").is_dir() |
| 86 | |
| 87 | |
| 88 | # --------------------------------------------------------------------------- |
| 89 | # Lamport timestamp counter |
| 90 | # --------------------------------------------------------------------------- |
| 91 | |
| 92 | |
| 93 | class TestLamportTs: |
| 94 | def test_lamport_is_monotonic(self, tmp_path: pathlib.Path) -> None: |
| 95 | log = OpLog(tmp_path, "ts-session") |
| 96 | ts_values = [log.next_lamport_ts() for _ in range(10)] |
| 97 | assert ts_values == sorted(ts_values) |
| 98 | assert len(set(ts_values)) == 10 |
| 99 | |
| 100 | def test_lamport_continues_after_reopen(self, tmp_path: pathlib.Path) -> None: |
| 101 | log1 = OpLog(tmp_path, "reopen-session") |
| 102 | op = InsertOp(op="insert", address="note:0", position=0, content_id="c", content_summary="") |
| 103 | for i in range(5): |
| 104 | ts = log1.next_lamport_ts() |
| 105 | log1.append(make_op_entry("a", "midi", op, ts)) |
| 106 | |
| 107 | # Reopen the same session. |
| 108 | log2 = OpLog(tmp_path, "reopen-session") |
| 109 | new_ts = log2.next_lamport_ts() |
| 110 | assert new_ts > 5 |
| 111 | |
| 112 | |
| 113 | # --------------------------------------------------------------------------- |
| 114 | # Checkpoint |
| 115 | # --------------------------------------------------------------------------- |
| 116 | |
| 117 | |
| 118 | class TestCheckpoint: |
| 119 | def test_checkpoint_written_and_readable(self, tmp_path: pathlib.Path) -> None: |
| 120 | log = OpLog(tmp_path, "ckpt-session") |
| 121 | op = InsertOp(op="insert", address="note:0", position=0, content_id="c", content_summary="") |
| 122 | for i in range(3): |
| 123 | log.append(make_op_entry("a", "midi", op, i + 1)) |
| 124 | |
| 125 | ckpt = log.checkpoint("snap-abc") |
| 126 | assert ckpt["snapshot_id"] == "snap-abc" |
| 127 | assert ckpt["op_count"] == 3 |
| 128 | assert ckpt["lamport_ts"] == 3 |
| 129 | |
| 130 | recovered = log.read_checkpoint() |
| 131 | assert recovered is not None |
| 132 | assert recovered["snapshot_id"] == "snap-abc" |
| 133 | |
| 134 | def test_no_checkpoint_returns_none(self, tmp_path: pathlib.Path) -> None: |
| 135 | log = OpLog(tmp_path, "no-ckpt-session") |
| 136 | assert log.read_checkpoint() is None |
| 137 | |
| 138 | def test_replay_since_checkpoint_returns_newer_only(self, tmp_path: pathlib.Path) -> None: |
| 139 | log = OpLog(tmp_path, "replay-session") |
| 140 | op = InsertOp(op="insert", address="note:0", position=0, content_id="c", content_summary="") |
| 141 | |
| 142 | for i in range(3): |
| 143 | log.append(make_op_entry("a", "midi", op, i + 1)) |
| 144 | log.checkpoint("snap-1") |
| 145 | |
| 146 | # Add more entries after checkpoint. |
| 147 | for i in range(3, 6): |
| 148 | log.append(make_op_entry("a", "midi", op, i + 1)) |
| 149 | |
| 150 | entries = log.replay_since_checkpoint() |
| 151 | assert len(entries) == 3 |
| 152 | assert all(e["lamport_ts"] > 3 for e in entries) |
| 153 | |
| 154 | |
| 155 | # --------------------------------------------------------------------------- |
| 156 | # to_structured_delta |
| 157 | # --------------------------------------------------------------------------- |
| 158 | |
| 159 | |
| 160 | class TestToStructuredDelta: |
| 161 | def test_produces_correct_domain_ops_filtered_by_domain(self, tmp_path: pathlib.Path) -> None: |
| 162 | log = OpLog(tmp_path, "delta-session") |
| 163 | op = InsertOp(op="insert", address="note:0", position=0, content_id="c", content_summary="C4") |
| 164 | |
| 165 | for i in range(4): |
| 166 | log.append(make_op_entry("a", "midi", op, i + 1)) |
| 167 | # Add one code op that should be filtered out. |
| 168 | code_op = InsertOp(op="insert", address="sym:0", position=0, content_id="d", content_summary="f()") |
| 169 | log.append(make_op_entry("a", "code", code_op, 5)) |
| 170 | |
| 171 | delta = log.to_structured_delta("midi") |
| 172 | assert delta["domain"] == "midi_notes_tracked" or delta["domain"] == "midi" |
| 173 | # Only the 4 music ops should be included. |
| 174 | assert len(delta["ops"]) == 4 |
| 175 | |
| 176 | def test_summary_mentions_insert(self, tmp_path: pathlib.Path) -> None: |
| 177 | log = OpLog(tmp_path, "summary-session") |
| 178 | op = InsertOp(op="insert", address="note:0", position=0, content_id="c", content_summary="C4") |
| 179 | log.append(make_op_entry("a", "midi", op, 1)) |
| 180 | delta = log.to_structured_delta("midi") |
| 181 | assert "insert" in delta["summary"] |
| 182 | |
| 183 | |
| 184 | # --------------------------------------------------------------------------- |
| 185 | # Session listing |
| 186 | # --------------------------------------------------------------------------- |
| 187 | |
| 188 | |
| 189 | class TestListSessions: |
| 190 | def test_lists_all_sessions(self, tmp_path: pathlib.Path) -> None: |
| 191 | op = InsertOp(op="insert", address="note:0", position=0, content_id="c", content_summary="") |
| 192 | for sid in ["alpha", "beta", "gamma"]: |
| 193 | log = OpLog(tmp_path, sid) |
| 194 | log.append(make_op_entry("a", "midi", op, 1)) |
| 195 | |
| 196 | sessions = list_sessions(tmp_path) |
| 197 | assert "alpha" in sessions |
| 198 | assert "beta" in sessions |
| 199 | assert "gamma" in sessions |
| 200 | |
| 201 | def test_empty_repo_returns_empty_list(self, tmp_path: pathlib.Path) -> None: |
| 202 | assert list_sessions(tmp_path) == [] |