Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Blind oracle protocol v2 aes with ecdh #26

Merged
merged 9 commits into from
Jan 10, 2024
39 changes: 34 additions & 5 deletions client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from .lib import E_ECDH, decrypt, encrypt
from hmac import compare_digest
from wallycore import ec_sig_verify, sha256, hmac_sha256, EC_FLAG_ECDSA
from wallycore import ec_sig_verify, sha256, hmac_sha256, EC_FLAG_ECDSA, \
ec_public_key_bip341_tweak


class PINClientECDH(E_ECDH):
Expand All @@ -10,6 +11,14 @@ def __init__(self, static_server_public_key):
self.static_server_public_key = static_server_public_key
self.ecdh_server_public_key = None

# returns ske, cke
def get_key_exchange(self):
return self.ecdh_server_public_key, self.public_key


# NOTE: protocol v1:
# Explicit 'hmac' fields, separate derived keys, and key-exchange handshake
class PINClientECDHv1(PINClientECDH):
def handshake(self, e_ecdh_server_public_key, static_server_signature):
ec_sig_verify(
self.static_server_public_key,
Expand All @@ -23,10 +32,6 @@ def handshake(self, e_ecdh_server_public_key, static_server_signature):
# Cache the shared secrets
self.generate_shared_secrets(e_ecdh_server_public_key)

# returns ske, cke
def get_key_exchange(self):
return self.ecdh_server_public_key, self.public_key

# Encrypt/sign/hmac the payload (ie. the pin secret)
def encrypt_request_payload(self, payload):
assert self.ecdh_server_public_key
Expand All @@ -45,3 +50,27 @@ def decrypt_response_payload(self, encrypted, hmac):

# Return decrypted data
return decrypt(self.response_encryption_key, encrypted)


# NOTE: protocol v2:
# 'hmac' fields and derived keys implicit, and no key-exchange handshake required
class PINClientECDHv2(PINClientECDH):

def __init__(self, static_server_public_key, replay_counter):
super().__init__(static_server_public_key)

assert len(replay_counter) == 4
self.replay_counter = replay_counter

# Derive and store the ecdh server public key (ske)
tweak = sha256(hmac_sha256(self.public_key, self.replay_counter))
self.ecdh_server_public_key = ec_public_key_bip341_tweak(
self.static_server_public_key, tweak, 0)

def encrypt_request_payload(self, payload):
return self.encrypt_with_ecdh(self.ecdh_server_public_key, self.LABEL_ORACLE_REQUEST,
payload)

def decrypt_response_payload(self, encrypted):
return self.decrypt_with_ecdh(self.ecdh_server_public_key, self.LABEL_ORACLE_RESPONSE,
encrypted)
92 changes: 59 additions & 33 deletions flaskserver.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
import time
import json
import os
import json
import base64
import time
from flask import Flask, request, jsonify
from .server import PINServerECDH
from .server import PINServerECDH, PINServerECDHv1, PINServerECDHv2
from .pindb import PINDb
from wallycore import hex_from_bytes, hex_to_bytes, AES_KEY_LEN_256, \
AES_BLOCK_LEN
from wallycore import AES_KEY_LEN_256, AES_BLOCK_LEN, HMAC_SHA256_LEN
from dotenv import load_dotenv

b2h = hex_from_bytes
h2b = hex_to_bytes

# Time we will retain active sessions, in seconds.
# ie. maximum time allowed 'start_handshake' (which creates the session)
# and the get-/set-pin call, which utilises it.
Expand Down Expand Up @@ -43,45 +40,74 @@ def start_handshake_route():
app.logger.debug('Number of sessions {}'.format(len(sessions)))

# Create a new ephemeral server/session and get its signed pubkey
e_ecdh_server = PINServerECDH()
e_ecdh_server = PINServerECDHv1()
pubkey, sig = e_ecdh_server.get_signed_public_key()
ske = b2h(pubkey)
ske = pubkey.hex()

# Cache new session
_cleanup_expired_sessions()
sessions[ske] = e_ecdh_server

# Return response
return jsonify({'ske': ske,
'sig': b2h(sig)})
'sig': sig.hex()})

def _complete_server_call(pin_func):
try:
# Get request data
udata = json.loads(request.data)
ske = udata['ske']
# NOTE: explicit fields in protocol v1
def _complete_server_call_v1(pin_func, udata):
ske = udata['ske']
assert 'replay_counter' not in udata

# Get associated session (ensuring not stale)
_cleanup_expired_sessions()
e_ecdh_server = sessions[ske]
# Get associated session (ensuring not stale)
_cleanup_expired_sessions()
e_ecdh_server = sessions[ske]

# get/set pin and get response data
encrypted_key, hmac = e_ecdh_server.call_with_payload(
h2b(udata['cke']),
h2b(udata['encrypted_data']),
h2b(udata['hmac_encrypted_data']),
pin_func)
# get/set pin and get response data
encrypted_key, hmac = e_ecdh_server.call_with_payload(
bytes.fromhex(udata['cke']),
bytes.fromhex(udata['encrypted_data']),
bytes.fromhex(udata['hmac_encrypted_data']),
pin_func)

# Expecting to return an encrypted aes-key
assert len(encrypted_key) == AES_KEY_LEN_256 + (2*AES_BLOCK_LEN)
# Expecting to return an encrypted aes-key with separate hmac
assert len(encrypted_key) == AES_KEY_LEN_256 + (2*AES_BLOCK_LEN)
assert len(hmac) == HMAC_SHA256_LEN

# Cleanup session
del sessions[ske]
_cleanup_expired_sessions()
# Cleanup session
del sessions[ske]
_cleanup_expired_sessions()

# Return response
return jsonify({'encrypted_key': encrypted_key.hex(),
'hmac': hmac.hex()})

# NOTE: v2 is one concatentated field, ascii85-encoded
def _complete_server_call_v2(pin_func, udata):
assert 'data' in udata
data = base64.a85decode(udata['data'].encode())
assert len(data) > 37 # cke and counter and some encrypted payload

cke = data[:33]
replay_counter = data[33:37]
encrypted_data = data[37:]
e_ecdh_server = PINServerECDHv2(replay_counter, cke)
encrypted_key = e_ecdh_server.call_with_payload(
cke,
encrypted_data,
pin_func)

# Expecting to return an encrypted aes-key with hmac appended
assert len(encrypted_key) == AES_KEY_LEN_256 + (2*AES_BLOCK_LEN) + HMAC_SHA256_LEN

# Return response
return jsonify({'data': base64.a85encode(encrypted_key).decode()})

# Return response
return jsonify({'encrypted_key': b2h(encrypted_key),
'hmac': b2h(hmac)})
def _complete_server_call(pin_func):
try:
# Get request data
udata = json.loads(request.data)
if 'data' in udata:
return _complete_server_call_v2(pin_func, udata)
return _complete_server_call_v1(pin_func, udata)

except Exception as e:
app.logger.error("Error: {} {}".format(type(e), e))
Expand Down
15 changes: 14 additions & 1 deletion lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from wallycore import AES_BLOCK_LEN, AES_FLAG_DECRYPT, AES_FLAG_ENCRYPT, \
aes_cbc, ec_private_key_verify, ec_public_key_from_private_key, ecdh, \
hmac_sha256
hmac_sha256, aes_cbc_with_ecdh_key


def encrypt(aes_key, plaintext):
Expand All @@ -19,6 +19,10 @@ def decrypt(aes_key, encrypted):

class E_ECDH(object):

# Labels used to derived child keys for aes_cbc_with_ecdh_key() call
LABEL_ORACLE_REQUEST = 'blind_oracle_request'.encode()
LABEL_ORACLE_RESPONSE = 'blind_oracle_response'.encode()

@classmethod
def _generate_private_key(cls):
counter = 4
Expand Down Expand Up @@ -50,3 +54,12 @@ def _derived(val):
self.request_hmac_key = _derived(1)
self.response_encryption_key = _derived(2)
self.response_hmac_key = _derived(3)

def decrypt_with_ecdh(self, public_key, label, encrypted):
return aes_cbc_with_ecdh_key(self.private_key, None, encrypted, public_key, label,
AES_FLAG_DECRYPT)

def encrypt_with_ecdh(self, public_key, label, plaintext):
iv = os.urandom(AES_BLOCK_LEN)
return aes_cbc_with_ecdh_key(self.private_key, iv, plaintext, public_key, label,
AES_FLAG_ENCRYPT)
Loading
Loading