From 90b269203aa9ceed802712204cbe6c3521436564 Mon Sep 17 00:00:00 2001
From: Kye
Date: Wed, 8 Nov 2023 19:32:37 -0500
Subject: [PATCH] code quality

---
 code_quality.sh                               |  19 ++++
 docs/.DS_Store                                | Bin 6148 -> 6148 bytes
 playground/silu_visualization.py              |  16 +--
 playground/swarmalator_example.py             |   2 +-
 requirements.txt                              |  11 ++
 swarms_torch/ant_colony_swarm.py              |  12 +-
 swarms_torch/autoregressive.py                | 103 +++++++++---------
 swarms_torch/cellular_transformer.py          |  10 +-
 swarms_torch/fish_school.py                   |  21 ++--
 swarms_torch/graph_cellular_automa.py         |  23 ++--
 swarms_torch/ma_agent.py                      |  22 ++--
 swarms_torch/multi_swarm_pso.py               |  35 +++---
 swarms_torch/multi_swarm_pso2.py              |  21 ++--
 swarms_torch/multi_swarm_pso_transformer.py   |  42 ++++---
 swarms_torch/neuronal_transformer.py          |  15 +--
 swarms_torch/particle_swarm.py                |  42 +++----
 swarms_torch/queen_bee.py                     |  50 ++++-----
 swarms_torch/spiral_optimization.py           |  17 ++-
 swarms_torch/swarmalators/swarmalator_base.py |  48 ++++----
 .../swarmalators/swarmalator_transformer.py   |  21 ++--
 .../swarmalators/swarmalator_visualize.py     |  11 +-
 swarms_torch/transformer_pso.py               |  20 ++--
 tests/ant_colony.py                           |  18 +--
 tests/cellular_swarm.py                       |   1 -
 tests/fish_school.py                          |  13 ++-
 tests/neuronal_transformer.py                 |  25 +++--
 tests/particle_swarm.py                       |   7 +-
 tests/queen_bee.py                            |   1 +
 tests/spiral_optimization.py                  |   1 -
 tests/swarmalator_base.py                     |  63 ++++++-----
 tests/transformer_pso.py                      |  42 ++++---
 31 files changed, 395 insertions(+), 337 deletions(-)
 create mode 100755 code_quality.sh

diff --git a/code_quality.sh b/code_quality.sh
new file mode 100755
index 0000000..cd23e33
--- /dev/null
+++ b/code_quality.sh
@@ -0,0 +1,19 @@
+#!/bin/bash
+
+# Navigate to the directory containing the 'swarms_torch' folder
+# cd /path/to/your/code/directory
+
+# Run autopep8 with two levels of aggressiveness (-aa) and in-place modification
+# (-i) on all Python files (*.py) under the 'swarms_torch' directory.
+autopep8 --in-place --aggressive --aggressive --recursive --experimental swarms_torch/
+
+# Run black. Black has no aggressiveness levels; --experimental-string-processing
+# only opts in to its experimental string formatting for files under 'swarms_torch'.
+black --experimental-string-processing swarms_torch/
+
+# Run ruff on the 'swarms_torch' directory.
+# Add any additional flags if needed according to your version of ruff.
+ruff swarms_torch/
+
+# YAPF
+# yapf --recursive --in-place --verbose --style=google --parallel swarms_torch
diff --git a/docs/.DS_Store b/docs/.DS_Store
index 5852b4656badfb5a2dcf659c156c83a333e2af97..1da5e8eb3b6761f8d1ddfd4f35defb8a2e1ce833 100644
GIT binary patch
delta 53
zcmZoMXfc@JFUrEez`)4BAi%&-#E{95#8Apm!jQRHkYhPBBgmyr&`2CACJV4CZPsHs&9s@F<1aq|rv)4o

diff --git a/playground/silu_visualization.py b/playground/silu_visualization.py
index 6b94940..4d29ead 100644
--- a/playground/silu_visualization.py
+++ b/playground/silu_visualization.py
@@ -2,23 +2,25 @@
 import numpy as np
 from mpl_toolkits.mplot3d import Axes3D
 
+
 # SiLU (Sigmoid-weighted Linear Unit) activation function
 def silu(x):
     return x * (1 / (1 + np.exp(-x)))
 
+
 # Generate inputs and calculate SiLU outputs
 input_values = np.linspace(-10, 10, 100)
 output_values = silu(input_values)
 
 # Create 3D plot
 fig = plt.figure()
-ax = fig.add_subplot(111, projection='3d')
+ax = fig.add_subplot(111, projection="3d")
 
 # Scatter plot of SiLU outputs
-ax.scatter(input_values, output_values, input_values, c=output_values, cmap='viridis')
-ax.set_xlabel('Input')
-ax.set_ylabel('Output')
-ax.set_zlabel('Input')
-ax.set_title('3D Visualization of SiLU Activation Function')
+ax.scatter(input_values, output_values, input_values, c=output_values, cmap="viridis")
+ax.set_xlabel("Input")
+ax.set_ylabel("Output")
+ax.set_zlabel("Input")
+ax.set_title("3D Visualization of SiLU Activation Function")
 
-plt.show()
\ No newline at end of file
+plt.show()
diff --git a/playground/swarmalator_example.py b/playground/swarmalator_example.py
index 2e356b5..e7074e6 100644
--- a/playground/swarmalator_example.py
+++ b/playground/swarmalator_example.py
@@ -10,4 +10,4 @@
 )
 
 # Call the visualization function
-visualize_swarmalators(xi)
\ No newline at end of file
+visualize_swarmalators(xi)
diff --git a/requirements.txt b/requirements.txt
index 973b2f8..30e4824 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,3 +1,14 @@
+torch
+einops
+pandas
+
+
+
+
+
+
+
+
 mkdocs
 mkdocs-material
 mkdocs-glightbox
diff --git a/swarms_torch/ant_colony_swarm.py b/swarms_torch/ant_colony_swarm.py
index 65c7eb6..fc8c6c6 100644
--- a/swarms_torch/ant_colony_swarm.py
+++ b/swarms_torch/ant_colony_swarm.py
@@ -68,15 +68,13 @@ def fitness(self, solution):
     def update_pheromones(self):
         """Update pheromone levels"""
         for i, solution in enumerate(self.solutions):
-            self.pheromones[i] = (1 - self.evaporation_rate) * self.pheromones[
-                i
-            ] + self.fitness(solution)
+            self.pheromones[i] = (1 - self.evaporation_rate
+                                  ) * self.pheromones[i] + self.fitness(solution)
 
     def choose_next_path(self):
         """Choose the next path based on the pheromone levels"""
         probabilities = (self.pheromones**self.alpha) * (
-            (1.0 / (1 + self.pheromones)) ** self.beta
-        )
+            (1.0 / (1 + self.pheromones))**self.beta)
 
         probabilities /= probabilities.sum()
 
@@ -90,8 +88,8 @@ def optimize(self):
             # This is a placeholder. Actual implementation will define how
             # ants traverse the search space.
             solution = torch.randint(
-                32, 127, (len(self.goal),), dtype=torch.float32
-            )  # Random characters.
+                32, 127, (len(self.goal),),
+                dtype=torch.float32)  # Random characters.
self.solutions.append(solution) self.update_pheromones() diff --git a/swarms_torch/autoregressive.py b/swarms_torch/autoregressive.py index 79d79ac..17c168a 100644 --- a/swarms_torch/autoregressive.py +++ b/swarms_torch/autoregressive.py @@ -25,6 +25,7 @@ def cast_tuple(t, length=1): def eval_decorator(fn): + def inner(self, *args, **kwargs): was_training = self.training self.eval() @@ -47,7 +48,8 @@ def align_right(t, lens, pad_id=0): pad_lens = seq_len - lens max_pad_len = pad_lens.amax() - batch_arange = torch.arange(batch, device=device, dtype=torch.long)[..., None] + batch_arange = torch.arange(batch, device=device, dtype=torch.long)[..., + None] prompt_len_arange = torch.arange(seq_len, device=device, dtype=torch.long) t = F.pad(t, (max_pad_len, 0), value=0) @@ -65,7 +67,8 @@ def top_p(logits, thres=0.9): cum_probs = torch.cumsum(F.softmax(sorted_logits, dim=-1), dim=-1) sorted_indices_to_remove = cum_probs > thres - sorted_indices_to_remove = F.pad(sorted_indices_to_remove, (1, -1), value=False) + sorted_indices_to_remove = F.pad(sorted_indices_to_remove, (1, -1), + value=False) sorted_logits[sorted_indices_to_remove] = float("-inf") return sorted_logits.scatter(1, sorted_indices, sorted_logits) @@ -108,8 +111,7 @@ def contrastive_decode_fn(expert_logits, amateur_logits, alpha=0.1, beta=0.5): cutoff = log(alpha) + expert_logits.amax(dim=-1, keepdim=True) diffs = (1 + beta) * expert_logits - beta * amateur_logits contrastive_decode_logits = diffs.masked_fill( - expert_logits < cutoff, -torch.finfo(expert_logits.dtype).max - ) + expert_logits < cutoff, -torch.finfo(expert_logits.dtype).max) return contrastive_decode_logits @@ -117,9 +119,13 @@ def contrastive_decode_fn(expert_logits, amateur_logits, alpha=0.1, beta=0.5): class AutoregressiveWrapper(Module): - def __init__( - self, net, ignore_index=-100, pad_value=0, mask_prob=0.0, add_attn_z_loss=False - ): + + def __init__(self, + net, + ignore_index=-100, + pad_value=0, + mask_prob=0.0, + add_attn_z_loss=False): super().__init__() self.pad_value = pad_value self.ignore_index = ignore_index @@ -138,21 +144,20 @@ def __init__( @torch.no_grad() @eval_decorator - def generate( - self, - prompts, - seq_len, - eos_token=None, - temperature=1.0, - prompt_lens: Optional[Tensor] = None, - filter_logits_fn: Callable = top_k, - restrict_to_max_seq_len=True, - amateur_model: Optional[Union[Module, Tuple[Module]]] = None, - filter_kwargs: dict = dict(), - contrastive_decode_kwargs: Union[dict, Tuple[dict]] = dict(beta=0.5, alpha=0.1), - cache_kv=True, - **kwargs - ): + def generate(self, + prompts, + seq_len, + eos_token=None, + temperature=1.0, + prompt_lens: Optional[Tensor] = None, + filter_logits_fn: Callable = top_k, + restrict_to_max_seq_len=True, + amateur_model: Optional[Union[Module, Tuple[Module]]] = None, + filter_kwargs: dict = dict(), + contrastive_decode_kwargs: Union[dict, Tuple[dict]] = dict( + beta=0.5, alpha=0.1), + cache_kv=True, + **kwargs): max_seq_len, device = self.max_seq_len, prompts.device prompts, ps = pack([prompts], "* n") @@ -200,16 +205,15 @@ def generate( if exists(cache): for inter in cache.attn_intermediates: inter.cached_kv = [ - t[..., -(max_seq_len - 1) :, :] for t in inter.cached_kv + t[..., -(max_seq_len - 1):, :] + for t in inter.cached_kv ] - logits, new_cache = self.net( - x, - return_intermediates=True, - cache=cache, - seq_start_pos=seq_start_pos, - **kwargs - ) + logits, new_cache = self.net(x, + return_intermediates=True, + cache=cache, + seq_start_pos=seq_start_pos, + **kwargs) if cache_kv and 
self.net.can_cache_kv: cache = new_cache @@ -221,28 +225,27 @@ def generate( if exists(amateur_model): for i, ( - amateur, - amateur_cache, - amateur_contrastive_decode_kwargs, + amateur, + amateur_cache, + amateur_contrastive_decode_kwargs, ) in enumerate( - zip(amateur_model, amateur_caches, contrastive_decode_kwargs) - ): + zip(amateur_model, amateur_caches, + contrastive_decode_kwargs)): amateur_logits, next_amateur_cache = amateur( x, return_intermediates=True, cache=amateur_cache, seq_start_pos=seq_start_pos, - **kwargs - ) + **kwargs) amateur_logits = amateur_logits[:, -1] - assert ( - amateur_logits.shape == logits.shape - ), "logits dimension are not the same between amateur and expert model" + assert amateur_logits.shape == logits.shape, ( + "logits dimension are not the same between amateur and expert" + " model") logits = contrastive_decode_fn( - logits, amateur_logits, **amateur_contrastive_decode_kwargs - ) + logits, amateur_logits, + **amateur_contrastive_decode_kwargs) if cache_kv and amateur.can_cache_kv: amateur_caches[i] = next_amateur_cache @@ -286,20 +289,20 @@ def forward(self, x, **kwargs): if self.mask_prob > 0.0: rand = torch.randn(inp.shape, device=x.device) rand[:, 0] = -torch.finfo( - rand.dtype - ).max # first token should not be masked out + rand.dtype).max # first token should not be masked out num_mask = min(int(seq * self.mask_prob), seq - 1) indices = rand.topk(num_mask, dim=-1).indices mask = ~torch.zeros_like(inp).scatter(1, indices, 1.0).bool() kwargs.update(self_attn_kv_mask=mask) - logits, cache = self.net( - inp, return_intermediates=True, return_attn_z_loss=add_attn_z_loss, **kwargs - ) + logits, cache = self.net(inp, + return_intermediates=True, + return_attn_z_loss=add_attn_z_loss, + **kwargs) - loss = F.cross_entropy( - rearrange(logits, "b n c -> b c n"), target, ignore_index=ignore_index - ) + loss = F.cross_entropy(rearrange(logits, "b n c -> b c n"), + target, + ignore_index=ignore_index) if add_attn_z_loss: loss = loss + cache.attn_z_loss diff --git a/swarms_torch/cellular_transformer.py b/swarms_torch/cellular_transformer.py index 2a7dbc1..c6e4be0 100644 --- a/swarms_torch/cellular_transformer.py +++ b/swarms_torch/cellular_transformer.py @@ -3,6 +3,7 @@ class TransformerCell(nn.Module): + def __init__( self, input_dim, @@ -11,9 +12,9 @@ def __init__( neighborhood_size=3, ): super(TransformerCell, self).__init__() - self.transformer = nn.Transformer( - input_dim, nhead=nhead, num_encoder_layers=num_layers - ) + self.transformer = nn.Transformer(input_dim, + nhead=nhead, + num_encoder_layers=num_layers) self.neighborhood_size = neighborhood_size def forward(self, x, neigbors): @@ -56,8 +57,7 @@ class CellularSwarm(nn.Module): def __init__(self, cell_count, input_dim, nhead, time_steps=4): super(CellularSwarm, self).__init__() self.cells = nn.ModuleList( - [TransformerCell(input_dim, nhead) for _ in range(cell_count)] - ) + [TransformerCell(input_dim, nhead) for _ in range(cell_count)]) self.time_steps = time_steps def forward(self, x): diff --git a/swarms_torch/fish_school.py b/swarms_torch/fish_school.py index 7f7a219..0fd6996 100644 --- a/swarms_torch/fish_school.py +++ b/swarms_torch/fish_school.py @@ -72,9 +72,10 @@ def __init__( alpha=0.1, ): super().__init__() - self.model = Transformer( - d_model=dim, nhead=heads, num_encoder_layers=depth, num_decoder_layers=depth - ) + self.model = Transformer(d_model=dim, + nhead=heads, + num_encoder_layers=depth, + num_decoder_layers=depth) self.optimizer = Adam(self.parameters()) self.scheduler = 
ReduceLROnPlateau(self.optimizer, "min") @@ -96,13 +97,15 @@ def train(self, src, tgt, labels): outputs = self.model(src, tgt) # cross entropy loss - loss = CrossEntropyLoss()(outputs.view(-1, outputs.size(-1)), labels.view(-1)) + loss = CrossEntropyLoss()(outputs.view(-1, outputs.size(-1)), + labels.view(-1)) # complexity regularization by adding the sum of the squares of the # weights if self.complexity_regularization: # complexity regularization - loss += self.alpha * sum(p.pow(2.0).sum() for p in self.model.parameters()) + loss += self.alpha * sum( + p.pow(2.0).sum() for p in self.model.parameters()) # backpropagation loss.backward() @@ -211,7 +214,8 @@ def forward(self, src, tgt, labels): # with higher food if self.complex_school: for fish in self.fish: - neighbor = self.fish[torch.randint(0, len(self.fish), (1,)).item()] + neighbor = self.fish[torch.randint(0, len(self.fish), + (1,)).item()] if neighbor.food > fish.food: fish.model.load_state_dict(neighbor.model.state_dict()) @@ -234,9 +238,8 @@ def predict(self, src, tgt): averages outputs of the top peforming models """ - top_fish = sorted(self.fish, key=lambda f: f.food, reverse=True)[ - : self.num_top_fish - ] + top_fish = sorted(self.fish, key=lambda f: f.food, + reverse=True)[:self.num_top_fish] self.model.eval() diff --git a/swarms_torch/graph_cellular_automa.py b/swarms_torch/graph_cellular_automa.py index 00229d5..c9c710f 100644 --- a/swarms_torch/graph_cellular_automa.py +++ b/swarms_torch/graph_cellular_automa.py @@ -3,6 +3,7 @@ class GraphCellularAutomata(nn.Module): + def __init__(self, input_dim, hidden_dim, output_dim): super(GraphCellularAutomata, self).__init__() @@ -17,6 +18,7 @@ def forward(self, x): class ReplicationModel(nn.Module): + def __init__(self, input_dim, hidden_dim): super(ReplicationModel, self).__init__() @@ -32,26 +34,27 @@ def forward(self, x): class WeightUpdateModel(nn.Module): + def __init__(self, input_dim, hidden_dim): super(WeightUpdateModel, self).__init__() - self.mlp = nn.Sequential( - nn.Linear(input_dim, hidden_dim), nn.ReLU(), nn.Linear(hidden_dim, 1) - ) + self.mlp = nn.Sequential(nn.Linear(input_dim, hidden_dim), nn.ReLU(), + nn.Linear(hidden_dim, 1)) def forward(self, x): return self.mlp(x) class NDP(nn.Module): + def __init__(self, embedding_dim, hidden_dim): super(NDP, self).__init__() - self.gc_automata = GraphCellularAutomata( - embedding_dim, hidden_dim, embedding_dim - ) + self.gc_automata = GraphCellularAutomata(embedding_dim, hidden_dim, + embedding_dim) self.replication_model = ReplicationModel(embedding_dim, hidden_dim) - self.weight_update_model = WeightUpdateModel(2 * embedding_dim, hidden_dim) + self.weight_update_model = WeightUpdateModel(2 * embedding_dim, + hidden_dim) def forward(self, node_embeddings, adjacency_matrix): # Update node embeddings using Graph Cellular Automata @@ -67,10 +70,10 @@ def forward(self, node_embeddings, adjacency_matrix): for i in range(num_nodes): for j in range(num_nodes): combined_embedding = torch.cat( - (updated_embeddings[i], updated_embeddings[j]) - ) + (updated_embeddings[i], updated_embeddings[j])) - edge_weights[i, j] = self.weight_update_model(combined_embedding) + edge_weights[i, + j] = self.weight_update_model(combined_embedding) return updated_embeddings, replication_decisions, edge_weights diff --git a/swarms_torch/ma_agent.py b/swarms_torch/ma_agent.py index 149576a..c224079 100644 --- a/swarms_torch/ma_agent.py +++ b/swarms_torch/ma_agent.py @@ -5,7 +5,9 @@ class MAgent: + class Agent(nn.Module): + def __init__(self, 
input_dim, output_dim): super().__init__() self.policy = nn.Sequential( @@ -19,15 +21,17 @@ def forward(self, state): return self.policy(state) class MultiGymEnvironment: + def __init__(self, env_name, num_agents): self.envs = [gym.make(env_name) for _ in range(num_agents)] self.agents = [ - MAgent.Agent( - self.envs[0].observation_space.shape[0], self.envs[0].action_space.n - ) + MAgent.Agent(self.envs[0].observation_space.shape[0], + self.envs[0].action_space.n) for _ in range(num_agents) ] - self.optimizers = [optim.Adam(agent.parameters()) for agent in self.agents] + self.optimizers = [ + optim.Adam(agent.parameters()) for agent in self.agents + ] def step(self, agent_actions): rewards = [] @@ -49,12 +53,10 @@ def train(self, epochs=1000): ] rewards = self.step(actions) - for agent, optimizer, reward in zip( - self.agents, self.optimizers, rewards - ): - loss = ( - -torch.log(agent(torch.FloatTensor(states))) * reward - ) # Example loss function + for agent, optimizer, reward in zip(self.agents, + self.optimizers, rewards): + loss = (-torch.log(agent(torch.FloatTensor(states))) * + reward) # Example loss function optimizer.zero_grad() loss.backward() optimizer.step() diff --git a/swarms_torch/multi_swarm_pso.py b/swarms_torch/multi_swarm_pso.py index 85df3e9..93bb983 100644 --- a/swarms_torch/multi_swarm_pso.py +++ b/swarms_torch/multi_swarm_pso.py @@ -77,8 +77,7 @@ def generate_random_string(self): """ return "".join( random.choice(string.ascii_lowercase + " ") - for _ in range(self.num_dimensions) - ) + for _ in range(self.num_dimensions)) def fitness_function(self, position): """Fitness function to be maximized""" @@ -96,17 +95,19 @@ def diversification_method(self, sub_swarms): def optimize(self): """Optimizes the fitness function""" - sub_swarms = [ - [self.generate_random_string() for _ in range(self.num_particles_per_swarm)] - for _ in range(self.num_sub_swarms) + sub_swarms = [[ + self.generate_random_string() + for _ in range(self.num_particles_per_swarm) ] + for _ in range(self.num_sub_swarms)] for iteration in range(self.max_iterations): for sub_swarm in sub_swarms: for particle in sub_swarm: fitness = self.fitness_function(particle) if fitness > 0: - index_to_change = random.randint(0, self.num_dimensions - 1) + index_to_change = random.randint( + 0, self.num_dimensions - 1) new_char = random.choice(string.ascii_lowercase + " ") new_position = list(particle) new_position[index_to_change] = new_char @@ -118,32 +119,26 @@ def optimize(self): global_best_fitness = max( self.fitness_function(particle) for sub_swarm in sub_swarms - for particle in sub_swarm - ) + for particle in sub_swarm) global_best_position = [ - particle - for sub_swarm in sub_swarms - for particle in sub_swarm + particle for sub_swarm in sub_swarms for particle in sub_swarm if self.fitness_function(particle) == global_best_fitness ][0] print( - f"Iteration {iteration}: Global Best Fitness = {global_best_fitness}, Global Best Position = {global_best_position}" - ) + f"Iteration {iteration}: Global Best Fitness = {global_best_fitness}," + f" Global Best Position = {global_best_position}") global_best_fitness = max( self.fitness_function(particle) for sub_swarm in sub_swarms - for particle in sub_swarm - ) + for particle in sub_swarm) global_best_position = [ - particle - for sub_swarm in sub_swarms - for particle in sub_swarm + particle for sub_swarm in sub_swarms for particle in sub_swarm if self.fitness_function(particle) == global_best_fitness ][0] print( - f"Final Result: Global Best Fitness = 
{global_best_fitness}, Global Best Position = {global_best_position}" - ) + f"Final Result: Global Best Fitness = {global_best_fitness}, Global Best" + f" Position = {global_best_position}") # Example usage diff --git a/swarms_torch/multi_swarm_pso2.py b/swarms_torch/multi_swarm_pso2.py index c58b207..f35b167 100644 --- a/swarms_torch/multi_swarm_pso2.py +++ b/swarms_torch/multi_swarm_pso2.py @@ -2,6 +2,7 @@ class Particle: + def __init__(self, dim, minx, maxx): self.position = torch.rand(dim) * (maxx - minx) + minx self.velocity = torch.rand(dim) * (maxx - minx) + minx @@ -11,11 +12,9 @@ def __init__(self, dim, minx, maxx): def update_velocity(self, global_best, w=0.7, c1=1.5, c2=1.5): r1 = torch.rand(self.position.size()) r2 = torch.rand(self.position.size()) - self.velocity = ( - w * self.velocity - + c1 * r1 * (self.best_position - self.position) - + c2 * r2 * (global_best - self.position) - ) + self.velocity = (w * self.velocity + c1 * r1 * + (self.best_position - self.position) + c2 * r2 * + (global_best - self.position)) def update_position(self, minx, maxx): self.position += self.velocity @@ -23,8 +22,11 @@ def update_position(self, minx, maxx): class Swarm: + def __init__(self, num_particles, dim, minx, maxx): - self.particles = [Particle(dim, minx, maxx) for _ in range(num_particles)] + self.particles = [ + Particle(dim, minx, maxx) for _ in range(num_particles) + ] self.global_best = None self.global_best_score = float("inf") @@ -41,8 +43,11 @@ def move_particles(self, minx, maxx): class MultiSwarm: + def __init__(self, num_swarms, num_particles, dim, minx, maxx): - self.swarms = [Swarm(num_particles, dim, minx, maxx) for _ in range(num_swarms)] + self.swarms = [ + Swarm(num_particles, dim, minx, maxx) for _ in range(num_swarms) + ] self.minx = minx self.maxx = maxx @@ -56,7 +61,7 @@ def optimize(self, func, max_iter): def rosenbrock(x, a=1, b=100): - return (a - x[0]) ** 2 + b * (x[1] - x[0] ** 2) ** 2 + return (a - x[0])**2 + b * (x[1] - x[0]**2)**2 # num_swarms = 5 diff --git a/swarms_torch/multi_swarm_pso_transformer.py b/swarms_torch/multi_swarm_pso_transformer.py index 904caa6..bc113bd 100644 --- a/swarms_torch/multi_swarm_pso_transformer.py +++ b/swarms_torch/multi_swarm_pso_transformer.py @@ -3,7 +3,6 @@ # from copy import deepcopy # from swarms_torch.transformer_pso import Particle, TransformerParticleSwarmOptimization - # class MultiSwarm(nn.Module): # def __init__( # self, @@ -42,7 +41,6 @@ # #for simplicity re init 2nd swarm # self.swarms[idx2] = TransformerParticleSwarmOptimization(*self.swarms[idx2].model_args, **self.swarms[idx2].kwargs) - # import torch # from torch.utils.data import DataLoader, TensorDataset @@ -84,6 +82,7 @@ class Particle(nn.Module): + def __init__(self, input_dim, hidden_dim, output_dim): super(Particle, self).__init__() self.transformer = nn.Transformer(input_dim, hidden_dim) @@ -96,9 +95,9 @@ def forward(self, x): class MultiSwarmOptimizer: - def __init__( - self, particle, num_particles, num_subswarms, fitness_func, bounds, num_epochs - ): + + def __init__(self, particle, num_particles, num_subswarms, fitness_func, + bounds, num_epochs): self.particle = particle self.num_particles = num_particles self.num_subswarms = num_subswarms @@ -108,7 +107,8 @@ def __init__( self.subswarms = [] for _ in range(num_subswarms): - self.subswarms.append([deepcopy(particle) for _ in range(num_particles)]) + self.subswarms.append( + [deepcopy(particle) for _ in range(num_particles)]) def optimize(self): for epoch in range(self.num_epochs): @@ -122,24 
+122,21 @@ def optimize(self): best_particle = max(subswarm, key=lambda p: p.best_fitness) for particle in subswarm: particle.velocity = ( - particle.velocity - + 0.5 * (particle.best_position - particle.position) - + 0.5 * (best_particle.best_position - particle.position) - ) + particle.velocity + 0.5 * + (particle.best_position - particle.position) + 0.5 * + (best_particle.best_position - particle.position)) particle.position = particle.position + particle.velocity - particle.position = torch.clamp(particle.position, *self.bounds) + particle.position = torch.clamp(particle.position, + *self.bounds) - best_subswarm = max( - self.subswarms, key=lambda s: max(p.best_fitness for p in s) - ) + best_subswarm = max(self.subswarms, + key=lambda s: max(p.best_fitness for p in s)) best_particle = max(best_subswarm, key=lambda p: p.best_fitness) - print( - f"Epoch {epoch+1}/{self.num_epochs}, Best Fitness: {best_particle.best_fitness}" - ) + print(f"Epoch {epoch+1}/{self.num_epochs}, Best Fitness:" + f" {best_particle.best_fitness}") - best_subswarm = max( - self.subswarms, key=lambda s: max(p.best_fitness for p in s) - ) + best_subswarm = max(self.subswarms, + key=lambda s: max(p.best_fitness for p in s)) best_particle = max(best_subswarm, key=lambda p: p.best_fitness) return best_particle @@ -175,9 +172,8 @@ def fitness_func(particle): particle = Particle(input_dim, hidden_dim, output_dim) # Create the multi-swarm optimizer -optimizer = MultiSwarmOptimizer( - particle, num_particles, num_subswarms, fitness_func, bounds, num_epochs -) +optimizer = MultiSwarmOptimizer(particle, num_particles, num_subswarms, + fitness_func, bounds, num_epochs) # Run the optimization best_particle = optimizer.optimize() diff --git a/swarms_torch/neuronal_transformer.py b/swarms_torch/neuronal_transformer.py index 28fc7fa..0faeb4f 100644 --- a/swarms_torch/neuronal_transformer.py +++ b/swarms_torch/neuronal_transformer.py @@ -49,12 +49,14 @@ def forward(self, x): class Neuron(nn.Module): + def __init__(self, num_states): super(Neuron, self).__init__() self.states = nn.Parameter(torch.randn(num_states)) class SynapseTransformer(nn.Module): + def __init__(self, input_dim, output_dim, nhead: int): super(SynapseTransformer, self).__init__() self.transformer = TransformerLayer(input_dim, output_dim, nhead) @@ -152,13 +154,12 @@ def __init__(self, neuron_count, num_states, input_dim, output_dim, nhead): super(NNTransformer, self).__init__() # Initialize neurons and synapses - self.neurons = nn.ModuleList([Neuron(num_states) for _ in range(neuron_count)]) - self.synapses = nn.ModuleList( - [ - SynapseTransformer(input_dim, output_dim, nhead) - for _ in range(neuron_count) - ] - ) + self.neurons = nn.ModuleList( + [Neuron(num_states) for _ in range(neuron_count)]) + self.synapses = nn.ModuleList([ + SynapseTransformer(input_dim, output_dim, nhead) + for _ in range(neuron_count) + ]) self.norm = nn.LayerNorm(output_dim) self.softmax = nn.Softmax(dim=1) diff --git a/swarms_torch/particle_swarm.py b/swarms_torch/particle_swarm.py index fed29f2..00e971f 100644 --- a/swarms_torch/particle_swarm.py +++ b/swarms_torch/particle_swarm.py @@ -78,18 +78,12 @@ def compute_fitness( ): return 1.0 / (1.0 + torch.norm((particle - self.goal).float())) - def update( - self, - ): + def update(self,): """Update the particles""" for i in range(self.n_particles): - fitness = self.compute_fitness( - self.particles[i], - ) + fitness = self.compute_fitness(self.particles[i],) - personal_best_fitness = self.compute_fitness( - self.personal_best[i], - 
) + personal_best_fitness = self.compute_fitness(self.personal_best[i],) if fitness > personal_best_fitness: self.personal_best[i] = self.particles[i] @@ -100,23 +94,16 @@ def update( self.global_best = self.particles[i] # update velocity - personal_attraction = ( - self.personal_best_weight - * torch.rand(self.goal.size()) - * (self.personal_best[i] - self.particles[i]) - ) - - global_attraction = ( - self.global_best_weight - * torch.rand(self.goal.size()) - * (self.global_best - self.particles[i]) - ) - - self.velocities[i] = ( - self.inertia * self.velocities[i] - + personal_attraction - + global_attraction - ) + personal_attraction = (self.personal_best_weight * + torch.rand(self.goal.size()) * + (self.personal_best[i] - self.particles[i])) + + global_attraction = (self.global_best_weight * + torch.rand(self.goal.size()) * + (self.global_best - self.particles[i])) + + self.velocities[i] = (self.inertia * self.velocities[i] + + personal_attraction + global_attraction) # Update position self.particles[i] += self.velocities[i].int() @@ -130,4 +117,5 @@ def optimize( for _ in range(iterations): self.update() best_particle = self.global_best - print("Best Particle: ", "".join([chr(int(i)) for i in best_particle])) + print("Best Particle: ", + "".join([chr(int(i)) for i in best_particle])) diff --git a/swarms_torch/queen_bee.py b/swarms_torch/queen_bee.py index 4a4f8b6..16440e8 100644 --- a/swarms_torch/queen_bee.py +++ b/swarms_torch/queen_bee.py @@ -113,7 +113,9 @@ def _evolve(self): # Display every generation if self.queen is not None: print("queen:") - print(f"{self.decode(self.queen)} ({self.queen_fitness.item():.3f})\n") + print( + f"{self.decode(self.queen)} ({self.queen_fitness.item():.3f})\n" + ) for gene, fitness in zip(self.pool, fitnesses): print(f"{self.decode(gene)} ({fitness.item():.3f})") @@ -130,45 +132,41 @@ def _evolve(self): self.queen_fitness, fitnesses = fitnesses[0], fitnesses[1:] # Deterministic tournament selection - contender_ids = torch.randn((self.pop_size - 1, self.pop_size - 1)).argsort( - dim=-1 - )[..., : self.num_tournament_participants] - participants, tournaments = self.pool[contender_ids], fitnesses[contender_ids] - top_winner = tournaments.topk(1, dim=-1, largest=True, sorted=False).indices + contender_ids = torch.randn( + (self.pop_size - 1, self.pop_size - + 1)).argsort(dim=-1)[..., :self.num_tournament_participants] + participants, tournaments = self.pool[contender_ids], fitnesses[ + contender_ids] + top_winner = tournaments.topk(1, dim=-1, largest=True, + sorted=False).indices top_winner = top_winner.unsqueeze(-1).expand(-1, -1, self.gene_length) parents = participants.gather(1, top_winner).squeeze(1) # Cross over all chosen drones with the queen - queen_parents = self.queen.unsqueeze(0).expand( - self.pop_size - 1, self.gene_length - ) + queen_parents = self.queen.unsqueeze(0).expand(self.pop_size - 1, + self.gene_length) self.pool = torch.cat( - (queen_parents[:, : self.gene_midpoint], parents[:, self.gene_midpoint :]), + (queen_parents[:, :self.gene_midpoint], + parents[:, self.gene_midpoint:]), dim=-1, ) # Mutate genes in population - mutate_mask = ( - torch.randn(self.pool.shape).argsort(dim=-1) < self.num_code_mutate - ) + mutate_mask = (torch.randn(self.pool.shape).argsort(dim=-1) < + self.num_code_mutate) noise = torch.randint(0, 2, self.pool.shape) * 2 - 1 mutated_pool = torch.where(mutate_mask, self.pool + noise, self.pool) - strong_mutate_mask = ( - torch.randn(self.pool.shape).argsort(dim=-1) < self.strong_num_code_mutate - ) + 
strong_mutate_mask = (torch.randn(self.pool.shape).argsort(dim=-1) < + self.strong_num_code_mutate) noise = torch.randint(0, 2, self.pool.shape) * 2 - 1 - strong_mutated_pool = torch.where( - strong_mutate_mask, self.pool + noise, self.pool - ) + strong_mutated_pool = torch.where(strong_mutate_mask, self.pool + noise, + self.pool) - strong_mutate_pool_mask = ( - torch.randn(self.pop_size - 1).argsort(dim=-1) - < self.strong_mutate_pool_size - ) - self.pool = torch.where( - strong_mutate_pool_mask[:, None], strong_mutated_pool, mutated_pool - ) + strong_mutate_pool_mask = (torch.randn(self.pop_size - 1).argsort( + dim=-1) < self.strong_mutate_pool_size) + self.pool = torch.where(strong_mutate_pool_mask[:, None], + strong_mutated_pool, mutated_pool) self.pool.clamp_(0, 255) def _check_convergence(self): diff --git a/swarms_torch/spiral_optimization.py b/swarms_torch/spiral_optimization.py index 5adc5a1..b8bc60b 100644 --- a/swarms_torch/spiral_optimization.py +++ b/swarms_torch/spiral_optimization.py @@ -43,9 +43,8 @@ def __init__(self, goal: str = None, m: int = 10, k_max: int = 1000): - m: Number of search points (strings). - k_max: Maximum number of iterations. """ - self.goal = torch.tensor( - [ord(c) for c in goal], dtype=torch.float32 - ) # ASCII representation + self.goal = torch.tensor([ord(c) for c in goal], + dtype=torch.float32) # ASCII representation self.m = m self.k_max = k_max @@ -53,7 +52,9 @@ def __init__(self, goal: str = None, m: int = 10, k_max: int = 1000): # Initializing the search points and center randomly # Note: 32-126 is the ASCII range for all printable characters - self.points = torch.randint(32, 127, (self.m, self.n_dim), dtype=torch.float32) + self.points = torch.randint(32, + 127, (self.m, self.n_dim), + dtype=torch.float32) self.center = torch.randint(32, 127, (self.n_dim,), dtype=torch.float32) def _step_rate(self, k): @@ -73,8 +74,7 @@ def _update_points(self, k): R = torch.eye(self.n_dim) # Identity for simplicity in n-dimensions for i in range(self.m): self.points[i] = self.center + r * torch.mv( - R, (self.points[i] - self.center) - ) + R, (self.points[i] - self.center)) def _update_center(self): """Find the best search point and set as the new center.""" @@ -87,9 +87,8 @@ def optimize(self): for k in range(self.k_max): self._update_points(k) self._update_center() - if ( - torch.norm(self.center - self.goal) < 1e-5 - ): # Example convergence condition + if (torch.norm(self.center - self.goal) < + 1e-5): # Example convergence condition break def best_string(self): diff --git a/swarms_torch/swarmalators/swarmalator_base.py b/swarms_torch/swarmalators/swarmalator_base.py index c320738..1c4a3ab 100644 --- a/swarms_torch/swarmalators/swarmalator_base.py +++ b/swarms_torch/swarmalators/swarmalator_base.py @@ -7,11 +7,13 @@ def pairwise_distances(x): return torch.sqrt((diff**2).sum(2)) -def function_for_x(xi, sigma_i, N, J, alpha, beta, gamma, epsilon_a, epsilon_r, R, D): +def function_for_x(xi, sigma_i, N, J, alpha, beta, gamma, epsilon_a, epsilon_r, + R, D): dists = pairwise_distances(xi) mask = (dists < R).float() - torch.eye(N) - interaction_term = mask.unsqueeze(2) * (sigma_i.unsqueeze(0) - sigma_i.unsqueeze(1)) + interaction_term = mask.unsqueeze(2) * (sigma_i.unsqueeze(0) - + sigma_i.unsqueeze(1)) interaction_sum = interaction_term.sum(1) # Define dynamics for x based on our assumptions @@ -19,9 +21,8 @@ def function_for_x(xi, sigma_i, N, J, alpha, beta, gamma, epsilon_a, epsilon_r, return dx -def function_for_sigma( - xi, sigma_i, N, J, alpha, beta, 
gamma, epsilon_a, epsilon_r, R, D -): +def function_for_sigma(xi, sigma_i, N, J, alpha, beta, gamma, epsilon_a, + epsilon_r, R, D): dists = pairwise_distances(xi) mask = (dists < R).float() - torch.eye(N) @@ -29,13 +30,22 @@ def function_for_sigma( interaction_sum = interaction_term.sum(1) # Define dynamics for sigma based on our assumptions - d_sigma = gamma * interaction_sum + epsilon_a * sigma_i - epsilon_r * (sigma_i**3) + d_sigma = gamma * interaction_sum + epsilon_a * sigma_i - epsilon_r * ( + sigma_i**3) return d_sigma -def simulate_swarmalators( - N, J, alpha, beta, gamma, epsilon_a, epsilon_r, R, D, T=100, dt=0.1 -): +def simulate_swarmalators(N, + J, + alpha, + beta, + gamma, + epsilon_a, + epsilon_r, + R, + D, + T=100, + dt=0.1): xi = 2 * torch.rand(N, 3) - 1 sigma_i = torch.nn.functional.normalize(torch.randn(N, D), dim=1) @@ -44,12 +54,10 @@ def simulate_swarmalators( for t in range(T): for i in range(N): - dx = function_for_x( - xi, sigma_i, N, J, alpha, beta, gamma, epsilon_a, epsilon_r, R, D - ) - d_sigma = function_for_sigma( - xi, sigma_i, N, J, alpha, beta, gamma, epsilon_a, epsilon_r, R, D - ) + dx = function_for_x(xi, sigma_i, N, J, alpha, beta, gamma, + epsilon_a, epsilon_r, R, D) + d_sigma = function_for_sigma(xi, sigma_i, N, J, alpha, beta, gamma, + epsilon_a, epsilon_r, R, D) # RK4 for xi k1_x = dt * dx @@ -79,9 +87,8 @@ def simulate_swarmalators( R, D, ) - k4_x = dt * function_for_x( - xi + k3_x, sigma_i, N, J, alpha, beta, gamma, epsilon_a, epsilon_r, R, D - ) + k4_x = dt * function_for_x(xi + k3_x, sigma_i, N, J, alpha, beta, + gamma, epsilon_a, epsilon_r, R, D) xi = xi + (1 / 6) * (k1_x + 2 * k2_x + 2 * k3_x + k4_x) # RK4 for sigma_i @@ -125,9 +132,8 @@ def simulate_swarmalators( R, D, ) - sigma_i = sigma_i + (1 / 6) * ( - k1_sigma + 2 * k2_sigma + 2 * k3_sigma + k4_sigma - ) + sigma_i = sigma_i + (1 / 6) * (k1_sigma + 2 * k2_sigma + + 2 * k3_sigma + k4_sigma) sigma_i = torch.nn.functional.normalize(sigma_i, dim=1) results_xi.append(xi.clone()) diff --git a/swarms_torch/swarmalators/swarmalator_transformer.py b/swarms_torch/swarmalators/swarmalator_transformer.py index 57262f1..b74fead 100644 --- a/swarms_torch/swarmalators/swarmalator_transformer.py +++ b/swarms_torch/swarmalators/swarmalator_transformer.py @@ -16,7 +16,12 @@ class SwarmalatorModel(nn.Module): print(positions, orientations) """ - def __init__(self, N, D, nhead=8, num_encoder_layers=6, num_decoder_layers=6): + def __init__(self, + N, + D, + nhead=8, + num_encoder_layers=6, + num_decoder_layers=6): super(SwarmalatorModel, self).__init__() self.N = N self.D = D @@ -27,23 +32,19 @@ def __init__(self, N, D, nhead=8, num_encoder_layers=6, num_decoder_layers=6): # Transformer encoder to process positions and orientations encoder_layer = nn.TransformerEncoderLayer(d_model=D, nhead=nhead) self.transformer_encoder = nn.TransformerEncoder( - encoder_layer, num_layers=num_encoder_layers - ) + encoder_layer, num_layers=num_encoder_layers) # Transformer decoder to produce updated positions and orientations decoder_layer = nn.TransformerDecoderLayer(d_model=D, nhead=nhead) self.transformer_decoder = nn.TransformerDecoder( - decoder_layer, num_layers=num_decoder_layers - ) + decoder_layer, num_layers=num_decoder_layers) def forward(self, src_mask=None, tgt_mask=None, memory_mask=None): # Using transformer encoder to get memory - position_memory = self.transformer_encoder( - self.positions.unsqueeze(1), mask=src_mask - ) + position_memory = self.transformer_encoder(self.positions.unsqueeze(1), + mask=src_mask) 
orientation_memory = self.transformer_encoder( - self.orientations.unsqueeze(1), mask=src_mask - ) + self.orientations.unsqueeze(1), mask=src_mask) # Using transformer decoder to get updated positions and orientations updated_positions = self.transformer_decoder( self.positions.unsqueeze(1), diff --git a/swarms_torch/swarmalators/swarmalator_visualize.py b/swarms_torch/swarmalators/swarmalator_visualize.py index d27d474..3c3ee27 100644 --- a/swarms_torch/swarmalators/swarmalator_visualize.py +++ b/swarms_torch/swarmalators/swarmalator_visualize.py @@ -6,9 +6,8 @@ N = 100 J, alpha, beta, gamma, epsilon_a, epsilon_r, R = [0.1] * 7 D = 3 # Ensure D is an integer -xi, sigma_i = simulate_swarmalators( - N, J, alpha, beta, gamma, epsilon_a, epsilon_r, R, D -) +xi, sigma_i = simulate_swarmalators(N, J, alpha, beta, gamma, epsilon_a, + epsilon_r, R, D) print(xi[-1], sigma_i[-1]) @@ -33,7 +32,11 @@ def update(num): scatter._offsets3d = (x_data, y_data, z_data) return (scatter,) - ani = FuncAnimation(fig, update, frames=len(results_xi), init_func=init, blit=False) + ani = FuncAnimation(fig, + update, + frames=len(results_xi), + init_func=init, + blit=False) plt.show() diff --git a/swarms_torch/transformer_pso.py b/swarms_torch/transformer_pso.py index b7e542b..f61cac7 100644 --- a/swarms_torch/transformer_pso.py +++ b/swarms_torch/transformer_pso.py @@ -30,7 +30,8 @@ class Particle(nn.Module): def __init__(self, input_dim, d_model, nhead, num_layers, output_dim): super(Particle, self).__init__() self.embedding = nn.Embedding(input_dim, d_model) - self.transformer = nn.Transformer(d_model, nhead, num_layers, num_layers) + self.transformer = nn.Transformer(d_model, nhead, num_layers, + num_layers) self.fc = nn.Linear(d_model, output_dim) def forward(self, x): @@ -104,9 +105,11 @@ def __init__( self.global_best_weight = global_best_weight # Representing particles using model parameters - param_size = sum(p.numel() for p in model_constructor(*model_args).parameters()) + param_size = sum( + p.numel() for p in model_constructor(*model_args).parameters()) self.particles = [ - self.model_constructor(*model_args).to(device) for _ in range(n_particles) + self.model_constructor(*model_args).to(device) + for _ in range(n_particles) ] self.velocities = [ torch.zeros((param_size,)).to(device) for _ in range(n_particles) @@ -151,9 +154,9 @@ def update(self): delta = self.personal_best_weight * torch.rand_like(param) * ( self.personal_best[idx][name].to(self.device) - param.data ) + self.global_best_weight * torch.rand_like(param) * ( - self.global_best[name].to(self.device) - param.data - ) - self.velocities[idx] += self.inertia * self.velocities[idx] + delta + self.global_best[name].to(self.device) - param.data) + self.velocities[ + idx] += self.inertia * self.velocities[idx] + delta param.data += self.velocities[idx] def optimize(self, iterations=1000): @@ -161,9 +164,8 @@ def optimize(self, iterations=1000): for _ in range(iterations): self.update() best_particle_score = self.compute_fitness(self.global_best) - print( - f"Iteration {_ + 1}/{iterations} - Best Particle Fitness: {best_particle_score}" - ) + print(f"Iteration {_ + 1}/{iterations} - Best Particle Fitness:" + f" {best_particle_score}") def get_best_model(self): """Get the best model.""" diff --git a/tests/ant_colony.py b/tests/ant_colony.py index 4a3dbd8..78c0982 100644 --- a/tests/ant_colony.py +++ b/tests/ant_colony.py @@ -5,10 +5,11 @@ class TestAntColonyOptimization(unittest.TestCase): + def setUp(self): - self.aco = AntColonyOptimization( - 
goal="Hello ACO", num_ants=1000, num_iterations=10 - ) + self.aco = AntColonyOptimization(goal="Hello ACO", + num_ants=1000, + num_iterations=10) def test_initialization(self): self.assertEqual(self.aco.goal.tolist(), [ord(c) for c in "Hello ACO"]) @@ -16,15 +17,16 @@ def test_initialization(self): self.assertEqual(self.aco.pheromones.tolist(), [1.0] * 1000) def test_fitness(self): - solution = torch.tensor([ord(c) for c in "Hello ACO"], dtype=torch.float32) - self.assertEqual( - self.aco.fitness(solution).item(), 0 - ) # Should be maximum fitness + solution = torch.tensor([ord(c) for c in "Hello ACO"], + dtype=torch.float32) + self.assertEqual(self.aco.fitness(solution).item(), + 0) # Should be maximum fitness def test_update_pheromones(self): initial_pheromones = self.aco.pheromones.clone() self.aco.solutions = [ - torch.tensor([ord(c) for c in "Hello ACO"], dtype=torch.float32) + torch.tensor([ord(c) + for c in "Hello ACO"], dtype=torch.float32) for _ in range(1000) ] self.aco.update_pheromones() diff --git a/tests/cellular_swarm.py b/tests/cellular_swarm.py index bf398b1..612634f 100644 --- a/tests/cellular_swarm.py +++ b/tests/cellular_swarm.py @@ -1,4 +1,3 @@ -import pytest import torch from swarms_torch import TransformerCell, CellularSwarm diff --git a/tests/fish_school.py b/tests/fish_school.py index 9154fbf..9ed0709 100644 --- a/tests/fish_school.py +++ b/tests/fish_school.py @@ -1,4 +1,3 @@ -import pytest import torch from swarms_torch.fish_school import Fish, FishSchool @@ -18,13 +17,21 @@ def test_fish_train(): def test_fishschool_initialization(): - fishschool = FishSchool(num_fish=10, dim=512, heads=8, depth=6, num_iter=100) + fishschool = FishSchool(num_fish=10, + dim=512, + heads=8, + depth=6, + num_iter=100) assert isinstance(fishschool, FishSchool) assert len(fishschool.fish) == 10 def test_fishschool_forward(): - fishschool = FishSchool(num_fish=10, dim=512, heads=8, depth=6, num_iter=100) + fishschool = FishSchool(num_fish=10, + dim=512, + heads=8, + depth=6, + num_iter=100) src = torch.randn(10, 32, 512) tgt = torch.randn(10, 32, 512) labels = torch.randint(0, 512, (10, 32)) diff --git a/tests/neuronal_transformer.py b/tests/neuronal_transformer.py index a8a5936..cb34d74 100644 --- a/tests/neuronal_transformer.py +++ b/tests/neuronal_transformer.py @@ -1,4 +1,3 @@ -import pytest import torch from swarms_torch.neuronal_transformer import ( TransformerLayer, @@ -27,30 +26,38 @@ def test_neuron_initialization(): def test_synapsetransformer_initialization(): - synapsetransformer = SynapseTransformer(input_dim=512, output_dim=256, nhead=8) + synapsetransformer = SynapseTransformer(input_dim=512, + output_dim=256, + nhead=8) assert isinstance(synapsetransformer, SynapseTransformer) def test_synapsetransformer_forward(): - synapsetransformer = SynapseTransformer(input_dim=512, output_dim=256, nhead=8) + synapsetransformer = SynapseTransformer(input_dim=512, + output_dim=256, + nhead=8) x = torch.randn(10, 32, 512) output = synapsetransformer(x) assert output.shape == torch.Size([10, 32, 256]) def test_nntransformer_initialization(): - nntransformer = NNTransformer( - neuron_count=5, num_states=10, input_dim=512, output_dim=256, nhead=8 - ) + nntransformer = NNTransformer(neuron_count=5, + num_states=10, + input_dim=512, + output_dim=256, + nhead=8) assert isinstance(nntransformer, NNTransformer) assert len(nntransformer.neurons) == 5 assert len(nntransformer.synapses) == 5 def test_nntransformer_forward(): - nntransformer = NNTransformer( - neuron_count=5, num_states=10, 
input_dim=512, output_dim=256, nhead=8 - ) + nntransformer = NNTransformer(neuron_count=5, + num_states=10, + input_dim=512, + output_dim=256, + nhead=8) x = torch.randn(1, 10) output = nntransformer(x) assert output.shape == torch.Size([10]) diff --git a/tests/particle_swarm.py b/tests/particle_swarm.py index b54ccdf..8450e11 100644 --- a/tests/particle_swarm.py +++ b/tests/particle_swarm.py @@ -5,6 +5,7 @@ class TestParticleSwarmOptimization(unittest.TestCase): + def setUp(self): self.pso = ParticleSwarmOptimization(goal="Hello", n_particles=10) @@ -29,11 +30,9 @@ def test_optimize(self): self.pso.optimize(iterations=10) # After optimization, global best should be closer to the goal initial_distance = torch.norm( - (initial_best_particle - self.pso.goal).float() - ).item() + (initial_best_particle - self.pso.goal).float()).item() final_distance = torch.norm( - (self.pso.global_best - self.pso.goal).float() - ).item() + (self.pso.global_best - self.pso.goal).float()).item() self.assertLess(final_distance, initial_distance) diff --git a/tests/queen_bee.py b/tests/queen_bee.py index 74d9c8e..4668c6d 100644 --- a/tests/queen_bee.py +++ b/tests/queen_bee.py @@ -4,6 +4,7 @@ class TestQueenBeeGa(unittest.TestCase): + def setUp(self): self.optimizer = QueenBeeGa(goal="Hello QBGA", pop_size=50) diff --git a/tests/spiral_optimization.py b/tests/spiral_optimization.py index 4109cec..019dcc5 100644 --- a/tests/spiral_optimization.py +++ b/tests/spiral_optimization.py @@ -1,4 +1,3 @@ -import pytest import torch from swarms_torch.spiral_optimization import SPO diff --git a/tests/swarmalator_base.py b/tests/swarmalator_base.py index 49f63b5..82daf3f 100644 --- a/tests/swarmalator_base.py +++ b/tests/swarmalator_base.py @@ -20,7 +20,6 @@ T = 100 dt = 0.1 - # === Test pairwise_distances === @@ -51,27 +50,24 @@ def test_pairwise_distances_symmetry(): def test_function_for_x_shape(): xi = torch.randn(N, D) sigma_i = torch.randn(N, D) - dx = function_for_x( - xi, sigma_i, N, J, alpha, beta, gamma, epsilon_a, epsilon_r, R, D - ) + dx = function_for_x(xi, sigma_i, N, J, alpha, beta, gamma, epsilon_a, + epsilon_r, R, D) assert dx.shape == (N, D) def test_function_for_x_output_range(): xi = torch.randn(N, D) sigma_i = torch.randn(N, D) - dx = function_for_x( - xi, sigma_i, N, J, alpha, beta, gamma, epsilon_a, epsilon_r, R, D - ) + dx = function_for_x(xi, sigma_i, N, J, alpha, beta, gamma, epsilon_a, + epsilon_r, R, D) assert (dx >= -1.0).all() and (dx <= 1.0).all() def test_function_for_x_zero_at_equilibrium(): xi = torch.zeros(N, D) sigma_i = torch.zeros(N, D) - dx = function_for_x( - xi, sigma_i, N, J, alpha, beta, gamma, epsilon_a, epsilon_r, R, D - ) + dx = function_for_x(xi, sigma_i, N, J, alpha, beta, gamma, epsilon_a, + epsilon_r, R, D) assert (dx == 0.0).all() @@ -81,27 +77,24 @@ def test_function_for_x_zero_at_equilibrium(): def test_function_for_sigma_shape(): xi = torch.randn(N, D) sigma_i = torch.randn(N, D) - d_sigma = function_for_sigma( - xi, sigma_i, N, J, alpha, beta, gamma, epsilon_a, epsilon_r, R, D - ) + d_sigma = function_for_sigma(xi, sigma_i, N, J, alpha, beta, gamma, + epsilon_a, epsilon_r, R, D) assert d_sigma.shape == (N, D) def test_function_for_sigma_output_range(): xi = torch.randn(N, D) sigma_i = torch.randn(N, D) - d_sigma = function_for_sigma( - xi, sigma_i, N, J, alpha, beta, gamma, epsilon_a, epsilon_r, R, D - ) + d_sigma = function_for_sigma(xi, sigma_i, N, J, alpha, beta, gamma, + epsilon_a, epsilon_r, R, D) assert (d_sigma >= -1.0).all() and (d_sigma <= 1.0).all() def 
test_function_for_sigma_zero_at_equilibrium(): xi = torch.zeros(N, D) sigma_i = torch.zeros(N, D) - d_sigma = function_for_sigma( - xi, sigma_i, N, J, alpha, beta, gamma, epsilon_a, epsilon_r, R, D - ) + d_sigma = function_for_sigma(xi, sigma_i, N, J, alpha, beta, gamma, + epsilon_a, epsilon_r, R, D) assert (d_sigma == 0.0).all() @@ -109,9 +102,17 @@ def test_function_for_sigma_zero_at_equilibrium(): def test_simulate_swarmalators_output_shape(): - results_xi, results_sigma_i = simulate_swarmalators( - N, J, alpha, beta, gamma, epsilon_a, epsilon_r, R, D, T=T, dt=dt - ) + results_xi, results_sigma_i = simulate_swarmalators(N, + J, + alpha, + beta, + gamma, + epsilon_a, + epsilon_r, + R, + D, + T=T, + dt=dt) assert len(results_xi) == T assert len(results_sigma_i) == T assert results_xi[0].shape == (N, D) @@ -119,12 +120,22 @@ def test_simulate_swarmalators_output_shape(): def test_simulate_swarmalators_convergence(): - results_xi, results_sigma_i = simulate_swarmalators( - N, J, alpha, beta, gamma, epsilon_a, epsilon_r, R, D, T=T, dt=dt - ) + results_xi, results_sigma_i = simulate_swarmalators(N, + J, + alpha, + beta, + gamma, + epsilon_a, + epsilon_r, + R, + D, + T=T, + dt=dt) for i in range(1, T): assert torch.allclose(results_xi[i], results_xi[i - 1], atol=1e-6) - assert torch.allclose(results_sigma_i[i], results_sigma_i[i - 1], atol=1e-6) + assert torch.allclose(results_sigma_i[i], + results_sigma_i[i - 1], + atol=1e-6) def test_simulate_swarmalators_non_zero_initial_condition(): diff --git a/tests/transformer_pso.py b/tests/transformer_pso.py index 2466033..4d87893 100644 --- a/tests/transformer_pso.py +++ b/tests/transformer_pso.py @@ -1,4 +1,3 @@ -import pytest import torch from torch.utils.data import DataLoader from swarms_torch.transformer_pso import ( @@ -8,16 +7,20 @@ def test_simpletransformer_initialization(): - simpletransformer = SimpleTransformer( - input_dim=10, d_model=512, nhead=8, num_layers=1, output_dim=2 - ) + simpletransformer = SimpleTransformer(input_dim=10, + d_model=512, + nhead=8, + num_layers=1, + output_dim=2) assert isinstance(simpletransformer, SimpleTransformer) def test_simpletransformer_forward(): - simpletransformer = SimpleTransformer( - input_dim=10, d_model=512, nhead=8, num_layers=1, output_dim=2 - ) + simpletransformer = SimpleTransformer(input_dim=10, + d_model=512, + nhead=8, + num_layers=1, + output_dim=2) x = torch.randint(0, 10, (10, 32)) output = simpletransformer(x) assert output.shape == torch.Size([32, 2]) @@ -32,9 +35,8 @@ def test_TransformerParticleSwarmOptimization_initialization(): [(torch.randint(0, 10, (10,)), torch.tensor(1)) for _ in range(100)], batch_size=32, ) - pso = TransformerParticleSwarmOptimization( - model_constructor, model_args, device, criterion, data_loader - ) + pso = TransformerParticleSwarmOptimization(model_constructor, model_args, + device, criterion, data_loader) assert isinstance(pso, TransformerParticleSwarmOptimization) assert len(pso.particles) == 10 assert len(pso.velocities) == 10 @@ -50,9 +52,8 @@ def test_TransformerParticleSwarmOptimization_compute_fitness(): [(torch.randint(0, 10, (10,)), torch.tensor(1)) for _ in range(100)], batch_size=32, ) - pso = TransformerParticleSwarmOptimization( - model_constructor, model_args, device, criterion, data_loader - ) + pso = TransformerParticleSwarmOptimization(model_constructor, model_args, + device, criterion, data_loader) fitness = pso.compute_fitness(pso.particles[0].state_dict()) assert isinstance(fitness, float) @@ -66,9 +67,8 @@ def 
test_TransformerParticleSwarmOptimization_update(): [(torch.randint(0, 10, (10,)), torch.tensor(1)) for _ in range(100)], batch_size=32, ) - pso = TransformerParticleSwarmOptimization( - model_constructor, model_args, device, criterion, data_loader - ) + pso = TransformerParticleSwarmOptimization(model_constructor, model_args, + device, criterion, data_loader) pso.update() assert len(pso.particles) == 10 assert len(pso.velocities) == 10 @@ -84,9 +84,8 @@ def test_TransformerParticleSwarmOptimization_optimize(): [(torch.randint(0, 10, (10,)), torch.tensor(1)) for _ in range(100)], batch_size=32, ) - pso = TransformerParticleSwarmOptimization( - model_constructor, model_args, device, criterion, data_loader - ) + pso = TransformerParticleSwarmOptimization(model_constructor, model_args, + device, criterion, data_loader) pso.optimize(iterations=10) assert len(pso.particles) == 10 assert len(pso.velocities) == 10 @@ -102,9 +101,8 @@ def test_TransformerParticleSwarmOptimization_get_best_model(): [(torch.randint(0, 10, (10,)), torch.tensor(1)) for _ in range(100)], batch_size=32, ) - pso = TransformerParticleSwarmOptimization( - model_constructor, model_args, device, criterion, data_loader - ) + pso = TransformerParticleSwarmOptimization(model_constructor, model_args, + device, criterion, data_loader) pso.optimize(iterations=10) best_model = pso.get_best_model() assert isinstance(best_model, SimpleTransformer)
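
For reviewers who want to smoke-test the reformatted package after applying this patch, here is a minimal usage sketch. It mirrors the calls exercised in tests/particle_swarm.py; the import path is assumed from the module layout shown in this diff, and the goal string and iteration count are illustrative values only.

from swarms_torch.particle_swarm import ParticleSwarmOptimization

# Constructor and optimize() signatures follow tests/particle_swarm.py;
# the goal and iteration count here are arbitrary illustrative choices.
pso = ParticleSwarmOptimization(goal="Hello", n_particles=10)
pso.optimize(iterations=10)  # prints the best decoded string each iteration
print("".join(chr(int(c)) for c in pso.global_best))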