From c07e9679d3b6f9e22ee2c38f6d3acc6785a1f2b1 Mon Sep 17 00:00:00 2001 From: Summit Maheshwari Date: Sat, 11 Mar 2017 22:34:04 -0800 Subject: [PATCH] SDK-178: Support for File and Partition Sensors. --- bin/qds.py | 19 ++++++++++- qds_sdk/sensors.py | 75 ++++++++++++++++++++++++++++++++++++++++++++ tests/test_sensor.py | 36 +++++++++++++++++++++ 3 files changed, 129 insertions(+), 1 deletion(-) create mode 100644 qds_sdk/sensors.py create mode 100644 tests/test_sensor.py diff --git a/bin/qds.py b/bin/qds.py index 23ae2e54..e2cc440d 100755 --- a/bin/qds.py +++ b/bin/qds.py @@ -16,6 +16,7 @@ from qds_sdk.nezha import NezhaCmdLine from qds_sdk.user import UserCmdLine from qds_sdk.template import TemplateCmdLine +from qds_sdk.sensors import * import os import sys @@ -37,6 +38,11 @@ "prestocmd": PrestoCommand } +SensorClasses = { + "filesensor": FileSensor, + "partitionsensor": PartitionSensor +} + usage_str = ( "Usage: qds.py [options] \n" "\nCommand subcommands:\n" @@ -86,7 +92,9 @@ "\nNezha subcommand:\n" " nezha --help\n" "\nUser subcommad:\n" - " user --help\n") + " user --help\n" + "\nSensor subcommand:\n" + " --help\n") def usage(parser=None): @@ -195,6 +203,12 @@ def cmdmain(cmd, args): return globals()[action + "action"](cmdclass, args) +def sensormain(sensor, args): + sensor_class = SensorClasses[sensor] + print(SensorCmdLine.check(sensor_class, args)) + return 0 + + def checkargs_cluster_id_label(args): if len(args) != 1: sys.stderr.write("expecting single argument cluster id or cluster label\n") @@ -560,6 +574,9 @@ def main(): if a0 in CommandClasses: return cmdmain(a0, args) + if a0 in SensorClasses: + return sensormain(a0, args) + if a0 == "account": return accountmain(args) diff --git a/qds_sdk/sensors.py b/qds_sdk/sensors.py new file mode 100644 index 00000000..9c19dd9c --- /dev/null +++ b/qds_sdk/sensors.py @@ -0,0 +1,75 @@ +""" +The sensors module contains the base definition for a generic +sensor call and the implementation of all the specific sensors +""" + +from __future__ import print_function +from qds_sdk.qubole import Qubole +from qds_sdk.resource import Resource +from argparse import ArgumentParser + +import logging +import json + +log = logging.getLogger("qds_sensors") + + +class SensorCmdLine: + + @staticmethod + def check(sensor_class, args): + """ + Method to call Sensor.check after parsing args from cmdline + :param sensor_class: sensor class + :param args: inline arguments + :return: True or False + """ + parser = SensorCmdLine.parsers(sensor_class) + parsed = parser.parse_args(args) + return sensor_class.check(json.loads(parsed.data)) + + @staticmethod + def parsers(sensor_class): + argparser = ArgumentParser(prog=sensor_class.usage, description=sensor_class.description) + subparsers = argparser.add_subparsers() + + #Check + check = subparsers.add_parser("check", help="Check a Sensor") + check.add_argument("-d", "--data", dest="data", required=True, + help="String containing a valid json object") + check.set_defaults(func=Sensor.check) + return argparser + + +class Sensor(Resource): + """ + qds_sdk.Sensor is the base Qubole sensor class. Different types of Qubole + sensors can subclass this. + """ + + @classmethod + def check(cls, data): + """ + Method to call the sensors api with json payload + :param data: valid json object + :return: True or False + """ + conn = Qubole.agent() + return conn.post(cls.rest_entity_path, data=data)['status'] + + +class FileSensor(Sensor): + rest_entity_path = "sensors/file_sensor" + + usage = ("qds.py filesensor check -d 'json string'") + description = "File Sensor client for Qubole Data Services" + + +class PartitionSensor(Sensor): + rest_entity_path = "sensors/partition_sensor" + + usage = ("qds.py partitionsensor check -d 'json string'") + description = "Hive Partition Sensor client for Qubole Data Services" + + + diff --git a/tests/test_sensor.py b/tests/test_sensor.py new file mode 100644 index 00000000..0108266d --- /dev/null +++ b/tests/test_sensor.py @@ -0,0 +1,36 @@ +import sys +import os + +if sys.version_info > (2, 7, 0): + import unittest +else: + import unittest2 as unittest +from mock import * + +sys.path.append(os.path.join(os.path.dirname(__file__), '../bin')) +import qds +from qds_sdk.connection import Connection +from test_base import print_command +from test_base import QdsCliTestCase + + +class TestSensorCheck(QdsCliTestCase): + def test_file_sensor(self): + sys.argv = ['qds.py', 'filesensor', 'check', '-d', '{"files":["s3://dev.canopydata.com/airflow"]}'] + print_command() + Connection._api_call = Mock(return_value={'status': True}) + qds.main() + Connection._api_call.assert_called_with( + "POST", "sensors/file_sensor", {'files':['s3://dev.canopydata.com/airflow']}) + + + def test_partition_sensor(self): + sys.argv = ['qds.py', 'partitionsensor', 'check', '-d', '{"schema" : "default", "table" : "nation_s3_rcfile_p", "columns" : [{"column" : "p", "values" : [1, 2]}]}'] + print_command() + Connection._api_call = Mock(return_value={'status': True}) + qds.main() + Connection._api_call.assert_called_with( + "POST", "sensors/partition_sensor", {"schema" : "default", "table" : "nation_s3_rcfile_p", "columns" : [{"column" : "p", "values" : [1, 2]}]}) + +if __name__ == '__main__': + unittest.main()