From 18ec897ddb1e0582486e615f1b8508505360bb68 Mon Sep 17 00:00:00 2001 From: c-rit <114767694+c-rit@users.noreply.github.com> Date: Sat, 13 Jul 2024 17:51:48 -0400 Subject: [PATCH 1/6] first version of crackme game --- zero_sum_eval/games/crackme/__init__.py | 1 + zero_sum_eval/games/crackme/crackme_game.py | 183 ++++++++++++++++++++ 2 files changed, 184 insertions(+) create mode 100644 zero_sum_eval/games/crackme/__init__.py create mode 100644 zero_sum_eval/games/crackme/crackme_game.py diff --git a/zero_sum_eval/games/crackme/__init__.py b/zero_sum_eval/games/crackme/__init__.py new file mode 100644 index 0000000..a8dad09 --- /dev/null +++ b/zero_sum_eval/games/crackme/__init__.py @@ -0,0 +1 @@ +from .crackme_game import CrackMeGame diff --git a/zero_sum_eval/games/crackme/crackme_game.py b/zero_sum_eval/games/crackme/crackme_game.py new file mode 100644 index 0000000..1506555 --- /dev/null +++ b/zero_sum_eval/games/crackme/crackme_game.py @@ -0,0 +1,183 @@ +#file: crackme.py +import pyyaml +import ast +import random +import concurrent.futures +from game_manager import GameManager +from game_state import GameState + + + +class CrackMeGame(GameState): + ''' + This is a two player game where one player creates an obfoscation program, + and another attempts to reverse it. + + In each round: + 1. The environment is initilized with a key and a base program. + 2. The first player inserts operations into the base program to obfoscate the key. + 3. If the new program is valid, the second player annotates the new program ( with obfoscated key ) + 4. The second player creates a new program which outputs the original key given the obfoscated key. + 5. If the second player suceeds, the new program may become the base program to continue the game. + + The roles for this game are: + DefenderObfoscateKey + AttackerAnnotateTarget + AttackerReverseEngineer + + The environment for this game is: + base_program: a program which takes an input and returns an output + + + key: a randomly generated number + obfoscated_key: the result of base_program(key) + + ''' + def __init__(self, roles=None, environment=None, context=None, key=None): + super().__init__() + self.environment = environment if environment is not None else "" + self.roles = roles if roles is not None else self.get_next_roles() + self.context = context if context is not None else {"history": [], "message": None} + self.key = key if key is not None else random.random() + + def initilize(self, environment, context=None ,roles=None, key=None): + return CrackMeGame( + roles=roles, + environment=environment, + context=context, + key=key + ) + + def update_game(self, move): + new_context = self.context.copy() + new_environment = self.environment.copy() + #TODO: verify that self.roles[0] is the current move + + if self.roles[0] == "AttackerAnnotateTarget": + new_context['message'] = None + new_context.get('annotation','').append(f"\n {move}") + + else: + key_in = self.key if self.roles[0] is "DefenderObfoscateKey" else self.context['obfoscated_key'] + success, result = self.execute_and_verify(move, key_in) + if success: + if self.roles[0] == "DefenderObfoscateKey": + new_context['obfoscated_key'] = result + new_context['annotation_target'] = self.environment + new_context['annotation'] = None + new_context['message'] = None + + if self.roles[0] == "AttackerReverseEngineer": + if self.key == result: + #ATTACKER WINS!! reset the game here + # update history with a tuple (defender_code,annotation,attacker_code) + new_context['history'].append((new_context['annotation_target'],new_context['annotation'], self.environment)) + new_context['message'] = None + return self.initialize( + environment=new_environment, + context=new_context + ) + + else: + new_context['message'] = f"result {result} is incorrect. ACCESS DENIED" + else: + new_context['message'] = result # this will return error from verify and execute + + return self.initialize( + roles=self.roles, + environment=new_environment, + context=new_context, + key=self.key #maybe change? + ) + + + def query_game(self): + #TODO: change the message and context for each role so info does not leak + + new_context = self.context.copy() + new_roles = self.get_next_roles() + msg = self.validate_game() + new_context['message'] = msg if msg is not None else f"You will move as {new_roles[0]}" + # add specific + return self.initialize( + environment=self.environment, + context=new_context, + roles=new_roles + ) + + def validate_game(self): + ''' + TODO: + put win conditions here depending on current message etc. + for now just have failure of either exceeding turn # mean end + ''' + return self.context['message'] + + + + + + def execute_and_verify(code_str, key_in, timeout=1): + ''' + this function can be overwritten/overloaded to change/expand the representation(s) + and method(s) of exectuion + ''' + code_str = f"{code_str}" + + def validate_nodes(tree): + allowed_nodes = { + ast.Module, ast.Expr, ast.BinOp, ast.UnaryOp, ast.Num, ast.Load, ast.Add, + ast.Sub, ast.Mult, ast.Div, ast.Pow, ast.Mod, ast.UAdd, ast.USub, ast.Assign, + ast.Name, ast.Constant, ast.For, ast.While, ast.If, ast.Compare, ast.Lt, ast.LtE, + ast.Gt, ast.GtE, ast.Eq, ast.NotEq, ast.In, ast.NotIn, ast.Break, ast.Continue, + ast.Pass + } + + for node in ast.walk(tree): + if type(node) not in allowed_nodes: + raise ValueError(f"Operation {type(node).__name__} is not allowed") + def restricted_exec(code_str, input_value): + + # this may need to be modified... + allowed_builtins = {'__builtins__': {}} + restricted_globals = {'__builtins__': allowed_builtins} + restricted_locals = {'input_value': input_value} + + # wrapper code to capture the output + #TODO: fix this wrapper to work with above restrictions + wrapper_code = f""" + result = None + def wrapper(): + global result = {key_in} + {code_str} + return result + + result = wrapper() + """ + try: + tree = ast.parse(move) + validate_nodes(tree) + except SyntaxError as e: + return False, f"Syntax error at line {e.lineno}, column {e.offset}: {e.text}" + except Exception as e + new_context['message'] = f"Error during parsing: {str(e)}" + + with concurrent.futures.ThreadPoolExecutor() as executor: + + # make sure this works, maybe use ast.compile() + future = executor.submit(restricted_exec, wrapper_code, key_in) + try: + result, error = future.result(timeout=timeout) + if error: + return False, error + else: + return True, result + except concurrent.futures.TimeoutError: + return False, "Timeout occurred" + + except Exception as e + # for now dont leak error info to llm + return False, f"Error during execution" + # maybe use logger here? + + #TODO: impliment export later From bfe73f5ef719d31e05cd75b130b9b031b661f6ac Mon Sep 17 00:00:00 2001 From: c-rit <114767694+c-rit@users.noreply.github.com> Date: Wed, 17 Jul 2024 03:08:15 +0000 Subject: [PATCH 2/6] functional crackme game with python for environment --- zero_sum_eval/games/crackme/crackme_game.py | 235 +++++++++++++------- 1 file changed, 154 insertions(+), 81 deletions(-) diff --git a/zero_sum_eval/games/crackme/crackme_game.py b/zero_sum_eval/games/crackme/crackme_game.py index 1506555..6f46a80 100644 --- a/zero_sum_eval/games/crackme/crackme_game.py +++ b/zero_sum_eval/games/crackme/crackme_game.py @@ -1,13 +1,15 @@ #file: crackme.py -import pyyaml +#TODO: figure out multi line inputs +import yaml import ast import random import concurrent.futures -from game_manager import GameManager -from game_state import GameState - +from zero_sum_eval.game_manager import GameManager +from zero_sum_eval.game_state import GameState +from zero_sum_eval.registry import GAME_REGISTRY +@GAME_REGISTRY.register("crackme") class CrackMeGame(GameState): ''' This is a two player game where one player creates an obfoscation program, @@ -26,158 +28,229 @@ class CrackMeGame(GameState): AttackerReverseEngineer The environment for this game is: - base_program: a program which takes an input and returns an output - - - key: a randomly generated number + code: a program which takes an input and returns an output + + key: a randomly generated number (not stored) obfoscated_key: the result of base_program(key) ''' - def __init__(self, roles=None, environment=None, context=None, key=None): + def __init__(self, roles=None, environment=None, context=None): super().__init__() - self.environment = environment if environment is not None else "" + self.environment = environment if environment is not None else {"code":""} + self.context = context if context is not None else {"annotation": "", "message": None} self.roles = roles if roles is not None else self.get_next_roles() - self.context = context if context is not None else {"history": [], "message": None} - self.key = key if key is not None else random.random() - def initilize(self, environment, context=None ,roles=None, key=None): + def initialize(self, environment=None, context=None ,roles=None): return CrackMeGame( roles=roles, environment=environment, context=context, - key=key ) def update_game(self, move): new_context = self.context.copy() - new_environment = self.environment.copy() - #TODO: verify that self.roles[0] is the current move + new_environment = self.environment.copy() if self.roles[0] == "AttackerAnnotateTarget": - new_context['message'] = None - new_context.get('annotation','').append(f"\n {move}") + new_context['message'] = None + new_context['annotation'] = new_context.get('annotation',"") + f"\n {move}" + else: - key_in = self.key if self.roles[0] is "DefenderObfoscateKey" else self.context['obfoscated_key'] + key_in = random.randint(1000,100000) if self.roles[0] == "DefenderObfoscateKey" else self.context['obfoscated_key'] success, result = self.execute_and_verify(move, key_in) - if success: + if success: if self.roles[0] == "DefenderObfoscateKey": + new_environment['code'] = move new_context['obfoscated_key'] = result - new_context['annotation_target'] = self.environment - new_context['annotation'] = None + # new_environment['annotation_target'] = new_environment['code'] + new_context['annotation'] = '' new_context['message'] = None - + if self.roles[0] == "AttackerReverseEngineer": - if self.key == result: - #ATTACKER WINS!! reset the game here + + # verify by checking if original_code( deobfoscation_code( obfoscated_key ) ) == obfoscated_key + _ , orig_result = self.execute_and_verify(self.environment['code'], result) + # this should be verified but it is possible to have an error if the gamestate is not correct + #TODO: look into ways to make this more efficent... maybe + + if orig_result == new_context['obfoscated_key']: + #ATTACKER WINS!! # update history with a tuple (defender_code,annotation,attacker_code) - new_context['history'].append((new_context['annotation_target'],new_context['annotation'], self.environment)) - new_context['message'] = None + history = new_context.get('history', []) + new_environment['code'] = move + history.append((self.environment['code'],new_context['annotation'],new_environment['code'])) + new_context.update({'history': history}) + + #reset + new_context['obfoscated_key'] = None + new_context['message'] = "Attacker Success" + + # move this to environment incase + #new_environment.update({'status':"Attacker Success"}) return self.initialize( environment=new_environment, context=new_context ) - + else: - new_context['message'] = f"result {result} is incorrect. ACCESS DENIED" - else: - new_context['message'] = result # this will return error from verify and execute + new_context['message'] = f"result {result} is incorrect. ACCESS DENIED" + else: + new_context['message'] = result # this will return error from verify_and_execute return self.initialize( roles=self.roles, environment=new_environment, context=new_context, - key=self.key #maybe change? ) def query_game(self): - #TODO: change the message and context for each role so info does not leak - + instructions = { + "DefenderObfoscateKey": "Implement a code snippit to scramble and obfoscate a given input to prevent reverse engineering.", + "AttackerReverseEngineer": "Reverse the provided code snippit such that reversed_code( code( key ) ) == key == reversed_code( obfoscated_key ) ", + "AttackerAnnotateTarget": "Describe the target code to assist in reverse engineering later" + } + new_context = self.context.copy() new_roles = self.get_next_roles() msg = self.validate_game() - new_context['message'] = msg if msg is not None else f"You will move as {new_roles[0]}" - # add specific + new_context['message'] = msg if msg is not None else f"You will move as {new_roles[0]}.\n GOAL: {instructions[new_roles[0]]}" + + return self.initialize( environment=self.environment, context=new_context, - roles=new_roles + roles=[new_roles[0]], ) def validate_game(self): - ''' - TODO: - put win conditions here depending on current message etc. - for now just have failure of either exceeding turn # mean end ''' + can put win conditions here relating to current environment, if nessiscary + attacker success is the only win case (game can also end in failure) + ''' + #do not need right now +# if self.environment.get('status',None) != None: +# return self.environment['status'] return self.context['message'] + def get_next_roles(self): + #verify these are correct + #if self.context.get('annotation_target',None) == None: + if self.context.get('obfoscated_key',None) == None: + return ["DefenderObfoscateKey","AttackerAnnotateTarget","AttackerReverseEngineer"] + elif self.context.get('annotation',"") != "": + return ["AttackerReverseEngineer","DefenderObfoscateKey","AttackerAnnotateTarget"] + else: + return ["AttackerAnnotateTarget","AttackerReverseEngineer","DefenderObfoscateKey"] - - - def execute_and_verify(code_str, key_in, timeout=1): - ''' - this function can be overwritten/overloaded to change/expand the representation(s) - and method(s) of exectuion - ''' - code_str = f"{code_str}" + def export(self): + return yaml.dump(self.__dict__) + + + def execute_and_verify(self, code_str, input_value, timeout = 1): + + # this function can be overwritten/overloaded to change/expand the representation(s) and method(s) of exectuion + def validate_nodes(tree): allowed_nodes = { ast.Module, ast.Expr, ast.BinOp, ast.UnaryOp, ast.Num, ast.Load, ast.Add, ast.Sub, ast.Mult, ast.Div, ast.Pow, ast.Mod, ast.UAdd, ast.USub, ast.Assign, ast.Name, ast.Constant, ast.For, ast.While, ast.If, ast.Compare, ast.Lt, ast.LtE, ast.Gt, ast.GtE, ast.Eq, ast.NotEq, ast.In, ast.NotIn, ast.Break, ast.Continue, - ast.Pass + ast.Pass, ast.Store, ast.AugAssign, ast.FloorDiv, ast.Call } - for node in ast.walk(tree): if type(node) not in allowed_nodes: raise ValueError(f"Operation {type(node).__name__} is not allowed") + def restricted_exec(code_str, input_value): - # this may need to be modified... - allowed_builtins = {'__builtins__': {}} - restricted_globals = {'__builtins__': allowed_builtins} - restricted_locals = {'input_value': input_value} + # These are the functions that can be called + restricted_globals = { + '__builtins__': { + 'abs': abs, 'all': all, 'any': any, 'bin': bin, 'bool': bool, + 'int': int, 'float': float, 'len': len, 'list': list, 'max': max, + 'min': min, 'pow': pow, 'range': range, 'round': round, 'sum': sum, 'int': int + } + } + restricted_locals = {'input_value': input_value, 'result': input_value} - # wrapper code to capture the output - #TODO: fix this wrapper to work with above restrictions wrapper_code = f""" - result = None - def wrapper(): - global result = {key_in} - {code_str} - return result +def wrapper(input): + res=input + {code_str} + return res - result = wrapper() +input_hidden = {input_value} +result = wrapper(input_hidden) """ + exec(wrapper_code, restricted_globals, restricted_locals) + return restricted_locals['result'] + try: - tree = ast.parse(move) + if not isinstance(code_str, (str, bytes, ast.AST)): + raise TypeError(f"code_str must be a string, bytes or AST object, not {type(code_str)}") + + tree = ast.parse(code_str) validate_nodes(tree) except SyntaxError as e: - return False, f"Syntax error at line {e.lineno}, column {e.offset}: {e.text}" - except Exception as e - new_context['message'] = f"Error during parsing: {str(e)}" - - with concurrent.futures.ThreadPoolExecutor() as executor: + return False, f"Syntax error at line {e.lineno}, column {e.offset}: {e.text}" + except Exception as e: + return False, f"Error during parsing: {str(e)}" - # make sure this works, maybe use ast.compile() - future = executor.submit(restricted_exec, wrapper_code, key_in) + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: + future = executor.submit(restricted_exec, code_str, input_value) try: - result, error = future.result(timeout=timeout) - if error: - return False, error - else: - return True, result + result = future.result(timeout=timeout) + return True, result except concurrent.futures.TimeoutError: return False, "Timeout occurred" + except Exception as e: + # for now dont leak error details to llm + return False, f"Error during execution: {type(e).__name__}" + +if __name__ == "__main__": + +''' +example with human player: + for defender input: res=input**2 + some correct inputs for attacker: + res=int(input**1/2) + res=(input**(1/2) - ((input**(1/2)) % 1)) // 1 +''' + from zero_sum_eval.player import HumanPlayer, Player + config = { + "game": { + "name": "crackme_challenge", + "args": { + "win_conditions": ["Attacker Success"], + "max_rounds": 10, + "players": [{"id":"defender", "role":"DefenderObfoscateKey"}, + {"id":"attacker", "role":"AttackerAnnotateTarget"}, + {"id":"attacker", "role":"AttackerReverseEngineer"}], + + "challenges": [ + { + "environment": None + }, + ], + }, + } + } + + game_manager = GameManager(config) + for player_config in config["game"]["args"]["players"]: + player = HumanPlayer(**player_config) + + player.max_tries = 2 + game_manager.register_player(player) + + game_state = CrackMeGame().initialize(None) + result = game_manager.do_eval(game_state) + + print(result.query_game().export()) - except Exception as e - # for now dont leak error info to llm - return False, f"Error during execution" - # maybe use logger here? - #TODO: impliment export later From c7fa2bc4a45037a3ee97590e698c547ce9bb3b03 Mon Sep 17 00:00:00 2001 From: CR <114767694+c-rit@users.noreply.github.com> Date: Thu, 18 Jul 2024 15:32:00 -0400 Subject: [PATCH 3/6] fixed spelling errors --- zero_sum_eval/games/crackme/crackme_game.py | 42 ++++++++++----------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/zero_sum_eval/games/crackme/crackme_game.py b/zero_sum_eval/games/crackme/crackme_game.py index 6f46a80..c6527a4 100644 --- a/zero_sum_eval/games/crackme/crackme_game.py +++ b/zero_sum_eval/games/crackme/crackme_game.py @@ -12,18 +12,18 @@ @GAME_REGISTRY.register("crackme") class CrackMeGame(GameState): ''' - This is a two player game where one player creates an obfoscation program, + This is a two player game where one player creates an obfuscation program, and another attempts to reverse it. In each round: - 1. The environment is initilized with a key and a base program. - 2. The first player inserts operations into the base program to obfoscate the key. - 3. If the new program is valid, the second player annotates the new program ( with obfoscated key ) - 4. The second player creates a new program which outputs the original key given the obfoscated key. - 5. If the second player suceeds, the new program may become the base program to continue the game. + 1. The environment is initialized with a key and a base program. + 2. The first player inserts operations into the base program to obfuscate the key. + 3. If the new program is valid, the second player annotates the new program ( with obfuscated key ) + 4. The second player creates a new program which outputs the original key given the obfuscated key. + 5. If the second player succeeds, the new program may become the base program to continue the game. The roles for this game are: - DefenderObfoscateKey + DefenderobfuscateKey AttackerAnnotateTarget AttackerReverseEngineer @@ -31,7 +31,7 @@ class CrackMeGame(GameState): code: a program which takes an input and returns an output key: a randomly generated number (not stored) - obfoscated_key: the result of base_program(key) + obfuscated_key: the result of base_program(key) ''' def __init__(self, roles=None, environment=None, context=None): @@ -57,24 +57,24 @@ def update_game(self, move): else: - key_in = random.randint(1000,100000) if self.roles[0] == "DefenderObfoscateKey" else self.context['obfoscated_key'] + key_in = random.randint(1000,100000) if self.roles[0] == "DefenderobfuscateKey" else self.context['obfuscated_key'] success, result = self.execute_and_verify(move, key_in) if success: - if self.roles[0] == "DefenderObfoscateKey": + if self.roles[0] == "DefenderobfuscateKey": new_environment['code'] = move - new_context['obfoscated_key'] = result + new_context['obfuscated_key'] = result # new_environment['annotation_target'] = new_environment['code'] new_context['annotation'] = '' new_context['message'] = None if self.roles[0] == "AttackerReverseEngineer": - # verify by checking if original_code( deobfoscation_code( obfoscated_key ) ) == obfoscated_key + # verify by checking if original_code( deobfuscation_code( obfuscated_key ) ) == obfuscated_key _ , orig_result = self.execute_and_verify(self.environment['code'], result) # this should be verified but it is possible to have an error if the gamestate is not correct #TODO: look into ways to make this more efficent... maybe - if orig_result == new_context['obfoscated_key']: + if orig_result == new_context['obfuscated_key']: #ATTACKER WINS!! # update history with a tuple (defender_code,annotation,attacker_code) history = new_context.get('history', []) @@ -83,7 +83,7 @@ def update_game(self, move): new_context.update({'history': history}) #reset - new_context['obfoscated_key'] = None + new_context['obfuscated_key'] = None new_context['message'] = "Attacker Success" # move this to environment incase @@ -107,8 +107,8 @@ def update_game(self, move): def query_game(self): instructions = { - "DefenderObfoscateKey": "Implement a code snippit to scramble and obfoscate a given input to prevent reverse engineering.", - "AttackerReverseEngineer": "Reverse the provided code snippit such that reversed_code( code( key ) ) == key == reversed_code( obfoscated_key ) ", + "DefenderobfuscateKey": "Implement a code snippit to scramble and obfuscate a given input to prevent reverse engineering.", + "AttackerReverseEngineer": "Reverse the provided code snippit such that reversed_code( code( key ) ) == key == reversed_code( obfuscated_key ) ", "AttackerAnnotateTarget": "Describe the target code to assist in reverse engineering later" } @@ -138,12 +138,12 @@ def validate_game(self): def get_next_roles(self): #verify these are correct #if self.context.get('annotation_target',None) == None: - if self.context.get('obfoscated_key',None) == None: - return ["DefenderObfoscateKey","AttackerAnnotateTarget","AttackerReverseEngineer"] + if self.context.get('obfuscated_key',None) == None: + return ["DefenderobfuscateKey","AttackerAnnotateTarget","AttackerReverseEngineer"] elif self.context.get('annotation',"") != "": - return ["AttackerReverseEngineer","DefenderObfoscateKey","AttackerAnnotateTarget"] + return ["AttackerReverseEngineer","DefenderobfuscateKey","AttackerAnnotateTarget"] else: - return ["AttackerAnnotateTarget","AttackerReverseEngineer","DefenderObfoscateKey"] + return ["AttackerAnnotateTarget","AttackerReverseEngineer","DefenderobfuscateKey"] def export(self): @@ -228,7 +228,7 @@ def wrapper(input): "args": { "win_conditions": ["Attacker Success"], "max_rounds": 10, - "players": [{"id":"defender", "role":"DefenderObfoscateKey"}, + "players": [{"id":"defender", "role":"DefenderobfuscateKey"}, {"id":"attacker", "role":"AttackerAnnotateTarget"}, {"id":"attacker", "role":"AttackerReverseEngineer"}], From e5c31afef56be75649f69dd21b4eb50488406dd2 Mon Sep 17 00:00:00 2001 From: c-rit <114767694+c-rit@users.noreply.github.com> Date: Tue, 23 Jul 2024 16:52:07 +0000 Subject: [PATCH 4/6] very basic llm player for crackme --- zero_sum_eval/games/crackme/crackme_player.py | 55 +++++++++++++++++++ 1 file changed, 55 insertions(+) create mode 100644 zero_sum_eval/games/crackme/crackme_player.py diff --git a/zero_sum_eval/games/crackme/crackme_player.py b/zero_sum_eval/games/crackme/crackme_player.py new file mode 100644 index 0000000..1b6cf4d --- /dev/null +++ b/zero_sum_eval/games/crackme/crackme_player.py @@ -0,0 +1,55 @@ +from zero_sum_eval.player import Player +import dspy +from zero_sum_eval.registry import PLAYER_REGISTRY, LM_REGISTRY + + + +class NextMove(dspy.Signature): + """Given a code snippit, role, and context, produce a code snippit or annotation""" + + code = dspy.InputField(desc="The current code") + role = dspy.InputField(desc="role of the player making the next move") + context = dspy.InputField(desc="further context for the player") + move = dspy.OutputField(desc="a valid base python expression or comment") + + +class CrackMeCoT(dspy.Module): + + def __init__(self): + super().__init__() + self.prog = dspy.Predict(NextMove) + + def forward(self, code, role, context): + return self.prog(code=code,role=role,context=context) + + + +@PLAYER_REGISTRY.register("crackme", "crackme_player") +class CrackMePlayer(Player): + def __init__(self, lm, max_tries=4, **kwargs): + super().__init__(**kwargs) + lm_args = lm["args"] if "args" in lm else {} + self.llm_model = LM_REGISTRY.build(lm["type"], **lm_args) + self.max_tries = max_tries + self.module = CrackMeCoT() + + def make_move(self, game_state): + """ + Abstract method for making a move based on the current game state. + + Parameters: + game_state (GameState): The current state of the game + + Returns: + str: The move made by the player + """ + + code = {game_state.environment.get('code','') + role = f"{game_state.roles}" + context = f"{game_state.context}" + + with dspy.context(lm=self.llm_model): + trace = self.module(code, role, context) + return trace.move + + From 39a11e9722bbd2759969f12367ea1142dd8fc4b5 Mon Sep 17 00:00:00 2001 From: Haidar Khan Date: Sun, 28 Jul 2024 15:18:14 -0400 Subject: [PATCH 5/6] updating crackme for latest changes --- configs/crackme.yaml | 70 +++++++++++++++++++ zero_sum_eval/games/crackme/crackme_game.py | 15 ++-- zero_sum_eval/games/crackme/crackme_player.py | 16 ++--- 3 files changed, 83 insertions(+), 18 deletions(-) create mode 100644 configs/crackme.yaml diff --git a/configs/crackme.yaml b/configs/crackme.yaml new file mode 100644 index 0000000..dccdfd1 --- /dev/null +++ b/configs/crackme.yaml @@ -0,0 +1,70 @@ +manager: + args: + max_rounds: 200 + win_conditions: + - Checkmate +game: + name: chess + players: + - name: chess_player + args: + id: gpt4 white + roles: + - White + optimize: false + dataset: chess_dataset + dataset_args: + filename: ./data/chess/stockfish_examples.jsonl + role: White + optimizer: MIPROv2 + optimizer_args: + num_candidates: 5 + minibatch_size: 20 + minibatch_full_eval_steps: 10 + compilation_args: + max_bootstrapped_demos: 1 + max_labeled_demos: 1 + metric: chess_move_validation_metric + lm: + type: AzureOpenAI + args: + api_base: https://allam-swn-gpt-01.openai.azure.com/ + api_version: 2023-07-01-preview + deployment_id: gpt-4o-900ptu + max_tokens: 800 + temperature: 0.8 + top_p: 0.95 + frequency_penalty: 0 + presence_penalty: 0 + max_tries: 30 + - name: chess_player + args: + id: gpt4 black + roles: + - Black + optimize: false + dataset: chess_dataset + dataset_args: + filename: ./data/chess/stockfish_examples.jsonl + role: Black + optimizer: MIPROv2 + optimizer_args: + num_candidates: 5 + minibatch_size: 20 + minibatch_full_eval_steps: 10 + compilation_args: + max_bootstrapped_demos: 1 + max_labeled_demos: 1 + metric: chess_move_validation_metric + lm: + type: AzureOpenAI + args: + api_base: https://allam-swn-gpt-01.openai.azure.com/ + api_version: 2023-07-01-preview + deployment_id: gpt-4o-900ptu + max_tokens: 800 + temperature: 0.8 + top_p: 0.95 + frequency_penalty: 0 + presence_penalty: 0 + max_tries: 30 diff --git a/zero_sum_eval/games/crackme/crackme_game.py b/zero_sum_eval/games/crackme/crackme_game.py index c6527a4..a75809c 100644 --- a/zero_sum_eval/games/crackme/crackme_game.py +++ b/zero_sum_eval/games/crackme/crackme_game.py @@ -213,14 +213,13 @@ def wrapper(input): return False, f"Error during execution: {type(e).__name__}" if __name__ == "__main__": - -''' -example with human player: - for defender input: res=input**2 - some correct inputs for attacker: - res=int(input**1/2) - res=(input**(1/2) - ((input**(1/2)) % 1)) // 1 -''' + ''' + example with human player: + for defender input: res=input**2 + some correct inputs for attacker: + res=int(input**1/2) + res=(input**(1/2) - ((input**(1/2)) % 1)) // 1 + ''' from zero_sum_eval.player import HumanPlayer, Player config = { "game": { diff --git a/zero_sum_eval/games/crackme/crackme_player.py b/zero_sum_eval/games/crackme/crackme_player.py index 1b6cf4d..94e5442 100644 --- a/zero_sum_eval/games/crackme/crackme_player.py +++ b/zero_sum_eval/games/crackme/crackme_player.py @@ -1,7 +1,6 @@ from zero_sum_eval.player import Player import dspy -from zero_sum_eval.registry import PLAYER_REGISTRY, LM_REGISTRY - +from zero_sum_eval.registry import PLAYER_REGISTRY class NextMove(dspy.Signature): @@ -26,12 +25,9 @@ def forward(self, code, role, context): @PLAYER_REGISTRY.register("crackme", "crackme_player") class CrackMePlayer(Player): - def __init__(self, lm, max_tries=4, **kwargs): - super().__init__(**kwargs) - lm_args = lm["args"] if "args" in lm else {} - self.llm_model = LM_REGISTRY.build(lm["type"], **lm_args) - self.max_tries = max_tries - self.module = CrackMeCoT() + def _build_modules(self, **module_args): + self.main_module = CrackMeCoT() + return [self.main_module] def make_move(self, game_state): """ @@ -44,12 +40,12 @@ def make_move(self, game_state): str: The move made by the player """ - code = {game_state.environment.get('code','') + code = game_state.environment.get('code','') role = f"{game_state.roles}" context = f"{game_state.context}" with dspy.context(lm=self.llm_model): - trace = self.module(code, role, context) + trace = self.main_module(code, role, context) return trace.move From 5955e646a811e5bf9d0ad99180c062943bde8d06 Mon Sep 17 00:00:00 2001 From: Haidar Khan Date: Sun, 28 Jul 2024 15:52:57 -0400 Subject: [PATCH 6/6] this runs, whether its correct or not needs to be verified --- configs/crackme.yaml | 47 +++---------- zero_sum_eval/games/crackme/crackme_game.py | 76 +++++++++++---------- 2 files changed, 49 insertions(+), 74 deletions(-) diff --git a/configs/crackme.yaml b/configs/crackme.yaml index dccdfd1..24c99ba 100644 --- a/configs/crackme.yaml +++ b/configs/crackme.yaml @@ -1,30 +1,16 @@ manager: args: - max_rounds: 200 + max_rounds: 4 win_conditions: - - Checkmate + - Attacker Success game: - name: chess + name: crackme players: - - name: chess_player + - name: crackme_player args: - id: gpt4 white + id: gpt41 roles: - - White - optimize: false - dataset: chess_dataset - dataset_args: - filename: ./data/chess/stockfish_examples.jsonl - role: White - optimizer: MIPROv2 - optimizer_args: - num_candidates: 5 - minibatch_size: 20 - minibatch_full_eval_steps: 10 - compilation_args: - max_bootstrapped_demos: 1 - max_labeled_demos: 1 - metric: chess_move_validation_metric + - DefenderobfuscateKey lm: type: AzureOpenAI args: @@ -37,25 +23,12 @@ game: frequency_penalty: 0 presence_penalty: 0 max_tries: 30 - - name: chess_player + - name: crackme_player args: - id: gpt4 black + id: gpt42 roles: - - Black - optimize: false - dataset: chess_dataset - dataset_args: - filename: ./data/chess/stockfish_examples.jsonl - role: Black - optimizer: MIPROv2 - optimizer_args: - num_candidates: 5 - minibatch_size: 20 - minibatch_full_eval_steps: 10 - compilation_args: - max_bootstrapped_demos: 1 - max_labeled_demos: 1 - metric: chess_move_validation_metric + - AttackerAnnotateTarget + - AttackerReverseEngineer lm: type: AzureOpenAI args: diff --git a/zero_sum_eval/games/crackme/crackme_game.py b/zero_sum_eval/games/crackme/crackme_game.py index a75809c..9c544b0 100644 --- a/zero_sum_eval/games/crackme/crackme_game.py +++ b/zero_sum_eval/games/crackme/crackme_game.py @@ -149,7 +149,9 @@ def get_next_roles(self): def export(self): return yaml.dump(self.__dict__) - + def display(self): + return yaml.dump(self.environment) + def execute_and_verify(self, code_str, input_value, timeout = 1): # this function can be overwritten/overloaded to change/expand the representation(s) and method(s) of exectuion @@ -212,44 +214,44 @@ def wrapper(input): # for now dont leak error details to llm return False, f"Error during execution: {type(e).__name__}" -if __name__ == "__main__": - ''' - example with human player: - for defender input: res=input**2 - some correct inputs for attacker: - res=int(input**1/2) - res=(input**(1/2) - ((input**(1/2)) % 1)) // 1 - ''' - from zero_sum_eval.player import HumanPlayer, Player - config = { - "game": { - "name": "crackme_challenge", - "args": { - "win_conditions": ["Attacker Success"], - "max_rounds": 10, - "players": [{"id":"defender", "role":"DefenderobfuscateKey"}, - {"id":"attacker", "role":"AttackerAnnotateTarget"}, - {"id":"attacker", "role":"AttackerReverseEngineer"}], - - "challenges": [ - { - "environment": None - }, - ], - }, - } - } - - game_manager = GameManager(config) - for player_config in config["game"]["args"]["players"]: - player = HumanPlayer(**player_config) +# if __name__ == "__main__": +# ''' +# example with human player: +# for defender input: res=input**2 +# some correct inputs for attacker: +# res=int(input**1/2) +# res=(input**(1/2) - ((input**(1/2)) % 1)) // 1 +# ''' +# from zero_sum_eval.player import HumanPlayer, Player +# config = { +# "game": { +# "name": "crackme_challenge", +# "args": { +# "win_conditions": ["Attacker Success"], +# "max_rounds": 10, +# "players": [{"id":"defender", "role":"DefenderobfuscateKey"}, +# {"id":"attacker", "role":"AttackerAnnotateTarget"}, +# {"id":"attacker", "role":"AttackerReverseEngineer"}], + +# "challenges": [ +# { +# "environment": None +# }, +# ], +# }, +# } +# } + +# game_manager = GameManager(config) +# for player_config in config["game"]["args"]["players"]: +# player = HumanPlayer(**player_config) - player.max_tries = 2 - game_manager.register_player(player) +# player.max_tries = 2 +# game_manager.register_player(player) - game_state = CrackMeGame().initialize(None) - result = game_manager.do_eval(game_state) +# game_state = CrackMeGame().initialize(None) +# result = game_manager.do_eval(game_state) - print(result.query_game().export()) +# print(result.query_game().export())