check_attr.py
python
| 1 | """muse plumbing check-attr — query merge-strategy attributes for paths. |
| 2 | |
| 3 | Reads ``.museattributes``, resolves the applicable rules for each supplied |
| 4 | path, and reports the strategy that would be applied per dimension. Useful |
| 5 | for verifying that attribute rules are wired up correctly before a merge, and |
| 6 | for scripting domain-aware merge drivers. |
| 7 | |
| 8 | Output (JSON, default):: |
| 9 | |
| 10 | { |
| 11 | "domain": "midi", |
| 12 | "rules_loaded": 3, |
| 13 | "results": [ |
| 14 | { |
| 15 | "path": "tracks/drums.mid", |
| 16 | "dimension": "*", |
| 17 | "strategy": "ours", |
| 18 | "rule": { |
| 19 | "path_pattern": "drums/*", |
| 20 | "dimension": "*", |
| 21 | "strategy": "ours", |
| 22 | "comment": "Drums always prefer ours.", |
| 23 | "priority": 10, |
| 24 | "source_index": 0 |
| 25 | } |
| 26 | }, |
| 27 | { |
| 28 | "path": "tracks/melody.mid", |
| 29 | "dimension": "*", |
| 30 | "strategy": "auto", |
| 31 | "rule": null |
| 32 | } |
| 33 | ] |
| 34 | } |
| 35 | |
| 36 | Text output (``--format text``):: |
| 37 | |
| 38 | tracks/drums.mid dimension=* strategy=ours (rule 0: drums/*) |
| 39 | tracks/melody.mid dimension=* strategy=auto (no matching rule) |
| 40 | |
| 41 | Plumbing contract |
| 42 | ----------------- |
| 43 | |
| 44 | - Exit 0: attributes resolved and emitted (even when no rules match). |
| 45 | - Exit 1: bad ``--format`` value; missing path arguments. |
| 46 | - Exit 3: I/O or TOML parse error reading ``.museattributes``. |
| 47 | """ |
| 48 | |
| 49 | from __future__ import annotations |
| 50 | |
| 51 | import argparse |
| 52 | import fnmatch |
| 53 | import json |
| 54 | import logging |
| 55 | import sys |
| 56 | from typing import TypedDict |
| 57 | |
| 58 | from muse.core.attributes import AttributeRule, load_attributes, resolve_strategy |
| 59 | from muse.core.errors import ExitCode |
| 60 | from muse.core.repo import require_repo |
| 61 | from muse.plugins.registry import read_domain |
| 62 | |
| 63 | logger = logging.getLogger(__name__) |
| 64 | |
| 65 | _FORMAT_CHOICES = ("json", "text") |
| 66 | |
| 67 | |
| 68 | class _RuleDict(TypedDict): |
| 69 | path_pattern: str |
| 70 | dimension: str |
| 71 | strategy: str |
| 72 | comment: str |
| 73 | priority: int |
| 74 | source_index: int |
| 75 | |
| 76 | |
| 77 | class _PathResult(TypedDict): |
| 78 | path: str |
| 79 | dimension: str |
| 80 | strategy: str |
| 81 | rule: _RuleDict | None |
| 82 | |
| 83 | |
| 84 | def _find_matching_rule( |
| 85 | rules: list[AttributeRule], path: str, dimension: str |
| 86 | ) -> AttributeRule | None: |
| 87 | """Return the first rule that matches *path* and *dimension*, or ``None``.""" |
| 88 | for rule in rules: |
| 89 | path_match = fnmatch.fnmatch(path, rule.path_pattern) |
| 90 | dim_match = ( |
| 91 | rule.dimension == "*" |
| 92 | or rule.dimension == dimension |
| 93 | or dimension == "*" |
| 94 | ) |
| 95 | if path_match and dim_match: |
| 96 | return rule |
| 97 | return None |
| 98 | |
| 99 | |
| 100 | def _rule_to_dict(rule: AttributeRule) -> _RuleDict: |
| 101 | return { |
| 102 | "path_pattern": rule.path_pattern, |
| 103 | "dimension": rule.dimension, |
| 104 | "strategy": rule.strategy, |
| 105 | "comment": rule.comment, |
| 106 | "priority": rule.priority, |
| 107 | "source_index": rule.source_index, |
| 108 | } |
| 109 | |
| 110 | |
| 111 | def register(subparsers: "argparse._SubParsersAction[argparse.ArgumentParser]") -> None: |
| 112 | """Register the check-attr subcommand.""" |
| 113 | parser = subparsers.add_parser( |
| 114 | "check-attr", |
| 115 | help="Query merge-strategy attributes for workspace paths.", |
| 116 | description=__doc__, |
| 117 | ) |
| 118 | parser.add_argument( |
| 119 | "paths", |
| 120 | nargs="+", |
| 121 | help="Workspace-relative paths to check.", |
| 122 | ) |
| 123 | parser.add_argument( |
| 124 | "--dimension", "-d", |
| 125 | default="*", |
| 126 | dest="dimension", |
| 127 | metavar="DIMENSION", |
| 128 | help="Domain dimension to query (e.g. 'notes', 'pitch_bend'). " |
| 129 | "Use '*' to match any dimension. (default: *)", |
| 130 | ) |
| 131 | parser.add_argument( |
| 132 | "--format", "-f", |
| 133 | dest="fmt", |
| 134 | default="json", |
| 135 | metavar="FORMAT", |
| 136 | help="Output format: json or text. (default: json)", |
| 137 | ) |
| 138 | parser.add_argument( |
| 139 | "--all-rules", "-A", |
| 140 | action="store_true", |
| 141 | dest="all_rules", |
| 142 | help="For each path, list all matching rules (not just the first).", |
| 143 | ) |
| 144 | parser.set_defaults(func=run) |
| 145 | |
| 146 | |
| 147 | def run(args: argparse.Namespace) -> None: |
| 148 | """Query merge-strategy attributes for one or more paths. |
| 149 | |
| 150 | Reads ``.museattributes`` from the repository root and reports the |
| 151 | strategy that would be applied to each path for the given dimension. |
| 152 | """ |
| 153 | fmt: str = args.fmt |
| 154 | paths: list[str] = args.paths |
| 155 | dimension: str = args.dimension |
| 156 | all_rules: bool = args.all_rules |
| 157 | |
| 158 | if fmt not in _FORMAT_CHOICES: |
| 159 | print( |
| 160 | json.dumps( |
| 161 | {"error": f"Unknown format {fmt!r}. Valid: {', '.join(_FORMAT_CHOICES)}"} |
| 162 | ) |
| 163 | ) |
| 164 | raise SystemExit(ExitCode.USER_ERROR) |
| 165 | |
| 166 | if not paths: |
| 167 | print(json.dumps({"error": "At least one path argument is required."})) |
| 168 | raise SystemExit(ExitCode.USER_ERROR) |
| 169 | |
| 170 | root = require_repo() |
| 171 | domain = read_domain(root) |
| 172 | |
| 173 | try: |
| 174 | rules = load_attributes(root, domain=domain) |
| 175 | except ValueError as exc: |
| 176 | print(json.dumps({"error": str(exc)})) |
| 177 | raise SystemExit(ExitCode.INTERNAL_ERROR) |
| 178 | |
| 179 | if all_rules: |
| 180 | # Return every matching rule per path. |
| 181 | per_path: dict[str, list[_RuleDict]] = {} |
| 182 | for path in paths: |
| 183 | matching: list[_RuleDict] = [] |
| 184 | for rule in rules: |
| 185 | path_match = fnmatch.fnmatch(path, rule.path_pattern) |
| 186 | dim_match = ( |
| 187 | rule.dimension == "*" |
| 188 | or rule.dimension == dimension |
| 189 | or dimension == "*" |
| 190 | ) |
| 191 | if path_match and dim_match: |
| 192 | matching.append(_rule_to_dict(rule)) |
| 193 | per_path[path] = matching |
| 194 | |
| 195 | if fmt == "text": |
| 196 | for path, matched_rules in per_path.items(): |
| 197 | if not matched_rules: |
| 198 | print(f"{path} (no matching rules)") |
| 199 | else: |
| 200 | for rd in matched_rules: |
| 201 | print( |
| 202 | f"{path} dimension={rd['dimension']} " |
| 203 | f"strategy={rd['strategy']} (rule {rd['source_index']}: " |
| 204 | f"{rd['path_pattern']})" |
| 205 | ) |
| 206 | return |
| 207 | |
| 208 | print( |
| 209 | json.dumps( |
| 210 | { |
| 211 | "domain": domain, |
| 212 | "rules_loaded": len(rules), |
| 213 | "dimension": dimension, |
| 214 | "results": [ |
| 215 | {"path": path, "matching_rules": per_path[path]} |
| 216 | for path in paths |
| 217 | ], |
| 218 | } |
| 219 | ) |
| 220 | ) |
| 221 | return |
| 222 | |
| 223 | # Default: first-match winner per path. |
| 224 | results: list[_PathResult] = [] |
| 225 | for path in paths: |
| 226 | strategy = resolve_strategy(rules, path, dimension) |
| 227 | matched_rule = _find_matching_rule(rules, path, dimension) |
| 228 | results.append( |
| 229 | { |
| 230 | "path": path, |
| 231 | "dimension": dimension, |
| 232 | "strategy": strategy, |
| 233 | "rule": _rule_to_dict(matched_rule) if matched_rule else None, |
| 234 | } |
| 235 | ) |
| 236 | |
| 237 | if fmt == "text": |
| 238 | for res in results: |
| 239 | rule_entry: _RuleDict | None = res["rule"] |
| 240 | if rule_entry is not None: |
| 241 | rule_info = f"(rule {rule_entry['source_index']}: {rule_entry['path_pattern']})" |
| 242 | else: |
| 243 | rule_info = "(no matching rule)" |
| 244 | print( |
| 245 | f"{res['path']} dimension={res['dimension']} " |
| 246 | f"strategy={res['strategy']} {rule_info}" |
| 247 | ) |
| 248 | return |
| 249 | |
| 250 | print( |
| 251 | json.dumps( |
| 252 | { |
| 253 | "domain": domain, |
| 254 | "rules_loaded": len(rules), |
| 255 | "dimension": dimension, |
| 256 | "results": [dict(r) for r in results], |
| 257 | } |
| 258 | ) |
| 259 | ) |