From 4bf99d3e0bd1f1eee98ce0388828153eb7afdc03 Mon Sep 17 00:00:00 2001 From: Joe Norton <16323+joenorton@users.noreply.github.com> Date: Thu, 30 Jan 2025 23:16:06 -0800 Subject: [PATCH 1/3] adds Tower of Hanoi creates game file & test file, modifies games init to add toh --- reasoning_gym/games/__init__.py | 3 + reasoning_gym/games/tower_of_hanoi.py | 364 ++++++++++++++++++++++++++ tests/test_tower_of_hanoi.py | 231 ++++++++++++++++ 3 files changed, 598 insertions(+) create mode 100644 reasoning_gym/games/tower_of_hanoi.py create mode 100644 tests/test_tower_of_hanoi.py diff --git a/reasoning_gym/games/__init__.py b/reasoning_gym/games/__init__.py index a801c6e4..9174089e 100644 --- a/reasoning_gym/games/__init__.py +++ b/reasoning_gym/games/__init__.py @@ -11,6 +11,7 @@ from .maze import MazeConfig, MazeDataset from .mini_sudoku import MiniSudokuConfig, MiniSudokuDataset from .sudoku import SudokuConfig, SudokuDataset +from .tower_of_hanoi import HanoiConfig, HanoiDataset __all__ = [ "CountdownConfig", @@ -23,4 +24,6 @@ "MazeDataset", "GameOfLifeConfig", "GameOfLifeDataset", + "HanoiConfig", + "HanoiDataset" ] diff --git a/reasoning_gym/games/tower_of_hanoi.py b/reasoning_gym/games/tower_of_hanoi.py new file mode 100644 index 00000000..081b3bc6 --- /dev/null +++ b/reasoning_gym/games/tower_of_hanoi.py @@ -0,0 +1,364 @@ +# reasoning_gym/games/tower_of_hanoi.py + +from dataclasses import dataclass +from typing import List, Optional, Dict, Tuple +import math +import random +import re + +from ..factory import ProceduralDataset, register_dataset + +@dataclass +class HanoiConfig: + """ + Configuration for the Tower of Hanoi task. + + - min_disks: Minimum number of disks in the puzzle. + - max_disks: Maximum number of disks in the puzzle. + - min_pegs: Minimum number of pegs (minimum 3). + - max_pegs: Maximum number of pegs. + - size: Number of problem instances in the dataset. + - seed: Optional seed for reproducibility. + - visualize: Whether to include a visualization of the initial state. + """ + + min_disks: int = 3 + max_disks: int = 7 + min_pegs: int = 3 + max_pegs: int = 4 + size: int = 50 + seed: Optional[int] = None + visualize: bool = False # New parameter + + def validate(self) -> None: + """Validate configuration parameters.""" + assert self.min_disks >= 1, "min_disks must be at least 1" + assert self.max_disks >= self.min_disks, "max_disks must be >= min_disks" + assert self.min_pegs >= 3, "min_pegs must be at least 3" + assert self.max_pegs >= self.min_pegs, "max_pegs must be >= min_pegs" + +class MoveGenerator: + """ + Helper class to generate valid move sequences for Tower of Hanoi using the Frame-Stewart algorithm. + It maintains the current state of all pegs to ensure move validity. + """ + + def __init__(self, num_disks: int, pegs: List[int], start: int, target: int): + self.num_disks = num_disks + self.pegs = pegs + self.start = start + self.target = target + self.auxiliary_pegs = [peg for peg in pegs if peg not in (start, target)] + self.pegs_state: Dict[int, List[int]] = {peg: [] for peg in pegs} + for disk in range(num_disks, 0, -1): # Largest disk at the bottom + self.pegs_state[start].append(disk) + self.moves: List[str] = [] + self.memo: Dict[Tuple[int, int], int] = {} # Memoization for T(n, k) + + def generate_moves(self) -> List[str]: + self.move(n=self.num_disks, source=self.start, target=self.target, auxiliary_pegs=self.auxiliary_pegs) + return self.moves + + def move(self, n: int, source: int, target: int, auxiliary_pegs: List[int]): + if n == 0: + return + if n == 1: + self._move_disk(source, target) + return + + k = len(auxiliary_pegs) + 2 # Total number of pegs including source and target + + if k < 3: + raise ValueError("At least 3 pegs are required.") + + if k == 3: + # Classic Tower of Hanoi solution + aux = auxiliary_pegs[0] + self.move(n - 1, source, aux, [target]) + self._move_disk(source, target) + self.move(n - 1, aux, target, [source]) + return + + # For k > 3, apply Frame-Stewart algorithm + # Find m that minimizes 2*T(m, k) + T(n - m, k - 1) + min_moves = math.inf + best_m = 1 + for m in range(1, n): + moves_m = self._compute_T(m, k) + moves_n_minus_m = self._compute_T(n - m, k - 1) + total_moves = 2 * moves_m + moves_n_minus_m + if total_moves < min_moves: + min_moves = total_moves + best_m = m + + # Select a temporary peg to hold m disks + temp_peg = auxiliary_pegs[0] + new_auxiliary = [peg for peg in auxiliary_pegs if peg != temp_peg] + + # Step 1: Move top m disks to temp_peg using all pegs + self.move(n=best_m, source=source, target=temp_peg, auxiliary_pegs=auxiliary_pegs[1:] + [target]) + + # Step 2: Move remaining n - m disks to target using k - 1 pegs + self.move(n=n - best_m, source=source, target=target, auxiliary_pegs=new_auxiliary) + + # Step 3: Move m disks from temp_peg to target using all pegs + self.move(n=best_m, source=temp_peg, target=target, auxiliary_pegs=auxiliary_pegs[1:] + [source]) + + def _move_disk(self, from_peg: int, to_peg: int): + if not self.pegs_state[from_peg]: + raise ValueError(f"No disks to move from Peg {from_peg}.") + disk = self.pegs_state[from_peg][-1] + self.pegs_state[from_peg].pop() + self.pegs_state[to_peg].append(disk) + self.moves.append(f"Move disk {disk} from Peg {from_peg} to Peg {to_peg}") + + def _compute_T(self, n: int, k: int) -> int: + """ + Compute the minimal number of moves (T(n, k)) required to move n disks using k pegs. + Utilizes memoization to store previously computed results. + """ + if n == 0: + return 0 + if n == 1: + return 1 + if k == 3: + return 2 ** n - 1 + if (n, k) in self.memo: + return self.memo[(n, k)] + + min_moves = math.inf + for m in range(1, n): + moves = 2 * self._compute_T(m, k) + self._compute_T(n - m, k - 1) + if moves < min_moves: + min_moves = moves + self.memo[(n, k)] = min_moves + return min_moves + +class HanoiDataset(ProceduralDataset): + """ + Generates Tower of Hanoi problems with solutions. + Supports variable number of pegs using the optimized Frame-Stewart algorithm with Peg State Tracking. + """ + + def __init__(self, config: HanoiConfig): + super().__init__(config=config, seed=config.seed, size=config.size) + self.min_pegs = config.min_pegs + self.max_pegs = config.max_pegs + self.min_disks = config.min_disks + self.max_disks = config.max_disks + self.visualize = config.visualize # Initialize the visualize attribute + + def __getitem__(self, idx: int) -> dict: + """ + Generate a Tower of Hanoi problem instance. + + Returns: + dict with: + - "question": Text describing the problem setup. + - "answer": List of moves to solve the puzzle. + - "metadata": Configuration and solution details. + - "initial_state": (Optional) ASCII visualization of the initial pegs. + - "states": (Optional) List of ASCII visualizations after each move. + """ + rng = random.Random(self.seed + idx if self.seed is not None else None) + + # Randomly select number of disks and pegs within the specified ranges + num_disks = rng.randint(self.min_disks, self.max_disks) + num_pegs = rng.randint(self.min_pegs, self.max_pegs) + + # Assign unique peg identifiers (e.g., integers starting from 1) + pegs = list(range(1, num_pegs + 1)) + + """ #Debug: Print current instance configuration + print(f"\n--- Generating Instance {idx} ---") + print(f"Number of Disks: {num_disks}") + print(f"Number of Pegs: {num_pegs}") + print(f"Pegs: {pegs}") + """ + + # Randomly select start and target pegs + start_peg, target_peg = rng.sample(pegs, 2) + + # Auxiliary pegs are the remaining pegs + auxiliary_pegs = [peg for peg in pegs if peg not in (start_peg, target_peg)] + + """ # Debug: Print start, target, and auxiliary pegs + print(f"Start Peg: {start_peg}") + print(f"Target Peg: {target_peg}") + print(f"Auxiliary Pegs: {auxiliary_pegs}") + """ + + # Initialize the MoveGenerator and generate moves + move_gen = MoveGenerator(num_disks, pegs, start_peg, target_peg) + try: + solution = move_gen.generate_moves() + except ValueError as ve: + # print(f"Error during move generation: {ve}") + raise ve + + """ # Debug: Print the solution moves + print(f"Solution Length: {len(solution)}") + print("Solution Moves:") + for move_num, move in enumerate(solution, start=1): + print(f" Move {move_num}: {move}") + """ + + # Initialize pegs_state: all disks start on the start peg + pegs_state = {peg: [] for peg in pegs} + for disk in range(num_disks, 0, -1): # Largest disk at the bottom + pegs_state[start_peg].append(disk) + + # Generate initial state visualization if requested + initial_state_str = None + if self.visualize: + initial_state_str = self._visualize_state(pegs_state) + + # Apply moves to track state changes + states = [] + if self.visualize: + states.append(initial_state_str) # Initial state + for move in solution: + # Parse the move string using regex + try: + disk, from_peg, to_peg = self._parse_move(move) + except ValueError as ve: + # print(f"Error parsing move: {ve}") + raise ve + + # Validate the move + if not self._validate_move(pegs_state, move): + #print(f"Invalid move detected: {move}") + #print(f"Current Pegs State: {pegs_state}") + raise ValueError(f"Invalid move detected: {move}") + + # Move the disk + pegs_state[from_peg].pop() + pegs_state[to_peg].append(disk) + + # Visualize the new state + new_state_str = self._visualize_state(pegs_state) + states.append(new_state_str) + + # Peg labels + peg_labels = {peg: f"Peg {peg}" for peg in pegs} + + question_str = ( + f"Solve the Tower of Hanoi problem with {num_disks} disks and {num_pegs} pegs.\n" + f"Move all disks from {peg_labels[start_peg]} to {peg_labels[target_peg]} following the rules:\n" + "- Only one disk can be moved at a time.\n" + "- A larger disk cannot be placed on top of a smaller disk.\n" + "- All disks must be on a peg at all times.\n" + "Provide the sequence of moves." + ) + + result = { + "question": question_str, + "answer": solution, + "metadata": { + "num_disks": num_disks, + "num_pegs": num_pegs, + "start_peg": start_peg, + "target_peg": target_peg, + "auxiliary_pegs": auxiliary_pegs, + "solution_length": len(solution), + }, + } + + if self.visualize: + result["initial_state"] = initial_state_str + result["states"] = states # List of all states including initial and after each move + + return result + + def _visualize_state(self, pegs_state: Dict[int, List[int]]) -> str: + """ + Create an ASCII visualization of the current state of the pegs. + Adapts to variable number of pegs. + + Args: + pegs_state (dict): Dictionary mapping peg numbers to lists of disks. + + Returns: + str: ASCII art representing the pegs and disks. + """ + # Determine the number of levels based on the maximum number of disks on any peg + max_height = max(len(disks) for disks in pegs_state.values()) + pegs = sorted(pegs_state.keys()) + + visualization = "" + for level in range(max_height, 0, -1): + for peg in pegs: + if len(pegs_state[peg]) >= level: + disk_size = pegs_state[peg][level - 1] + disk_str = f"[{'*' * disk_size}]" + else: + disk_str = "[ ]" + visualization += disk_str.center(7) # Adjust spacing as needed + visualization += "\n" + + # Add the base and peg numbers + visualization += "-" * (7 * len(pegs)) + "\n" + for peg in pegs: + peg_label = f"P{peg}".center(7) + visualization += peg_label + visualization += "\n" + + return visualization + + def _validate_move(self, pegs_state: Dict[int, List[int]], move: str) -> bool: + """ + Validate that a move adheres to the Tower of Hanoi rules. + + Args: + pegs_state (dict): Current state of the pegs. + move (str): Move instruction, e.g., "Move disk 2 from Peg 1 to Peg 3". + + Returns: + bool: True if the move is valid, False otherwise. + """ + try: + parts = move.split() + if len(parts) != 9: + # print(f"Unexpected move format: '{move}'") + return False + disk = int(parts[2]) + from_peg = int(parts[5]) + to_peg = int(parts[8]) + + # Check if the disk to move is the top disk on the from_peg + if not pegs_state[from_peg] or pegs_state[from_peg][-1] != disk: + # print(f"Disk {disk} is not on top of Peg {from_peg}. Current state: {pegs_state[from_peg]}") + return False + + # Check if placing the disk on the to_peg violates size constraints + if pegs_state[to_peg] and pegs_state[to_peg][-1] < disk: + # print(f"Cannot place disk {disk} on top of smaller disk {pegs_state[to_peg][-1]} on Peg {to_peg}.") + return False + + return True + except Exception as e: + print(f"Error validating move '{move}': {e}") + return False + + def _parse_move(self, move: str) -> Tuple[int, int, int]: + """ + Parse a move string and extract disk number, from peg, and to peg. + + Args: + move (str): Move instruction, e.g., "Move disk 2 from Peg 1 to Peg 3". + + Returns: + tuple: (disk, from_peg, to_peg) + """ + pattern = r"Move disk (\d+) from Peg (\d+) to Peg (\d+)" + match = re.match(pattern, move) + if not match: + raise ValueError(f"Unexpected move format: '{move}'") + + disk = int(match.group(1)) + from_peg = int(match.group(2)) + to_peg = int(match.group(3)) + return disk, from_peg, to_peg + +# Register the dataset +register_dataset("tower_of_hanoi", HanoiDataset, HanoiConfig) diff --git a/tests/test_tower_of_hanoi.py b/tests/test_tower_of_hanoi.py new file mode 100644 index 00000000..a3a89023 --- /dev/null +++ b/tests/test_tower_of_hanoi.py @@ -0,0 +1,231 @@ +"""Tests for Tower of Hanoi puzzle generation""" + +import pytest +import re + +from reasoning_gym.games.tower_of_hanoi import HanoiConfig, HanoiDataset + +def test_toh_config_validation(): + """Test that invalid configurations raise appropriate errors.""" + # Test negative number of disks + with pytest.raises(AssertionError): + config = HanoiConfig(min_disks=0) # At least 1 disk required + config.validate() + + # Test max_disks less than min_disks + with pytest.raises(AssertionError): + config = HanoiConfig(min_disks=5, max_disks=3) + config.validate() + + # Test min_pegs less than 3 + with pytest.raises(AssertionError): + config = HanoiConfig(min_pegs=2) + config.validate() + + # Test max_pegs less than min_pegs + with pytest.raises(AssertionError): + config = HanoiConfig(min_pegs=3, max_pegs=2) + config.validate() + + # Test invalid move configurations if any (assuming such validations exist) + # Add more tests based on the actual validation logic in HanoiConfig + +def test_toh_dataset_deterministic(): + """Test that dataset generates the same items with the same seed.""" + config = HanoiConfig(seed=42, size=10) + dataset1 = HanoiDataset(config) + dataset2 = HanoiDataset(config) + + for i in range(len(dataset1)): + assert dataset1[i] == dataset2[i], f"Mismatch found in instance {i} with seed 42." + +def test_toh_dataset_items(): + """Test basic properties of generated items.""" + config = HanoiConfig(min_disks=3, max_disks=5, min_pegs=3, max_pegs=4, size=10, seed=42) + dataset = HanoiDataset(config) + + for i in range(len(dataset)): + item = dataset[i] + + # Check item structure + assert isinstance(item, dict), f"Item {i} is not a dictionary." + assert "question" in item, f"Item {i} missing 'question' key." + assert "answer" in item, f"Item {i} missing 'answer' key." + assert "metadata" in item, f"Item {i} missing 'metadata' key." + + # Check metadata + metadata = item["metadata"] + assert "num_disks" in metadata, f"Item {i} metadata missing 'num_disks'." + assert "num_pegs" in metadata, f"Item {i} metadata missing 'num_pegs'." + assert "start_peg" in metadata, f"Item {i} metadata missing 'start_peg'." + assert "target_peg" in metadata, f"Item {i} metadata missing 'target_peg'." + assert "auxiliary_pegs" in metadata, f"Item {i} metadata missing 'auxiliary_pegs'." + assert "solution_length" in metadata, f"Item {i} metadata missing 'solution_length'." + + num_disks = metadata["num_disks"] + num_pegs = metadata["num_pegs"] + start_peg = metadata["start_peg"] + target_peg = metadata["target_peg"] + auxiliary_pegs = metadata["auxiliary_pegs"] + solution_length = metadata["solution_length"] + + # Verify peg counts + assert num_pegs == len(metadata["auxiliary_pegs"]) + 2, ( + f"Item {i} has inconsistent peg counts." + ) + + # Verify solution_length consistency + assert solution_length == len(item["answer"]), ( + f"Item {i} metadata 'solution_length' does not match actual number of moves." + ) + + # Optional: Additional checks like verifying that start and target pegs are distinct + assert start_peg != target_peg, f"Item {i} has identical start and target pegs." + +def test_toh_move_validity(): + """Test that all moves in each problem instance are valid according to Tower of Hanoi rules.""" + config = HanoiConfig(min_disks=3, max_disks=5, min_pegs=3, max_pegs=4, size=10, seed=42) + dataset = HanoiDataset(config) + + for idx, instance in enumerate(dataset): + num_disks = instance['metadata']['num_disks'] + num_pegs = instance['metadata']['num_pegs'] + start_peg = instance['metadata']['start_peg'] + target_peg = instance['metadata']['target_peg'] + auxiliary_pegs = instance['metadata']['auxiliary_pegs'] + pegs = list(range(1, num_pegs + 1)) + + # Initialize pegs_state: all disks start on the start peg + pegs_state = {peg: [] for peg in pegs} + for disk in range(num_disks, 0, -1): + pegs_state[start_peg].append(disk) + + # Iterate over each move and validate + for move_num, move in enumerate(instance['answer'], start=1): + disk, from_peg, to_peg = parse_move(move) + + # Check that from_peg exists + assert from_peg in pegs, ( + f"Move {move_num} in Instance {idx} references non-existent from_peg {from_peg}." + ) + + # Check that to_peg exists + assert to_peg in pegs, ( + f"Move {move_num} in Instance {idx} references non-existent to_peg {to_peg}." + ) + + # Check that from_peg is not empty + assert pegs_state[from_peg], ( + f"Move {move_num} in Instance {idx} attempts to move from an empty Peg {from_peg}." + ) + + # Check that the disk to move is on top of from_peg + top_disk = pegs_state[from_peg][-1] + assert disk == top_disk, ( + f"Move {move_num} in Instance {idx} attempts to move disk {disk} " + f"which is not on top of Peg {from_peg} (top disk: {top_disk})." + ) + + # Check that moving disk to to_peg does not violate size constraints + if pegs_state[to_peg]: + top_to_disk = pegs_state[to_peg][-1] + assert top_to_disk > disk, ( + f"Move {move_num} in Instance {idx} attempts to place disk {disk} " + f"on top of smaller disk {top_to_disk} on Peg {to_peg}." + ) + + # Perform the move + pegs_state[from_peg].pop() + pegs_state[to_peg].append(disk) + +def test_toh_final_state_correct(): + """Test that the final state of each problem instance has all disks on the target peg in correct order.""" + config = HanoiConfig(min_disks=3, max_disks=5, min_pegs=3, max_pegs=4, size=10, seed=42) + dataset = HanoiDataset(config) + + for idx, instance in enumerate(dataset): + num_disks = instance['metadata']['num_disks'] + num_pegs = instance['metadata']['num_pegs'] + start_peg = instance['metadata']['start_peg'] + target_peg = instance['metadata']['target_peg'] + auxiliary_pegs = instance['metadata']['auxiliary_pegs'] + pegs = list(range(1, num_pegs + 1)) + + # Initialize pegs_state: all disks start on the start peg + pegs_state = {peg: [] for peg in pegs} + for disk in range(num_disks, 0, -1): + pegs_state[start_peg].append(disk) + + # Perform all moves + for move in instance['answer']: + disk, from_peg, to_peg = parse_move(move) + pegs_state[from_peg].pop() + pegs_state[to_peg].append(disk) + + # After all moves, all disks should be on target peg in descending order + final_pegs = pegs_state[target_peg] + assert len(final_pegs) == num_disks, ( + f"Instance {idx} does not have all disks on the target Peg {target_peg}." + ) + + # Verify that disks are in correct order on target peg + expected_final = list(range(num_disks, 0, -1)) + assert final_pegs == expected_final, ( + f"Instance {idx} has disks on Peg {target_peg} in incorrect order." + ) + + # Ensure all other pegs are empty + for peg in pegs: + if peg != target_peg: + assert len(pegs_state[peg]) == 0, ( + f"Instance {idx} has disks remaining on Peg {peg}, which should be empty." + ) + +def test_toh_dataset_iteration(): + """Test that iteration respects dataset size and multiple iterations yield the same items.""" + config = HanoiConfig(min_disks=3, max_disks=5, min_pegs=3, max_pegs=4, size=5, seed=42) + dataset = HanoiDataset(config) + + # Test dataset size + assert len(dataset) == config.size, ( + f"Dataset size mismatch: expected {config.size}, got {len(dataset)}." + ) + + # Collect items + items = list(dataset) + + # Test multiple iterations yield the same items + assert items == list(dataset), "Multiple iterations over the dataset do not yield the same items." + +def parse_move(move_str: str) -> tuple: + """Parse a move string and extract disk number, from peg, and to peg. + + Args: + move_str (str): Move instruction, e.g., "Move disk 2 from Peg 1 to Peg 3". + + Returns: + tuple: (disk, from_peg, to_peg) + """ + pattern = r"Move disk (\d+) from Peg (\d+) to Peg (\d+)" + match = re.match(pattern, move_str) + assert match is not None, f"Move string '{move_str}' does not match the expected format." + disk = int(match.group(1)) + from_peg = int(match.group(2)) + to_peg = int(match.group(3)) + return disk, from_peg, to_peg + +def is_valid_final_state(pegs_state: dict, target_peg: int, num_disks: int) -> bool: + """Verify that all disks are on the target peg in descending order. + + Args: + pegs_state (dict): Current state of the pegs. + target_peg (int): The target peg number. + num_disks (int): Total number of disks. + + Returns: + bool: True if valid, False otherwise. + """ + target_stack = pegs_state[target_peg] + if len(target_stack) != num_disks: + return False + return target_stack == list(range(num_disks, 0, -1)) From b61bb23620da967e8e76d35e324d055e5f859d83 Mon Sep 17 00:00:00 2001 From: Joe Norton <16323+joenorton@users.noreply.github.com> Date: Fri, 31 Jan 2025 00:05:33 -0800 Subject: [PATCH 2/3] linter --- .github/workflows/tests.yml | 4 +- reasoning_gym/games/__init__.py | 2 +- reasoning_gym/games/tower_of_hanoi.py | 122 +++++++++++++------------ tests/test_tower_of_hanoi.py | 127 +++++++++++++------------- 4 files changed, 128 insertions(+), 127 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 9e97239d..50b64d5d 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -19,12 +19,12 @@ jobs: uses: actions/setup-python@v4 with: python-version: ${{ matrix.python-version }} - + - name: Install dependencies run: | python -m pip install --upgrade pip pip install ".[test]" - + - name: Run tests run: | pytest diff --git a/reasoning_gym/games/__init__.py b/reasoning_gym/games/__init__.py index 9174089e..6a6df59f 100644 --- a/reasoning_gym/games/__init__.py +++ b/reasoning_gym/games/__init__.py @@ -25,5 +25,5 @@ "GameOfLifeConfig", "GameOfLifeDataset", "HanoiConfig", - "HanoiDataset" + "HanoiDataset", ] diff --git a/reasoning_gym/games/tower_of_hanoi.py b/reasoning_gym/games/tower_of_hanoi.py index 081b3bc6..3f878b60 100644 --- a/reasoning_gym/games/tower_of_hanoi.py +++ b/reasoning_gym/games/tower_of_hanoi.py @@ -1,13 +1,14 @@ # reasoning_gym/games/tower_of_hanoi.py -from dataclasses import dataclass -from typing import List, Optional, Dict, Tuple import math import random import re +from dataclasses import dataclass +from typing import Dict, List, Optional, Tuple from ..factory import ProceduralDataset, register_dataset + @dataclass class HanoiConfig: """ @@ -21,7 +22,7 @@ class HanoiConfig: - seed: Optional seed for reproducibility. - visualize: Whether to include a visualization of the initial state. """ - + min_disks: int = 3 max_disks: int = 7 min_pegs: int = 3 @@ -29,7 +30,7 @@ class HanoiConfig: size: int = 50 seed: Optional[int] = None visualize: bool = False # New parameter - + def validate(self) -> None: """Validate configuration parameters.""" assert self.min_disks >= 1, "min_disks must be at least 1" @@ -37,12 +38,13 @@ def validate(self) -> None: assert self.min_pegs >= 3, "min_pegs must be at least 3" assert self.max_pegs >= self.min_pegs, "max_pegs must be >= min_pegs" + class MoveGenerator: """ Helper class to generate valid move sequences for Tower of Hanoi using the Frame-Stewart algorithm. It maintains the current state of all pegs to ensure move validity. """ - + def __init__(self, num_disks: int, pegs: List[int], start: int, target: int): self.num_disks = num_disks self.pegs = pegs @@ -54,23 +56,23 @@ def __init__(self, num_disks: int, pegs: List[int], start: int, target: int): self.pegs_state[start].append(disk) self.moves: List[str] = [] self.memo: Dict[Tuple[int, int], int] = {} # Memoization for T(n, k) - + def generate_moves(self) -> List[str]: self.move(n=self.num_disks, source=self.start, target=self.target, auxiliary_pegs=self.auxiliary_pegs) return self.moves - + def move(self, n: int, source: int, target: int, auxiliary_pegs: List[int]): if n == 0: return if n == 1: self._move_disk(source, target) return - + k = len(auxiliary_pegs) + 2 # Total number of pegs including source and target - + if k < 3: raise ValueError("At least 3 pegs are required.") - + if k == 3: # Classic Tower of Hanoi solution aux = auxiliary_pegs[0] @@ -78,7 +80,7 @@ def move(self, n: int, source: int, target: int, auxiliary_pegs: List[int]): self._move_disk(source, target) self.move(n - 1, aux, target, [source]) return - + # For k > 3, apply Frame-Stewart algorithm # Find m that minimizes 2*T(m, k) + T(n - m, k - 1) min_moves = math.inf @@ -90,20 +92,20 @@ def move(self, n: int, source: int, target: int, auxiliary_pegs: List[int]): if total_moves < min_moves: min_moves = total_moves best_m = m - + # Select a temporary peg to hold m disks temp_peg = auxiliary_pegs[0] new_auxiliary = [peg for peg in auxiliary_pegs if peg != temp_peg] - + # Step 1: Move top m disks to temp_peg using all pegs self.move(n=best_m, source=source, target=temp_peg, auxiliary_pegs=auxiliary_pegs[1:] + [target]) - + # Step 2: Move remaining n - m disks to target using k - 1 pegs self.move(n=n - best_m, source=source, target=target, auxiliary_pegs=new_auxiliary) - + # Step 3: Move m disks from temp_peg to target using all pegs self.move(n=best_m, source=temp_peg, target=target, auxiliary_pegs=auxiliary_pegs[1:] + [source]) - + def _move_disk(self, from_peg: int, to_peg: int): if not self.pegs_state[from_peg]: raise ValueError(f"No disks to move from Peg {from_peg}.") @@ -111,7 +113,7 @@ def _move_disk(self, from_peg: int, to_peg: int): self.pegs_state[from_peg].pop() self.pegs_state[to_peg].append(disk) self.moves.append(f"Move disk {disk} from Peg {from_peg} to Peg {to_peg}") - + def _compute_T(self, n: int, k: int) -> int: """ Compute the minimal number of moves (T(n, k)) required to move n disks using k pegs. @@ -122,10 +124,10 @@ def _compute_T(self, n: int, k: int) -> int: if n == 1: return 1 if k == 3: - return 2 ** n - 1 + return 2**n - 1 if (n, k) in self.memo: return self.memo[(n, k)] - + min_moves = math.inf for m in range(1, n): moves = 2 * self._compute_T(m, k) + self._compute_T(n - m, k - 1) @@ -134,12 +136,13 @@ def _compute_T(self, n: int, k: int) -> int: self.memo[(n, k)] = min_moves return min_moves + class HanoiDataset(ProceduralDataset): """ Generates Tower of Hanoi problems with solutions. Supports variable number of pegs using the optimized Frame-Stewart algorithm with Peg State Tracking. """ - + def __init__(self, config: HanoiConfig): super().__init__(config=config, seed=config.seed, size=config.size) self.min_pegs = config.min_pegs @@ -147,11 +150,11 @@ def __init__(self, config: HanoiConfig): self.min_disks = config.min_disks self.max_disks = config.max_disks self.visualize = config.visualize # Initialize the visualize attribute - + def __getitem__(self, idx: int) -> dict: """ Generate a Tower of Hanoi problem instance. - + Returns: dict with: - "question": Text describing the problem setup. @@ -161,33 +164,33 @@ def __getitem__(self, idx: int) -> dict: - "states": (Optional) List of ASCII visualizations after each move. """ rng = random.Random(self.seed + idx if self.seed is not None else None) - + # Randomly select number of disks and pegs within the specified ranges num_disks = rng.randint(self.min_disks, self.max_disks) num_pegs = rng.randint(self.min_pegs, self.max_pegs) - + # Assign unique peg identifiers (e.g., integers starting from 1) pegs = list(range(1, num_pegs + 1)) - + """ #Debug: Print current instance configuration print(f"\n--- Generating Instance {idx} ---") print(f"Number of Disks: {num_disks}") print(f"Number of Pegs: {num_pegs}") print(f"Pegs: {pegs}") """ - + # Randomly select start and target pegs start_peg, target_peg = rng.sample(pegs, 2) - + # Auxiliary pegs are the remaining pegs auxiliary_pegs = [peg for peg in pegs if peg not in (start_peg, target_peg)] - + """ # Debug: Print start, target, and auxiliary pegs print(f"Start Peg: {start_peg}") print(f"Target Peg: {target_peg}") print(f"Auxiliary Pegs: {auxiliary_pegs}") """ - + # Initialize the MoveGenerator and generate moves move_gen = MoveGenerator(num_disks, pegs, start_peg, target_peg) try: @@ -195,24 +198,24 @@ def __getitem__(self, idx: int) -> dict: except ValueError as ve: # print(f"Error during move generation: {ve}") raise ve - + """ # Debug: Print the solution moves print(f"Solution Length: {len(solution)}") print("Solution Moves:") for move_num, move in enumerate(solution, start=1): print(f" Move {move_num}: {move}") """ - + # Initialize pegs_state: all disks start on the start peg pegs_state = {peg: [] for peg in pegs} for disk in range(num_disks, 0, -1): # Largest disk at the bottom pegs_state[start_peg].append(disk) - + # Generate initial state visualization if requested initial_state_str = None if self.visualize: initial_state_str = self._visualize_state(pegs_state) - + # Apply moves to track state changes states = [] if self.visualize: @@ -224,24 +227,24 @@ def __getitem__(self, idx: int) -> dict: except ValueError as ve: # print(f"Error parsing move: {ve}") raise ve - + # Validate the move if not self._validate_move(pegs_state, move): - #print(f"Invalid move detected: {move}") - #print(f"Current Pegs State: {pegs_state}") + # print(f"Invalid move detected: {move}") + # print(f"Current Pegs State: {pegs_state}") raise ValueError(f"Invalid move detected: {move}") - + # Move the disk pegs_state[from_peg].pop() pegs_state[to_peg].append(disk) - + # Visualize the new state new_state_str = self._visualize_state(pegs_state) states.append(new_state_str) - + # Peg labels peg_labels = {peg: f"Peg {peg}" for peg in pegs} - + question_str = ( f"Solve the Tower of Hanoi problem with {num_disks} disks and {num_pegs} pegs.\n" f"Move all disks from {peg_labels[start_peg]} to {peg_labels[target_peg]} following the rules:\n" @@ -250,7 +253,7 @@ def __getitem__(self, idx: int) -> dict: "- All disks must be on a peg at all times.\n" "Provide the sequence of moves." ) - + result = { "question": question_str, "answer": solution, @@ -263,28 +266,28 @@ def __getitem__(self, idx: int) -> dict: "solution_length": len(solution), }, } - + if self.visualize: result["initial_state"] = initial_state_str result["states"] = states # List of all states including initial and after each move - + return result - + def _visualize_state(self, pegs_state: Dict[int, List[int]]) -> str: """ Create an ASCII visualization of the current state of the pegs. Adapts to variable number of pegs. - + Args: pegs_state (dict): Dictionary mapping peg numbers to lists of disks. - + Returns: str: ASCII art representing the pegs and disks. """ # Determine the number of levels based on the maximum number of disks on any peg max_height = max(len(disks) for disks in pegs_state.values()) pegs = sorted(pegs_state.keys()) - + visualization = "" for level in range(max_height, 0, -1): for peg in pegs: @@ -295,24 +298,24 @@ def _visualize_state(self, pegs_state: Dict[int, List[int]]) -> str: disk_str = "[ ]" visualization += disk_str.center(7) # Adjust spacing as needed visualization += "\n" - + # Add the base and peg numbers visualization += "-" * (7 * len(pegs)) + "\n" for peg in pegs: peg_label = f"P{peg}".center(7) visualization += peg_label visualization += "\n" - + return visualization - + def _validate_move(self, pegs_state: Dict[int, List[int]], move: str) -> bool: """ Validate that a move adheres to the Tower of Hanoi rules. - + Args: pegs_state (dict): Current state of the pegs. move (str): Move instruction, e.g., "Move disk 2 from Peg 1 to Peg 3". - + Returns: bool: True if the move is valid, False otherwise. """ @@ -324,29 +327,29 @@ def _validate_move(self, pegs_state: Dict[int, List[int]], move: str) -> bool: disk = int(parts[2]) from_peg = int(parts[5]) to_peg = int(parts[8]) - + # Check if the disk to move is the top disk on the from_peg if not pegs_state[from_peg] or pegs_state[from_peg][-1] != disk: # print(f"Disk {disk} is not on top of Peg {from_peg}. Current state: {pegs_state[from_peg]}") return False - + # Check if placing the disk on the to_peg violates size constraints if pegs_state[to_peg] and pegs_state[to_peg][-1] < disk: # print(f"Cannot place disk {disk} on top of smaller disk {pegs_state[to_peg][-1]} on Peg {to_peg}.") return False - + return True except Exception as e: print(f"Error validating move '{move}': {e}") return False - + def _parse_move(self, move: str) -> Tuple[int, int, int]: """ Parse a move string and extract disk number, from peg, and to peg. - + Args: move (str): Move instruction, e.g., "Move disk 2 from Peg 1 to Peg 3". - + Returns: tuple: (disk, from_peg, to_peg) """ @@ -354,11 +357,12 @@ def _parse_move(self, move: str) -> Tuple[int, int, int]: match = re.match(pattern, move) if not match: raise ValueError(f"Unexpected move format: '{move}'") - + disk = int(match.group(1)) from_peg = int(match.group(2)) to_peg = int(match.group(3)) return disk, from_peg, to_peg + # Register the dataset register_dataset("tower_of_hanoi", HanoiDataset, HanoiConfig) diff --git a/tests/test_tower_of_hanoi.py b/tests/test_tower_of_hanoi.py index a3a89023..a4228bc3 100644 --- a/tests/test_tower_of_hanoi.py +++ b/tests/test_tower_of_hanoi.py @@ -1,35 +1,38 @@ """Tests for Tower of Hanoi puzzle generation""" -import pytest import re +import pytest + from reasoning_gym.games.tower_of_hanoi import HanoiConfig, HanoiDataset + def test_toh_config_validation(): """Test that invalid configurations raise appropriate errors.""" # Test negative number of disks with pytest.raises(AssertionError): config = HanoiConfig(min_disks=0) # At least 1 disk required config.validate() - + # Test max_disks less than min_disks with pytest.raises(AssertionError): config = HanoiConfig(min_disks=5, max_disks=3) config.validate() - + # Test min_pegs less than 3 with pytest.raises(AssertionError): config = HanoiConfig(min_pegs=2) config.validate() - + # Test max_pegs less than min_pegs with pytest.raises(AssertionError): config = HanoiConfig(min_pegs=3, max_pegs=2) config.validate() - + # Test invalid move configurations if any (assuming such validations exist) # Add more tests based on the actual validation logic in HanoiConfig + def test_toh_dataset_deterministic(): """Test that dataset generates the same items with the same seed.""" config = HanoiConfig(seed=42, size=10) @@ -39,6 +42,7 @@ def test_toh_dataset_deterministic(): for i in range(len(dataset1)): assert dataset1[i] == dataset2[i], f"Mismatch found in instance {i} with seed 42." + def test_toh_dataset_items(): """Test basic properties of generated items.""" config = HanoiConfig(min_disks=3, max_disks=5, min_pegs=3, max_pegs=4, size=10, seed=42) @@ -46,13 +50,13 @@ def test_toh_dataset_items(): for i in range(len(dataset)): item = dataset[i] - + # Check item structure assert isinstance(item, dict), f"Item {i} is not a dictionary." assert "question" in item, f"Item {i} missing 'question' key." assert "answer" in item, f"Item {i} missing 'answer' key." assert "metadata" in item, f"Item {i} missing 'metadata' key." - + # Check metadata metadata = item["metadata"] assert "num_disks" in metadata, f"Item {i} metadata missing 'num_disks'." @@ -61,71 +65,66 @@ def test_toh_dataset_items(): assert "target_peg" in metadata, f"Item {i} metadata missing 'target_peg'." assert "auxiliary_pegs" in metadata, f"Item {i} metadata missing 'auxiliary_pegs'." assert "solution_length" in metadata, f"Item {i} metadata missing 'solution_length'." - + num_disks = metadata["num_disks"] num_pegs = metadata["num_pegs"] start_peg = metadata["start_peg"] target_peg = metadata["target_peg"] auxiliary_pegs = metadata["auxiliary_pegs"] solution_length = metadata["solution_length"] - + # Verify peg counts - assert num_pegs == len(metadata["auxiliary_pegs"]) + 2, ( - f"Item {i} has inconsistent peg counts." - ) - + assert num_pegs == len(metadata["auxiliary_pegs"]) + 2, f"Item {i} has inconsistent peg counts." + # Verify solution_length consistency - assert solution_length == len(item["answer"]), ( - f"Item {i} metadata 'solution_length' does not match actual number of moves." - ) - + assert solution_length == len( + item["answer"] + ), f"Item {i} metadata 'solution_length' does not match actual number of moves." + # Optional: Additional checks like verifying that start and target pegs are distinct assert start_peg != target_peg, f"Item {i} has identical start and target pegs." + def test_toh_move_validity(): """Test that all moves in each problem instance are valid according to Tower of Hanoi rules.""" config = HanoiConfig(min_disks=3, max_disks=5, min_pegs=3, max_pegs=4, size=10, seed=42) dataset = HanoiDataset(config) for idx, instance in enumerate(dataset): - num_disks = instance['metadata']['num_disks'] - num_pegs = instance['metadata']['num_pegs'] - start_peg = instance['metadata']['start_peg'] - target_peg = instance['metadata']['target_peg'] - auxiliary_pegs = instance['metadata']['auxiliary_pegs'] + num_disks = instance["metadata"]["num_disks"] + num_pegs = instance["metadata"]["num_pegs"] + start_peg = instance["metadata"]["start_peg"] + target_peg = instance["metadata"]["target_peg"] + auxiliary_pegs = instance["metadata"]["auxiliary_pegs"] pegs = list(range(1, num_pegs + 1)) - + # Initialize pegs_state: all disks start on the start peg pegs_state = {peg: [] for peg in pegs} for disk in range(num_disks, 0, -1): pegs_state[start_peg].append(disk) - + # Iterate over each move and validate - for move_num, move in enumerate(instance['answer'], start=1): + for move_num, move in enumerate(instance["answer"], start=1): disk, from_peg, to_peg = parse_move(move) - + # Check that from_peg exists - assert from_peg in pegs, ( - f"Move {move_num} in Instance {idx} references non-existent from_peg {from_peg}." - ) - + assert from_peg in pegs, f"Move {move_num} in Instance {idx} references non-existent from_peg {from_peg}." + # Check that to_peg exists - assert to_peg in pegs, ( - f"Move {move_num} in Instance {idx} references non-existent to_peg {to_peg}." - ) - + assert to_peg in pegs, f"Move {move_num} in Instance {idx} references non-existent to_peg {to_peg}." + # Check that from_peg is not empty - assert pegs_state[from_peg], ( - f"Move {move_num} in Instance {idx} attempts to move from an empty Peg {from_peg}." - ) - + assert pegs_state[ + from_peg + ], f"Move {move_num} in Instance {idx} attempts to move from an empty Peg {from_peg}." + # Check that the disk to move is on top of from_peg top_disk = pegs_state[from_peg][-1] assert disk == top_disk, ( f"Move {move_num} in Instance {idx} attempts to move disk {disk} " f"which is not on top of Peg {from_peg} (top disk: {top_disk})." ) - + # Check that moving disk to to_peg does not violate size constraints if pegs_state[to_peg]: top_to_disk = pegs_state[to_peg][-1] @@ -133,53 +132,51 @@ def test_toh_move_validity(): f"Move {move_num} in Instance {idx} attempts to place disk {disk} " f"on top of smaller disk {top_to_disk} on Peg {to_peg}." ) - + # Perform the move pegs_state[from_peg].pop() pegs_state[to_peg].append(disk) + def test_toh_final_state_correct(): """Test that the final state of each problem instance has all disks on the target peg in correct order.""" config = HanoiConfig(min_disks=3, max_disks=5, min_pegs=3, max_pegs=4, size=10, seed=42) dataset = HanoiDataset(config) for idx, instance in enumerate(dataset): - num_disks = instance['metadata']['num_disks'] - num_pegs = instance['metadata']['num_pegs'] - start_peg = instance['metadata']['start_peg'] - target_peg = instance['metadata']['target_peg'] - auxiliary_pegs = instance['metadata']['auxiliary_pegs'] + num_disks = instance["metadata"]["num_disks"] + num_pegs = instance["metadata"]["num_pegs"] + start_peg = instance["metadata"]["start_peg"] + target_peg = instance["metadata"]["target_peg"] + auxiliary_pegs = instance["metadata"]["auxiliary_pegs"] pegs = list(range(1, num_pegs + 1)) - + # Initialize pegs_state: all disks start on the start peg pegs_state = {peg: [] for peg in pegs} for disk in range(num_disks, 0, -1): pegs_state[start_peg].append(disk) - + # Perform all moves - for move in instance['answer']: + for move in instance["answer"]: disk, from_peg, to_peg = parse_move(move) pegs_state[from_peg].pop() pegs_state[to_peg].append(disk) - + # After all moves, all disks should be on target peg in descending order final_pegs = pegs_state[target_peg] - assert len(final_pegs) == num_disks, ( - f"Instance {idx} does not have all disks on the target Peg {target_peg}." - ) - + assert len(final_pegs) == num_disks, f"Instance {idx} does not have all disks on the target Peg {target_peg}." + # Verify that disks are in correct order on target peg expected_final = list(range(num_disks, 0, -1)) - assert final_pegs == expected_final, ( - f"Instance {idx} has disks on Peg {target_peg} in incorrect order." - ) - + assert final_pegs == expected_final, f"Instance {idx} has disks on Peg {target_peg} in incorrect order." + # Ensure all other pegs are empty for peg in pegs: if peg != target_peg: - assert len(pegs_state[peg]) == 0, ( - f"Instance {idx} has disks remaining on Peg {peg}, which should be empty." - ) + assert ( + len(pegs_state[peg]) == 0 + ), f"Instance {idx} has disks remaining on Peg {peg}, which should be empty." + def test_toh_dataset_iteration(): """Test that iteration respects dataset size and multiple iterations yield the same items.""" @@ -187,16 +184,15 @@ def test_toh_dataset_iteration(): dataset = HanoiDataset(config) # Test dataset size - assert len(dataset) == config.size, ( - f"Dataset size mismatch: expected {config.size}, got {len(dataset)}." - ) - + assert len(dataset) == config.size, f"Dataset size mismatch: expected {config.size}, got {len(dataset)}." + # Collect items items = list(dataset) - + # Test multiple iterations yield the same items assert items == list(dataset), "Multiple iterations over the dataset do not yield the same items." + def parse_move(move_str: str) -> tuple: """Parse a move string and extract disk number, from peg, and to peg. @@ -214,6 +210,7 @@ def parse_move(move_str: str) -> tuple: to_peg = int(match.group(3)) return disk, from_peg, to_peg + def is_valid_final_state(pegs_state: dict, target_peg: int, num_disks: int) -> bool: """Verify that all disks are on the target peg in descending order. From 19c491aaf8ad2ea227145929a9114777d1f61457 Mon Sep 17 00:00:00 2001 From: Joe Norton <16323+joenorton@users.noreply.github.com> Date: Fri, 31 Jan 2025 01:14:45 -0800 Subject: [PATCH 3/3] add example text --- reasoning_gym/games/tower_of_hanoi.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/reasoning_gym/games/tower_of_hanoi.py b/reasoning_gym/games/tower_of_hanoi.py index 3f878b60..df902300 100644 --- a/reasoning_gym/games/tower_of_hanoi.py +++ b/reasoning_gym/games/tower_of_hanoi.py @@ -251,6 +251,11 @@ def __getitem__(self, idx: int) -> dict: "- Only one disk can be moved at a time.\n" "- A larger disk cannot be placed on top of a smaller disk.\n" "- All disks must be on a peg at all times.\n" + "Example:\n" + "Move disk 1 from Peg 1 to Peg 3\n" + "Move disk 2 from Peg 1 to Peg 2\n" + "Move disk 1 from Peg 3 to Peg 2\n" + "\n" "Provide the sequence of moves." )