@dataclass
class HanoiConfig:
    """Configuration for the Tower of Hanoi task.

    Attributes:
        min_disks: Minimum number of disks in the puzzle (at least 1).
        max_disks: Maximum number of disks in the puzzle.
        min_pegs: Minimum number of pegs (at least 3).
        max_pegs: Maximum number of pegs.
        size: Number of problem instances in the dataset.
        seed: Optional seed for reproducibility.
        visualize: Whether to include ASCII visualizations of the puzzle states.
    """

    min_disks: int = 3
    max_disks: int = 7
    min_pegs: int = 3
    max_pegs: int = 4
    size: int = 50
    seed: Optional[int] = None
    visualize: bool = False

    def validate(self) -> None:
        """Validate configuration parameters.

        Raises:
            AssertionError: If a lower bound is too small or a range is inverted.
        """
        # NOTE: asserts are kept (rather than raising ValueError) because callers
        # and the test-suite expect AssertionError from this method.
        assert self.min_disks >= 1, "min_disks must be at least 1"
        assert self.max_disks >= self.min_disks, "max_disks must be >= min_disks"
        assert self.min_pegs >= 3, "min_pegs must be at least 3"
        assert self.max_pegs >= self.min_pegs, "max_pegs must be >= min_pegs"


class MoveGenerator:
    """Generate a valid move sequence for Tower of Hanoi with 3 or more pegs.

    Three pegs use the classic recursive solution; more pegs use the
    Frame-Stewart recursion. The state of every peg is tracked while moves are
    emitted, and each single-disk move is checked against the puzzle rules.
    """

    def __init__(self, num_disks: int, pegs: List[int], start: int, target: int):
        """Set up the initial state with all disks stacked on ``start``.

        Args:
            num_disks: Number of disks; disk 1 is the smallest.
            pegs: Identifiers of all pegs.
            start: Peg that initially holds every disk.
            target: Peg the disks must end up on.
        """
        self.num_disks = num_disks
        self.pegs = pegs
        self.start = start
        self.target = target
        self.auxiliary_pegs = [peg for peg in pegs if peg not in (start, target)]
        self.pegs_state: Dict[int, List[int]] = {peg: [] for peg in pegs}
        # Largest disk at the bottom (index 0), smallest on top (index -1).
        self.pegs_state[start].extend(range(num_disks, 0, -1))
        self.moves: List[str] = []
        self.memo: Dict[Tuple[int, int], int] = {}  # Memoization for T(n, k)
        self._generated = False

    def generate_moves(self) -> List[str]:
        """Return the full list of move strings that solves the puzzle.

        The solution is computed once; later calls return the same list instead
        of replaying the recursion on pegs that have already been emptied.
        """
        if not self._generated:
            self.move(n=self.num_disks, source=self.start, target=self.target, auxiliary_pegs=self.auxiliary_pegs)
            self._generated = True
        return self.moves

    def move(self, n: int, source: int, target: int, auxiliary_pegs: List[int]):
        """Move the top ``n`` disks from ``source`` to ``target``.

        Args:
            n: Number of disks to transfer.
            source: Peg to take the disks from.
            target: Peg to put the disks on.
            auxiliary_pegs: Pegs that may be used as intermediate storage.

        Raises:
            ValueError: If more than one disk must be moved with no auxiliary
                peg, or if an individual move is illegal.
        """
        if n == 0:
            return
        if n == 1:
            self._move_disk(source, target)
            return

        k = len(auxiliary_pegs) + 2  # Total number of pegs including source and target

        if k < 3:
            raise ValueError("At least 3 pegs are required.")

        if k == 3:
            # Classic Tower of Hanoi solution
            aux = auxiliary_pegs[0]
            self.move(n - 1, source, aux, [target])
            self._move_disk(source, target)
            self.move(n - 1, aux, target, [source])
            return

        # Frame-Stewart: choose m minimizing 2*T(m, k) + T(n - m, k - 1).
        # min() keeps the first minimum, i.e. the smallest such m.
        best_m = min(
            range(1, n),
            key=lambda m: 2 * self._compute_T(m, k) + self._compute_T(n - m, k - 1),
        )

        # The first auxiliary peg parks the m smallest disks.
        temp_peg = auxiliary_pegs[0]
        other_auxiliary = auxiliary_pegs[1:]
        reduced_auxiliary = [peg for peg in auxiliary_pegs if peg != temp_peg]

        # Step 1: move the top m disks to temp_peg using all k pegs.
        self.move(n=best_m, source=source, target=temp_peg, auxiliary_pegs=other_auxiliary + [target])

        # Step 2: move the remaining n - m disks to target using k - 1 pegs
        # (temp_peg is off limits because it holds smaller disks).
        self.move(n=n - best_m, source=source, target=target, auxiliary_pegs=reduced_auxiliary)

        # Step 3: move the m parked disks from temp_peg to target using all k pegs.
        self.move(n=best_m, source=temp_peg, target=target, auxiliary_pegs=other_auxiliary + [source])

    def _move_disk(self, from_peg: int, to_peg: int):
        """Move the top disk of ``from_peg`` onto ``to_peg`` and record the move.

        Raises:
            ValueError: If ``from_peg`` is empty or the disk would land on a
                smaller disk.
        """
        source_stack = self.pegs_state[from_peg]
        target_stack = self.pegs_state[to_peg]
        if not source_stack:
            raise ValueError(f"No disks to move from Peg {from_peg}.")
        disk = source_stack[-1]
        if target_stack and target_stack[-1] < disk:
            raise ValueError(
                f"Cannot place disk {disk} on top of smaller disk {target_stack[-1]} on Peg {to_peg}."
            )
        source_stack.pop()
        target_stack.append(disk)
        self.moves.append(f"Move disk {disk} from Peg {from_peg} to Peg {to_peg}")

    def _compute_T(self, n: int, k: int) -> int:
        """Return T(n, k), the Frame-Stewart move count for n disks on k pegs.

        Results for k > 3 are memoized in ``self.memo``.
        """
        if n == 0:
            return 0
        if n == 1:
            return 1
        if k == 3:
            return 2**n - 1
        if (n, k) not in self.memo:
            self.memo[(n, k)] = min(
                2 * self._compute_T(m, k) + self._compute_T(n - m, k - 1) for m in range(1, n)
            )
        return self.memo[(n, k)]
class HanoiDataset(ProceduralDataset):
    """Generates Tower of Hanoi problems with solutions.

    Supports a variable number of pegs; solutions come from ``MoveGenerator``
    (classic recursion for 3 pegs, Frame-Stewart for more) and are replayed
    against a tracked peg state before being returned.
    """

    # Textual form of a single move as emitted by MoveGenerator.
    _MOVE_PATTERN = re.compile(r"Move disk (\d+) from Peg (\d+) to Peg (\d+)")
    # Narrowest column used by the ASCII rendering; wide enough for disk 5.
    _MIN_COLUMN_WIDTH = 7

    def __init__(self, config: HanoiConfig):
        """Store the sampling ranges and visualization flag from ``config``."""
        super().__init__(config=config, seed=config.seed, size=config.size)
        self.min_pegs = config.min_pegs
        self.max_pegs = config.max_pegs
        self.min_disks = config.min_disks
        self.max_disks = config.max_disks
        self.visualize = config.visualize

    def __getitem__(self, idx: int) -> dict:
        """Generate a Tower of Hanoi problem instance.

        Args:
            idx: Index of the instance; combined with the dataset seed so the
                same index yields the same puzzle when a seed is set.

        Returns:
            dict with:
                - "question": Text describing the problem setup.
                - "answer": List of move strings that solve the puzzle.
                - "metadata": Configuration and solution details.
                - "initial_state": (only if visualize) ASCII art of the start.
                - "states": (only if visualize) ASCII art of the start and of
                  the pegs after each move.

        Raises:
            ValueError: If the generated solution contains an illegal move.
        """
        rng = random.Random(self.seed + idx if self.seed is not None else None)

        # Sampling order (disks, pegs, start/target) determines reproducibility.
        num_disks = rng.randint(self.min_disks, self.max_disks)
        num_pegs = rng.randint(self.min_pegs, self.max_pegs)
        pegs = list(range(1, num_pegs + 1))
        start_peg, target_peg = rng.sample(pegs, 2)
        auxiliary_pegs = [peg for peg in pegs if peg not in (start_peg, target_peg)]

        solution = MoveGenerator(num_disks, pegs, start_peg, target_peg).generate_moves()

        # Independent replay of the solution: all disks start on the start peg,
        # largest at the bottom.
        pegs_state: Dict[int, List[int]] = {peg: [] for peg in pegs}
        pegs_state[start_peg].extend(range(num_disks, 0, -1))

        initial_state_str = self._visualize_state(pegs_state) if self.visualize else None
        states = [initial_state_str] if self.visualize else []

        for move in solution:
            disk, from_peg, to_peg = self._parse_move(move)
            if not self._validate_move(pegs_state, move):
                raise ValueError(f"Invalid move detected: {move}")

            pegs_state[from_peg].pop()
            pegs_state[to_peg].append(disk)

            # Rendering is only needed when the caller asked for it.
            if self.visualize:
                states.append(self._visualize_state(pegs_state))

        peg_labels = {peg: f"Peg {peg}" for peg in pegs}

        question_str = (
            f"Solve the Tower of Hanoi problem with {num_disks} disks and {num_pegs} pegs.\n"
            f"Move all disks from {peg_labels[start_peg]} to {peg_labels[target_peg]} following the rules:\n"
            "- Only one disk can be moved at a time.\n"
            "- A larger disk cannot be placed on top of a smaller disk.\n"
            "- All disks must be on a peg at all times.\n"
            "Example:\n"
            "Move disk 1 from Peg 1 to Peg 3\n"
            "Move disk 2 from Peg 1 to Peg 2\n"
            "Move disk 1 from Peg 3 to Peg 2\n"
            "\n"
            "Provide the sequence of moves."
        )

        result = {
            "question": question_str,
            "answer": solution,
            "metadata": {
                "num_disks": num_disks,
                "num_pegs": num_pegs,
                "start_peg": start_peg,
                "target_peg": target_peg,
                "auxiliary_pegs": auxiliary_pegs,
                "solution_length": len(solution),
            },
        }

        if self.visualize:
            result["initial_state"] = initial_state_str
            result["states"] = states  # Initial state followed by the state after each move

        return result

    def _visualize_state(self, pegs_state: Dict[int, List[int]]) -> str:
        """Create an ASCII visualization of the current state of the pegs.

        Columns are at least 7 characters wide and grow when a disk would not
        fit, so rows stay aligned for large disks.

        Args:
            pegs_state: Mapping of peg number to its disks, bottom first.

        Returns:
            ASCII art with one row per level, a base line and peg labels.
        """
        pegs = sorted(pegs_state)
        max_height = max((len(disks) for disks in pegs_state.values()), default=0)
        largest_disk = max((disk for disks in pegs_state.values() for disk in disks), default=0)
        # A disk of size s renders as "[" + "*" * s + "]", i.e. s + 2 characters.
        width = max(self._MIN_COLUMN_WIDTH, largest_disk + 2)

        lines = []
        for level in range(max_height, 0, -1):
            cells = []
            for peg in pegs:
                stack = pegs_state[peg]
                cell = f"[{'*' * stack[level - 1]}]" if len(stack) >= level else "[ ]"
                cells.append(cell.center(width))
            lines.append("".join(cells))

        lines.append("-" * (width * len(pegs)))
        lines.append("".join(f"P{peg}".center(width) for peg in pegs))
        return "\n".join(lines) + "\n"

    def _validate_move(self, pegs_state: Dict[int, List[int]], move: str) -> bool:
        """Check that a move adheres to the Tower of Hanoi rules.

        Args:
            pegs_state: Current state of the pegs (not modified).
            move: Move instruction, e.g. "Move disk 2 from Peg 1 to Peg 3".

        Returns:
            True if the move is well-formed, names existing pegs, takes the top
            disk of the source peg and does not place it on a smaller disk.
        """
        try:
            disk, from_peg, to_peg = self._parse_move(move)
        except ValueError:
            return False

        if from_peg not in pegs_state or to_peg not in pegs_state:
            return False

        source_stack = pegs_state[from_peg]
        target_stack = pegs_state[to_peg]

        # Only the top disk of the source peg may be moved.
        if not source_stack or source_stack[-1] != disk:
            return False

        # A disk may not be placed on a smaller one.
        if target_stack and target_stack[-1] < disk:
            return False

        return True

    def _parse_move(self, move: str) -> Tuple[int, int, int]:
        """Parse a move string into ``(disk, from_peg, to_peg)``.

        Args:
            move: Move instruction, e.g. "Move disk 2 from Peg 1 to Peg 3".

        Raises:
            ValueError: If ``move`` is not a string in exactly that format
                (surrounding whitespace is ignored).
        """
        match = self._MOVE_PATTERN.fullmatch(move.strip()) if isinstance(move, str) else None
        if not match:
            raise ValueError(f"Unexpected move format: '{move}'")
        disk, from_peg, to_peg = (int(group) for group in match.groups())
        return disk, from_peg, to_peg


# Register the dataset
register_dataset("tower_of_hanoi", HanoiDataset, HanoiConfig)


# --- tests/test_tower_of_hanoi.py ---


def test_toh_config_validation():
    """Test that invalid configurations raise appropriate errors."""
    invalid_settings = [
        {"min_disks": 0},  # at least 1 disk required
        {"min_disks": 5, "max_disks": 3},  # max_disks less than min_disks
        {"min_pegs": 2},  # at least 3 pegs required
        {"min_pegs": 3, "max_pegs": 2},  # max_pegs less than min_pegs
    ]
    for settings in invalid_settings:
        # Construct outside the context manager so that only validate() can
        # satisfy the expectation.
        config = HanoiConfig(**settings)
        with pytest.raises(AssertionError):
            config.validate()


def test_toh_dataset_deterministic():
    """Test that dataset generates the same items with the same seed."""
    config = HanoiConfig(seed=42, size=10)
    dataset1 = HanoiDataset(config)
    dataset2 = HanoiDataset(config)

    for i in range(len(dataset1)):
        assert dataset1[i] == dataset2[i], f"Mismatch found in instance {i} with seed 42."
def test_toh_dataset_items():
    """Test basic properties of generated items."""
    config = HanoiConfig(min_disks=3, max_disks=5, min_pegs=3, max_pegs=4, size=10, seed=42)
    dataset = HanoiDataset(config)

    item_keys = ("question", "answer", "metadata")
    metadata_keys = ("num_disks", "num_pegs", "start_peg", "target_peg", "auxiliary_pegs", "solution_length")

    for i in range(len(dataset)):
        item = dataset[i]

        # Check item structure
        assert isinstance(item, dict), f"Item {i} is not a dictionary."
        for key in item_keys:
            assert key in item, f"Item {i} missing '{key}' key."

        # Check metadata
        metadata = item["metadata"]
        for key in metadata_keys:
            assert key in metadata, f"Item {i} metadata missing '{key}'."

        num_disks = metadata["num_disks"]
        num_pegs = metadata["num_pegs"]
        start_peg = metadata["start_peg"]
        target_peg = metadata["target_peg"]
        auxiliary_pegs = metadata["auxiliary_pegs"]

        # Sampled sizes must respect the configured ranges
        assert config.min_disks <= num_disks <= config.max_disks, f"Item {i} has out-of-range num_disks."
        assert config.min_pegs <= num_pegs <= config.max_pegs, f"Item {i} has out-of-range num_pegs."

        # Verify peg counts
        assert num_pegs == len(auxiliary_pegs) + 2, f"Item {i} has inconsistent peg counts."

        # Start and target pegs are distinct and never listed as auxiliary
        assert start_peg != target_peg, f"Item {i} has identical start and target pegs."
        assert start_peg not in auxiliary_pegs, f"Item {i} lists the start peg as auxiliary."
        assert target_peg not in auxiliary_pegs, f"Item {i} lists the target peg as auxiliary."

        # Verify solution_length consistency
        assert metadata["solution_length"] == len(
            item["answer"]
        ), f"Item {i} metadata 'solution_length' does not match actual number of moves."
def _initial_pegs_state(metadata: dict) -> dict:
    """Build the starting layout: every disk on the start peg, largest at the bottom."""
    pegs_state = {peg: [] for peg in range(1, metadata["num_pegs"] + 1)}
    pegs_state[metadata["start_peg"]].extend(range(metadata["num_disks"], 0, -1))
    return pegs_state


def test_toh_move_validity():
    """Test that all moves in each problem instance are valid according to Tower of Hanoi rules."""
    config = HanoiConfig(min_disks=3, max_disks=5, min_pegs=3, max_pegs=4, size=10, seed=42)
    dataset = HanoiDataset(config)

    for idx, instance in enumerate(dataset):
        pegs_state = _initial_pegs_state(instance["metadata"])

        # Iterate over each move and validate
        for move_num, move in enumerate(instance["answer"], start=1):
            disk, from_peg, to_peg = parse_move(move)

            # Both pegs must exist
            assert (
                from_peg in pegs_state
            ), f"Move {move_num} in Instance {idx} references non-existent from_peg {from_peg}."
            assert to_peg in pegs_state, f"Move {move_num} in Instance {idx} references non-existent to_peg {to_peg}."

            # The source peg must not be empty
            assert pegs_state[
                from_peg
            ], f"Move {move_num} in Instance {idx} attempts to move from an empty Peg {from_peg}."

            # The disk to move must be on top of the source peg
            top_disk = pegs_state[from_peg][-1]
            assert disk == top_disk, (
                f"Move {move_num} in Instance {idx} attempts to move disk {disk} "
                f"which is not on top of Peg {from_peg} (top disk: {top_disk})."
            )

            # The disk must not land on a smaller disk
            if pegs_state[to_peg]:
                top_to_disk = pegs_state[to_peg][-1]
                assert top_to_disk > disk, (
                    f"Move {move_num} in Instance {idx} attempts to place disk {disk} "
                    f"on top of smaller disk {top_to_disk} on Peg {to_peg}."
                )

            # Perform the move
            pegs_state[from_peg].pop()
            pegs_state[to_peg].append(disk)


def test_toh_final_state_correct():
    """Test that the final state of each problem instance has all disks on the target peg in correct order."""
    config = HanoiConfig(min_disks=3, max_disks=5, min_pegs=3, max_pegs=4, size=10, seed=42)
    dataset = HanoiDataset(config)

    for idx, instance in enumerate(dataset):
        metadata = instance["metadata"]
        num_disks = metadata["num_disks"]
        target_peg = metadata["target_peg"]
        pegs_state = _initial_pegs_state(metadata)

        # Perform all moves, making sure each one really moves the disk it names
        for move in instance["answer"]:
            disk, from_peg, to_peg = parse_move(move)
            assert (
                pegs_state[from_peg] and pegs_state[from_peg][-1] == disk
            ), f"Instance {idx} moves disk {disk} which is not on top of Peg {from_peg}."
            pegs_state[from_peg].pop()
            pegs_state[to_peg].append(disk)

        # After all moves, all disks should be on target peg in descending order
        final_pegs = pegs_state[target_peg]
        assert len(final_pegs) == num_disks, f"Instance {idx} does not have all disks on the target Peg {target_peg}."

        # Verify that disks are in correct order on target peg
        expected_final = list(range(num_disks, 0, -1))
        assert final_pegs == expected_final, f"Instance {idx} has disks on Peg {target_peg} in incorrect order."

        # Ensure all other pegs are empty
        for peg, disks in pegs_state.items():
            if peg != target_peg:
                assert not disks, f"Instance {idx} has disks remaining on Peg {peg}, which should be empty."
+ + +def test_toh_dataset_iteration(): + """Test that iteration respects dataset size and multiple iterations yield the same items.""" + config = HanoiConfig(min_disks=3, max_disks=5, min_pegs=3, max_pegs=4, size=5, seed=42) + dataset = HanoiDataset(config) + + # Test dataset size + assert len(dataset) == config.size, f"Dataset size mismatch: expected {config.size}, got {len(dataset)}." + + # Collect items + items = list(dataset) + + # Test multiple iterations yield the same items + assert items == list(dataset), "Multiple iterations over the dataset do not yield the same items." + + +def parse_move(move_str: str) -> tuple: + """Parse a move string and extract disk number, from peg, and to peg. + + Args: + move_str (str): Move instruction, e.g., "Move disk 2 from Peg 1 to Peg 3". + + Returns: + tuple: (disk, from_peg, to_peg) + """ + pattern = r"Move disk (\d+) from Peg (\d+) to Peg (\d+)" + match = re.match(pattern, move_str) + assert match is not None, f"Move string '{move_str}' does not match the expected format." + disk = int(match.group(1)) + from_peg = int(match.group(2)) + to_peg = int(match.group(3)) + return disk, from_peg, to_peg + + +def is_valid_final_state(pegs_state: dict, target_peg: int, num_disks: int) -> bool: + """Verify that all disks are on the target peg in descending order. + + Args: + pegs_state (dict): Current state of the pegs. + target_peg (int): The target peg number. + num_disks (int): Total number of disks. + + Returns: + bool: True if valid, False otherwise. + """ + target_stack = pegs_state[target_peg] + if len(target_stack) != num_disks: + return False + return target_stack == list(range(num_disks, 0, -1))