diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 0a3fe95a442a..6dd14cce49a2 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -1153,6 +1153,19 @@ def build_passwords(self, job, **kwargs): if value not in ('', 'ASK'): passwords[field] = value + ''' + Only 1 value can be provided for a unique prompt string. Prefer ssh + key unlock over network key unlock. + ''' + if 'ssh_key_unlock' not in passwords: + for cred in job.network_credentials: + if cred.inputs.get('ssh_key_unlock'): + passwords['ssh_key_unlock'] = kwargs.get( + 'ssh_key_unlock', + decrypt_field(cred, 'ssh_key_unlock') + ) + break + return passwords def build_env(self, job, **kwargs): diff --git a/awx/main/tests/unit/test_tasks.py b/awx/main/tests/unit/test_tasks.py index 830c3577fd68..91e3c3505d05 100644 --- a/awx/main/tests/unit/test_tasks.py +++ b/awx/main/tests/unit/test_tasks.py @@ -772,6 +772,65 @@ def test_ssh_passwords(self, field, password_name, expected_flag): if expected_flag: assert expected_flag in ' '.join(args) + def test_net_ssh_key_unlock(self): + net = CredentialType.defaults['net']() + credential = Credential( + pk=1, + credential_type=net, + inputs = {'ssh_key_unlock': 'secret'} + ) + credential.inputs['ssh_key_unlock'] = encrypt_field(credential, 'ssh_key_unlock') + self.instance.credentials.add(credential) + self.task.run(self.pk) + + assert self.run_pexpect.call_count == 1 + call_args, call_kwargs = self.run_pexpect.call_args_list[0] + + assert 'secret' in call_kwargs.get('expect_passwords').values() + + def test_net_first_ssh_key_unlock_wins(self): + for i in range(3): + net = CredentialType.defaults['net']() + credential = Credential( + pk=i, + credential_type=net, + inputs = {'ssh_key_unlock': 'secret{}'.format(i)} + ) + credential.inputs['ssh_key_unlock'] = encrypt_field(credential, 'ssh_key_unlock') + self.instance.credentials.add(credential) + self.task.run(self.pk) + + assert self.run_pexpect.call_count == 1 + call_args, call_kwargs = self.run_pexpect.call_args_list[0] + + assert 'secret0' in call_kwargs.get('expect_passwords').values() + + def test_prefer_ssh_over_net_ssh_key_unlock(self): + net = CredentialType.defaults['net']() + net_credential = Credential( + pk=1, + credential_type=net, + inputs = {'ssh_key_unlock': 'net_secret'} + ) + net_credential.inputs['ssh_key_unlock'] = encrypt_field(net_credential, 'ssh_key_unlock') + + ssh = CredentialType.defaults['ssh']() + ssh_credential = Credential( + pk=2, + credential_type=ssh, + inputs = {'ssh_key_unlock': 'ssh_secret'} + ) + ssh_credential.inputs['ssh_key_unlock'] = encrypt_field(ssh_credential, 'ssh_key_unlock') + + self.instance.credentials.add(net_credential) + self.instance.credentials.add(ssh_credential) + self.task.run(self.pk) + + assert self.run_pexpect.call_count == 1 + call_args, call_kwargs = self.run_pexpect.call_args_list[0] + + assert 'ssh_secret' in call_kwargs.get('expect_passwords').values() + def test_vault_password(self): vault = CredentialType.defaults['vault']() credential = Credential(