diff --git a/config.sample.json b/config.sample.json index f677aae..61303d9 100644 --- a/config.sample.json +++ b/config.sample.json @@ -1,5 +1,11 @@ { "delimiter": "\t", "quotechar": "'", - "destination_path": "" + "destination_path": "", + "sftp_host": "example.com", + "sftp_username": "username", + "sftp_password": "password", + "sftp_port": "22", + "sftp_public_key": "", + "sftp_public_key_format": "ssh-ed25519" } diff --git a/setup.py b/setup.py index 3652235..693b25a 100644 --- a/setup.py +++ b/setup.py @@ -10,6 +10,7 @@ classifiers=['Programming Language :: Python :: 3 :: Only'], py_modules=['target_csv'], install_requires=[ + 'paramiko==2.7.2', 'jsonschema==2.6.0', 'singer-python>=5.1.0,<=5.3.1', ], diff --git a/target_csv.py b/target_csv.py index 71a38a9..be7325d 100755 --- a/target_csv.py +++ b/target_csv.py @@ -12,7 +12,8 @@ from datetime import datetime import collections import pkg_resources - +import paramiko +import base64 from jsonschema.validators import Draft4Validator import singer @@ -38,9 +39,13 @@ def flatten(d, parent_key='', sep='__'): return dict(items) -def persist_messages(delimiter, quotechar, messages, destination_path, fixed_headers): +def persist_messages(delimiter, quotechar, messages, destination_path, + fixed_headers, sftp_host, sftp_username, sftp_password, + sftp_port, sftp_public_key, sftp_public_key_format): + state = None schemas = {} + stream_2_filenames = {} key_properties = {} headers = {} validators = {} @@ -63,6 +68,7 @@ def persist_messages(delimiter, quotechar, messages, destination_path, fixed_hea filename = o['stream'] + '-' + now + '.csv' filename = os.path.expanduser(os.path.join(destination_path, filename)) + stream_2_filenames[o['stream']] = filename file_is_empty = (not os.path.isfile(filename)) or os.stat(filename).st_size == 0 # flattened_record = flatten(o['record']) @@ -105,7 +111,27 @@ def persist_messages(delimiter, quotechar, messages, destination_path, fixed_hea else: logger.warning("Unknown message type {} in message {}" .format(o['type'], o)) - + if(sftp_host and sftp_password and sftp_username and sftp_public_key and sftp_public_key_format): + sftp = None + client = None + try: + #key = paramiko.RSAKey(data=base64.b64decode(sftp_public_key)) + client = paramiko.SSHClient() + #client.get_host_keys().add(sftp_host, sftp_public_key_format, key) + client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) #NOT SECURE! + client.connect(hostname=sftp_host,username=sftp_username,password=sftp_password) + sftp = client.open_sftp() + assert len(stream_2_filenames.values())>=1 + for filename in stream_2_filenames.values(): + sftp.put(filename,filename) + logger.info(f"File Name: {filename}. pushed to SFTP Site") + except Exception as e: + if (sftp != None): sftp.close() + if (client != None): client.close() + raise e + else: + sftp.close() + client.close() return state @@ -150,7 +176,14 @@ def main(): config.get('quotechar', '"'), input_messages, config.get('destination_path', ''), - config.get('fixed_headers')) + config.get('fixed_headers'), + config.get('sftp_host'), + config.get('sftp_username'), + config.get('sftp_password'), + config.get('sftp_port'), + config.get('sftp_public_key'), + config.get('sftp_public_key_format'), + ) emit_state(state) logger.debug("Exiting normally")