diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 9e97239d..50b64d5d 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -19,12 +19,12 @@ jobs: uses: actions/setup-python@v4 with: python-version: ${{ matrix.python-version }} - + - name: Install dependencies run: | python -m pip install --upgrade pip pip install ".[test]" - + - name: Run tests run: | pytest diff --git a/reasoning_gym/games/__init__.py b/reasoning_gym/games/__init__.py index 9174089e..6a6df59f 100644 --- a/reasoning_gym/games/__init__.py +++ b/reasoning_gym/games/__init__.py @@ -25,5 +25,5 @@ "GameOfLifeConfig", "GameOfLifeDataset", "HanoiConfig", - "HanoiDataset" + "HanoiDataset", ] diff --git a/reasoning_gym/games/tower_of_hanoi.py b/reasoning_gym/games/tower_of_hanoi.py index 081b3bc6..3f878b60 100644 --- a/reasoning_gym/games/tower_of_hanoi.py +++ b/reasoning_gym/games/tower_of_hanoi.py @@ -1,13 +1,14 @@ # reasoning_gym/games/tower_of_hanoi.py -from dataclasses import dataclass -from typing import List, Optional, Dict, Tuple import math import random import re +from dataclasses import dataclass +from typing import Dict, List, Optional, Tuple from ..factory import ProceduralDataset, register_dataset + @dataclass class HanoiConfig: """ @@ -21,7 +22,7 @@ class HanoiConfig: - seed: Optional seed for reproducibility. - visualize: Whether to include a visualization of the initial state. """ - + min_disks: int = 3 max_disks: int = 7 min_pegs: int = 3 @@ -29,7 +30,7 @@ class HanoiConfig: size: int = 50 seed: Optional[int] = None visualize: bool = False # New parameter - + def validate(self) -> None: """Validate configuration parameters.""" assert self.min_disks >= 1, "min_disks must be at least 1" @@ -37,12 +38,13 @@ def validate(self) -> None: assert self.min_pegs >= 3, "min_pegs must be at least 3" assert self.max_pegs >= self.min_pegs, "max_pegs must be >= min_pegs" + class MoveGenerator: """ Helper class to generate valid move sequences for Tower of Hanoi using the Frame-Stewart algorithm. It maintains the current state of all pegs to ensure move validity. """ - + def __init__(self, num_disks: int, pegs: List[int], start: int, target: int): self.num_disks = num_disks self.pegs = pegs @@ -54,23 +56,23 @@ def __init__(self, num_disks: int, pegs: List[int], start: int, target: int): self.pegs_state[start].append(disk) self.moves: List[str] = [] self.memo: Dict[Tuple[int, int], int] = {} # Memoization for T(n, k) - + def generate_moves(self) -> List[str]: self.move(n=self.num_disks, source=self.start, target=self.target, auxiliary_pegs=self.auxiliary_pegs) return self.moves - + def move(self, n: int, source: int, target: int, auxiliary_pegs: List[int]): if n == 0: return if n == 1: self._move_disk(source, target) return - + k = len(auxiliary_pegs) + 2 # Total number of pegs including source and target - + if k < 3: raise ValueError("At least 3 pegs are required.") - + if k == 3: # Classic Tower of Hanoi solution aux = auxiliary_pegs[0] @@ -78,7 +80,7 @@ def move(self, n: int, source: int, target: int, auxiliary_pegs: List[int]): self._move_disk(source, target) self.move(n - 1, aux, target, [source]) return - + # For k > 3, apply Frame-Stewart algorithm # Find m that minimizes 2*T(m, k) + T(n - m, k - 1) min_moves = math.inf @@ -90,20 +92,20 @@ def move(self, n: int, source: int, target: int, auxiliary_pegs: List[int]): if total_moves < min_moves: min_moves = total_moves best_m = m - + # Select a temporary peg to hold m disks temp_peg = auxiliary_pegs[0] new_auxiliary = [peg for peg in auxiliary_pegs if peg != temp_peg] - + # Step 1: Move top m disks to temp_peg using all pegs self.move(n=best_m, source=source, target=temp_peg, auxiliary_pegs=auxiliary_pegs[1:] + [target]) - + # Step 2: Move remaining n - m disks to target using k - 1 pegs self.move(n=n - best_m, source=source, target=target, auxiliary_pegs=new_auxiliary) - + # Step 3: Move m disks from temp_peg to target using all pegs self.move(n=best_m, source=temp_peg, target=target, auxiliary_pegs=auxiliary_pegs[1:] + [source]) - + def _move_disk(self, from_peg: int, to_peg: int): if not self.pegs_state[from_peg]: raise ValueError(f"No disks to move from Peg {from_peg}.") @@ -111,7 +113,7 @@ def _move_disk(self, from_peg: int, to_peg: int): self.pegs_state[from_peg].pop() self.pegs_state[to_peg].append(disk) self.moves.append(f"Move disk {disk} from Peg {from_peg} to Peg {to_peg}") - + def _compute_T(self, n: int, k: int) -> int: """ Compute the minimal number of moves (T(n, k)) required to move n disks using k pegs. @@ -122,10 +124,10 @@ def _compute_T(self, n: int, k: int) -> int: if n == 1: return 1 if k == 3: - return 2 ** n - 1 + return 2**n - 1 if (n, k) in self.memo: return self.memo[(n, k)] - + min_moves = math.inf for m in range(1, n): moves = 2 * self._compute_T(m, k) + self._compute_T(n - m, k - 1) @@ -134,12 +136,13 @@ def _compute_T(self, n: int, k: int) -> int: self.memo[(n, k)] = min_moves return min_moves + class HanoiDataset(ProceduralDataset): """ Generates Tower of Hanoi problems with solutions. Supports variable number of pegs using the optimized Frame-Stewart algorithm with Peg State Tracking. """ - + def __init__(self, config: HanoiConfig): super().__init__(config=config, seed=config.seed, size=config.size) self.min_pegs = config.min_pegs @@ -147,11 +150,11 @@ def __init__(self, config: HanoiConfig): self.min_disks = config.min_disks self.max_disks = config.max_disks self.visualize = config.visualize # Initialize the visualize attribute - + def __getitem__(self, idx: int) -> dict: """ Generate a Tower of Hanoi problem instance. - + Returns: dict with: - "question": Text describing the problem setup. @@ -161,33 +164,33 @@ def __getitem__(self, idx: int) -> dict: - "states": (Optional) List of ASCII visualizations after each move. """ rng = random.Random(self.seed + idx if self.seed is not None else None) - + # Randomly select number of disks and pegs within the specified ranges num_disks = rng.randint(self.min_disks, self.max_disks) num_pegs = rng.randint(self.min_pegs, self.max_pegs) - + # Assign unique peg identifiers (e.g., integers starting from 1) pegs = list(range(1, num_pegs + 1)) - + """ #Debug: Print current instance configuration print(f"\n--- Generating Instance {idx} ---") print(f"Number of Disks: {num_disks}") print(f"Number of Pegs: {num_pegs}") print(f"Pegs: {pegs}") """ - + # Randomly select start and target pegs start_peg, target_peg = rng.sample(pegs, 2) - + # Auxiliary pegs are the remaining pegs auxiliary_pegs = [peg for peg in pegs if peg not in (start_peg, target_peg)] - + """ # Debug: Print start, target, and auxiliary pegs print(f"Start Peg: {start_peg}") print(f"Target Peg: {target_peg}") print(f"Auxiliary Pegs: {auxiliary_pegs}") """ - + # Initialize the MoveGenerator and generate moves move_gen = MoveGenerator(num_disks, pegs, start_peg, target_peg) try: @@ -195,24 +198,24 @@ def __getitem__(self, idx: int) -> dict: except ValueError as ve: # print(f"Error during move generation: {ve}") raise ve - + """ # Debug: Print the solution moves print(f"Solution Length: {len(solution)}") print("Solution Moves:") for move_num, move in enumerate(solution, start=1): print(f" Move {move_num}: {move}") """ - + # Initialize pegs_state: all disks start on the start peg pegs_state = {peg: [] for peg in pegs} for disk in range(num_disks, 0, -1): # Largest disk at the bottom pegs_state[start_peg].append(disk) - + # Generate initial state visualization if requested initial_state_str = None if self.visualize: initial_state_str = self._visualize_state(pegs_state) - + # Apply moves to track state changes states = [] if self.visualize: @@ -224,24 +227,24 @@ def __getitem__(self, idx: int) -> dict: except ValueError as ve: # print(f"Error parsing move: {ve}") raise ve - + # Validate the move if not self._validate_move(pegs_state, move): - #print(f"Invalid move detected: {move}") - #print(f"Current Pegs State: {pegs_state}") + # print(f"Invalid move detected: {move}") + # print(f"Current Pegs State: {pegs_state}") raise ValueError(f"Invalid move detected: {move}") - + # Move the disk pegs_state[from_peg].pop() pegs_state[to_peg].append(disk) - + # Visualize the new state new_state_str = self._visualize_state(pegs_state) states.append(new_state_str) - + # Peg labels peg_labels = {peg: f"Peg {peg}" for peg in pegs} - + question_str = ( f"Solve the Tower of Hanoi problem with {num_disks} disks and {num_pegs} pegs.\n" f"Move all disks from {peg_labels[start_peg]} to {peg_labels[target_peg]} following the rules:\n" @@ -250,7 +253,7 @@ def __getitem__(self, idx: int) -> dict: "- All disks must be on a peg at all times.\n" "Provide the sequence of moves." ) - + result = { "question": question_str, "answer": solution, @@ -263,28 +266,28 @@ def __getitem__(self, idx: int) -> dict: "solution_length": len(solution), }, } - + if self.visualize: result["initial_state"] = initial_state_str result["states"] = states # List of all states including initial and after each move - + return result - + def _visualize_state(self, pegs_state: Dict[int, List[int]]) -> str: """ Create an ASCII visualization of the current state of the pegs. Adapts to variable number of pegs. - + Args: pegs_state (dict): Dictionary mapping peg numbers to lists of disks. - + Returns: str: ASCII art representing the pegs and disks. """ # Determine the number of levels based on the maximum number of disks on any peg max_height = max(len(disks) for disks in pegs_state.values()) pegs = sorted(pegs_state.keys()) - + visualization = "" for level in range(max_height, 0, -1): for peg in pegs: @@ -295,24 +298,24 @@ def _visualize_state(self, pegs_state: Dict[int, List[int]]) -> str: disk_str = "[ ]" visualization += disk_str.center(7) # Adjust spacing as needed visualization += "\n" - + # Add the base and peg numbers visualization += "-" * (7 * len(pegs)) + "\n" for peg in pegs: peg_label = f"P{peg}".center(7) visualization += peg_label visualization += "\n" - + return visualization - + def _validate_move(self, pegs_state: Dict[int, List[int]], move: str) -> bool: """ Validate that a move adheres to the Tower of Hanoi rules. - + Args: pegs_state (dict): Current state of the pegs. move (str): Move instruction, e.g., "Move disk 2 from Peg 1 to Peg 3". - + Returns: bool: True if the move is valid, False otherwise. """ @@ -324,29 +327,29 @@ def _validate_move(self, pegs_state: Dict[int, List[int]], move: str) -> bool: disk = int(parts[2]) from_peg = int(parts[5]) to_peg = int(parts[8]) - + # Check if the disk to move is the top disk on the from_peg if not pegs_state[from_peg] or pegs_state[from_peg][-1] != disk: # print(f"Disk {disk} is not on top of Peg {from_peg}. Current state: {pegs_state[from_peg]}") return False - + # Check if placing the disk on the to_peg violates size constraints if pegs_state[to_peg] and pegs_state[to_peg][-1] < disk: # print(f"Cannot place disk {disk} on top of smaller disk {pegs_state[to_peg][-1]} on Peg {to_peg}.") return False - + return True except Exception as e: print(f"Error validating move '{move}': {e}") return False - + def _parse_move(self, move: str) -> Tuple[int, int, int]: """ Parse a move string and extract disk number, from peg, and to peg. - + Args: move (str): Move instruction, e.g., "Move disk 2 from Peg 1 to Peg 3". - + Returns: tuple: (disk, from_peg, to_peg) """ @@ -354,11 +357,12 @@ def _parse_move(self, move: str) -> Tuple[int, int, int]: match = re.match(pattern, move) if not match: raise ValueError(f"Unexpected move format: '{move}'") - + disk = int(match.group(1)) from_peg = int(match.group(2)) to_peg = int(match.group(3)) return disk, from_peg, to_peg + # Register the dataset register_dataset("tower_of_hanoi", HanoiDataset, HanoiConfig) diff --git a/tests/test_tower_of_hanoi.py b/tests/test_tower_of_hanoi.py index a3a89023..a4228bc3 100644 --- a/tests/test_tower_of_hanoi.py +++ b/tests/test_tower_of_hanoi.py @@ -1,35 +1,38 @@ """Tests for Tower of Hanoi puzzle generation""" -import pytest import re +import pytest + from reasoning_gym.games.tower_of_hanoi import HanoiConfig, HanoiDataset + def test_toh_config_validation(): """Test that invalid configurations raise appropriate errors.""" # Test negative number of disks with pytest.raises(AssertionError): config = HanoiConfig(min_disks=0) # At least 1 disk required config.validate() - + # Test max_disks less than min_disks with pytest.raises(AssertionError): config = HanoiConfig(min_disks=5, max_disks=3) config.validate() - + # Test min_pegs less than 3 with pytest.raises(AssertionError): config = HanoiConfig(min_pegs=2) config.validate() - + # Test max_pegs less than min_pegs with pytest.raises(AssertionError): config = HanoiConfig(min_pegs=3, max_pegs=2) config.validate() - + # Test invalid move configurations if any (assuming such validations exist) # Add more tests based on the actual validation logic in HanoiConfig + def test_toh_dataset_deterministic(): """Test that dataset generates the same items with the same seed.""" config = HanoiConfig(seed=42, size=10) @@ -39,6 +42,7 @@ def test_toh_dataset_deterministic(): for i in range(len(dataset1)): assert dataset1[i] == dataset2[i], f"Mismatch found in instance {i} with seed 42." + def test_toh_dataset_items(): """Test basic properties of generated items.""" config = HanoiConfig(min_disks=3, max_disks=5, min_pegs=3, max_pegs=4, size=10, seed=42) @@ -46,13 +50,13 @@ def test_toh_dataset_items(): for i in range(len(dataset)): item = dataset[i] - + # Check item structure assert isinstance(item, dict), f"Item {i} is not a dictionary." assert "question" in item, f"Item {i} missing 'question' key." assert "answer" in item, f"Item {i} missing 'answer' key." assert "metadata" in item, f"Item {i} missing 'metadata' key." - + # Check metadata metadata = item["metadata"] assert "num_disks" in metadata, f"Item {i} metadata missing 'num_disks'." @@ -61,71 +65,66 @@ def test_toh_dataset_items(): assert "target_peg" in metadata, f"Item {i} metadata missing 'target_peg'." assert "auxiliary_pegs" in metadata, f"Item {i} metadata missing 'auxiliary_pegs'." assert "solution_length" in metadata, f"Item {i} metadata missing 'solution_length'." - + num_disks = metadata["num_disks"] num_pegs = metadata["num_pegs"] start_peg = metadata["start_peg"] target_peg = metadata["target_peg"] auxiliary_pegs = metadata["auxiliary_pegs"] solution_length = metadata["solution_length"] - + # Verify peg counts - assert num_pegs == len(metadata["auxiliary_pegs"]) + 2, ( - f"Item {i} has inconsistent peg counts." - ) - + assert num_pegs == len(metadata["auxiliary_pegs"]) + 2, f"Item {i} has inconsistent peg counts." + # Verify solution_length consistency - assert solution_length == len(item["answer"]), ( - f"Item {i} metadata 'solution_length' does not match actual number of moves." - ) - + assert solution_length == len( + item["answer"] + ), f"Item {i} metadata 'solution_length' does not match actual number of moves." + # Optional: Additional checks like verifying that start and target pegs are distinct assert start_peg != target_peg, f"Item {i} has identical start and target pegs." + def test_toh_move_validity(): """Test that all moves in each problem instance are valid according to Tower of Hanoi rules.""" config = HanoiConfig(min_disks=3, max_disks=5, min_pegs=3, max_pegs=4, size=10, seed=42) dataset = HanoiDataset(config) for idx, instance in enumerate(dataset): - num_disks = instance['metadata']['num_disks'] - num_pegs = instance['metadata']['num_pegs'] - start_peg = instance['metadata']['start_peg'] - target_peg = instance['metadata']['target_peg'] - auxiliary_pegs = instance['metadata']['auxiliary_pegs'] + num_disks = instance["metadata"]["num_disks"] + num_pegs = instance["metadata"]["num_pegs"] + start_peg = instance["metadata"]["start_peg"] + target_peg = instance["metadata"]["target_peg"] + auxiliary_pegs = instance["metadata"]["auxiliary_pegs"] pegs = list(range(1, num_pegs + 1)) - + # Initialize pegs_state: all disks start on the start peg pegs_state = {peg: [] for peg in pegs} for disk in range(num_disks, 0, -1): pegs_state[start_peg].append(disk) - + # Iterate over each move and validate - for move_num, move in enumerate(instance['answer'], start=1): + for move_num, move in enumerate(instance["answer"], start=1): disk, from_peg, to_peg = parse_move(move) - + # Check that from_peg exists - assert from_peg in pegs, ( - f"Move {move_num} in Instance {idx} references non-existent from_peg {from_peg}." - ) - + assert from_peg in pegs, f"Move {move_num} in Instance {idx} references non-existent from_peg {from_peg}." + # Check that to_peg exists - assert to_peg in pegs, ( - f"Move {move_num} in Instance {idx} references non-existent to_peg {to_peg}." - ) - + assert to_peg in pegs, f"Move {move_num} in Instance {idx} references non-existent to_peg {to_peg}." + # Check that from_peg is not empty - assert pegs_state[from_peg], ( - f"Move {move_num} in Instance {idx} attempts to move from an empty Peg {from_peg}." - ) - + assert pegs_state[ + from_peg + ], f"Move {move_num} in Instance {idx} attempts to move from an empty Peg {from_peg}." + # Check that the disk to move is on top of from_peg top_disk = pegs_state[from_peg][-1] assert disk == top_disk, ( f"Move {move_num} in Instance {idx} attempts to move disk {disk} " f"which is not on top of Peg {from_peg} (top disk: {top_disk})." ) - + # Check that moving disk to to_peg does not violate size constraints if pegs_state[to_peg]: top_to_disk = pegs_state[to_peg][-1] @@ -133,53 +132,51 @@ def test_toh_move_validity(): f"Move {move_num} in Instance {idx} attempts to place disk {disk} " f"on top of smaller disk {top_to_disk} on Peg {to_peg}." ) - + # Perform the move pegs_state[from_peg].pop() pegs_state[to_peg].append(disk) + def test_toh_final_state_correct(): """Test that the final state of each problem instance has all disks on the target peg in correct order.""" config = HanoiConfig(min_disks=3, max_disks=5, min_pegs=3, max_pegs=4, size=10, seed=42) dataset = HanoiDataset(config) for idx, instance in enumerate(dataset): - num_disks = instance['metadata']['num_disks'] - num_pegs = instance['metadata']['num_pegs'] - start_peg = instance['metadata']['start_peg'] - target_peg = instance['metadata']['target_peg'] - auxiliary_pegs = instance['metadata']['auxiliary_pegs'] + num_disks = instance["metadata"]["num_disks"] + num_pegs = instance["metadata"]["num_pegs"] + start_peg = instance["metadata"]["start_peg"] + target_peg = instance["metadata"]["target_peg"] + auxiliary_pegs = instance["metadata"]["auxiliary_pegs"] pegs = list(range(1, num_pegs + 1)) - + # Initialize pegs_state: all disks start on the start peg pegs_state = {peg: [] for peg in pegs} for disk in range(num_disks, 0, -1): pegs_state[start_peg].append(disk) - + # Perform all moves - for move in instance['answer']: + for move in instance["answer"]: disk, from_peg, to_peg = parse_move(move) pegs_state[from_peg].pop() pegs_state[to_peg].append(disk) - + # After all moves, all disks should be on target peg in descending order final_pegs = pegs_state[target_peg] - assert len(final_pegs) == num_disks, ( - f"Instance {idx} does not have all disks on the target Peg {target_peg}." - ) - + assert len(final_pegs) == num_disks, f"Instance {idx} does not have all disks on the target Peg {target_peg}." + # Verify that disks are in correct order on target peg expected_final = list(range(num_disks, 0, -1)) - assert final_pegs == expected_final, ( - f"Instance {idx} has disks on Peg {target_peg} in incorrect order." - ) - + assert final_pegs == expected_final, f"Instance {idx} has disks on Peg {target_peg} in incorrect order." + # Ensure all other pegs are empty for peg in pegs: if peg != target_peg: - assert len(pegs_state[peg]) == 0, ( - f"Instance {idx} has disks remaining on Peg {peg}, which should be empty." - ) + assert ( + len(pegs_state[peg]) == 0 + ), f"Instance {idx} has disks remaining on Peg {peg}, which should be empty." + def test_toh_dataset_iteration(): """Test that iteration respects dataset size and multiple iterations yield the same items.""" @@ -187,16 +184,15 @@ def test_toh_dataset_iteration(): dataset = HanoiDataset(config) # Test dataset size - assert len(dataset) == config.size, ( - f"Dataset size mismatch: expected {config.size}, got {len(dataset)}." - ) - + assert len(dataset) == config.size, f"Dataset size mismatch: expected {config.size}, got {len(dataset)}." + # Collect items items = list(dataset) - + # Test multiple iterations yield the same items assert items == list(dataset), "Multiple iterations over the dataset do not yield the same items." + def parse_move(move_str: str) -> tuple: """Parse a move string and extract disk number, from peg, and to peg. @@ -214,6 +210,7 @@ def parse_move(move_str: str) -> tuple: to_peg = int(match.group(3)) return disk, from_peg, to_peg + def is_valid_final_state(pegs_state: dict, target_peg: int, num_disks: int) -> bool: """Verify that all disks are on the target peg in descending order.