Skip to content

Commit

Permalink
Merge pull request #39 from cloudblue/LITE-25143_improve_logging_1
Browse files Browse the repository at this point in the history
LITE-25143 fix obfuscation for dict and lists
  • Loading branch information
ffaraone authored Oct 17, 2022
2 parents 42a5251 + b269ef5 commit 2d7590c
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 49 deletions.
7 changes: 5 additions & 2 deletions connect/eaas/core/proto.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@
class BaseModel(PydanticBaseModel):

def get_sensitive_fields(self):
return []
return [] # pragma: no cover

def __obfuscate_args__(self, k, v):
if isinstance(v, str) and v and k in self.get_sensitive_fields():
return k, f'{v[0:2]}******{v[-2:]}'

if isinstance(v, (list, dict)) and v:
if isinstance(v, (list, dict)) and v and k in self.get_sensitive_fields():
return k, '******'

return k, v
Expand Down Expand Up @@ -110,6 +110,9 @@ class SetupResponse(BaseModel):
event_definitions: Optional[List[EventDefinition]]
model_type: Literal['setup_response'] = 'setup_response'

def get_sensitive_fields(self):
return ['variables']


class Schedulable(BaseModel):
method: str
Expand Down
61 changes: 14 additions & 47 deletions tests/connect/eaas/core/test_proto.py
Original file line number Diff line number Diff line change
Expand Up @@ -452,52 +452,30 @@ def test_obfuscate_task_options():
task_category='task_category',
api_key='This is my API key',
)

fields = {}
representation = str(task_options).split(' ')
for field in representation:
k, v = field.split('=')
fields[k] = v[1:-1] if isinstance(v, str) else v

assert fields['api_key'] == 'Th******ey'
assert next(filter(
lambda x: x[0] == 'api_key', task_options.__repr_args__(),
))[1] == 'Th******ey'


def test_obfuscate_logging():
logging = Logging(
logging_api_key='This is my API key',
)

fields = {}
representation = str(logging).split(' ')
for field in representation:
k, v = field.split('=')
fields[k] = v[1:-1] if isinstance(v, str) else v

assert fields['logging_api_key'] == 'Th******ey'
assert next(filter(
lambda x: x[0] == 'logging_api_key', logging.__repr_args__(),
))[1] == 'Th******ey'


def test_obfuscate_setup_response():
setup_response = SetupResponse(variables={'VAR1': 'VAL1'})

fields = {}
representation = str(setup_response).split(' ')
for field in representation:
k, v = field.split('=')
fields[k] = v[1:-1] if isinstance(v, str) else v

assert fields['variables'] == '******'
assert next(filter(
lambda x: x[0] == 'variables', setup_response.__repr_args__(),
))[1] == '******'


def test_obfuscate_setup_request():
setup_request = SetupRequest(variables=[{'VAR1': 'VAL1'}])

fields = {}
representation = str(setup_request).split(' ')
for field in representation:
k, v = field.split('=')
fields[k] = v[1:-1] if isinstance(v, str) else v

assert fields['variables'] == '******'
assert next(filter(lambda x: x[0] == 'variables', setup_request.__repr_args__()))[1] == '******'


def test_obfuscate_http_request():
Expand All @@ -506,14 +484,7 @@ def test_obfuscate_http_request():
url='url',
headers={'Authorization': 'My Api Key'},
)

fields = {}
representation = str(http_request).split(' ')
for field in representation:
k, v = field.split('=')
fields[k] = v[1:-1] if isinstance(v, str) else v

assert fields['headers'] == '******'
assert next(filter(lambda x: x[0] == 'headers', http_request.__repr_args__()))[1] == '******'


def test_obfuscate_webtask_options():
Expand All @@ -522,10 +493,6 @@ def test_obfuscate_webtask_options():
reply_to='reply_to',
api_key='This is my API key',
)
fields = {}
representation = str(webtask_options).split(' ')
for field in representation:
k, v = field.split('=')
fields[k] = v[1:-1] if isinstance(v, str) else v

assert fields['api_key'] == 'Th******ey'
assert next(filter(
lambda x: x[0] == 'api_key', webtask_options.__repr_args__(),
))[1] == 'Th******ey'

0 comments on commit 2d7590c

Please sign in to comment.