From e36a8fef39008137addeb1557b45a9247daa4ed8 Mon Sep 17 00:00:00 2001 From: Derek Visch Date: Thu, 22 Apr 2021 21:06:26 +0000 Subject: [PATCH] Resolve "Create new table if one doesn't exist with the stream name needed" --- .gitignore | 4 + configexample.json | 2 +- data | 16 ++ poetry.lock | 219 ++++---------------- pyproject.toml | 8 +- singer_sdk/stream.py | 23 +++ singer_sdk/target.py | 134 ++++++++++++ singer_sdk_target/target_base.py | 282 ------------------------- singer_sdk_target/target_sink_base.py | 285 -------------------------- target-mssql.sh | 4 +- target_mssql/sink.py | 10 + target_mssql/streams.py | 168 ++++++++++----- target_mssql/target.py | 90 ++++---- target_mssql/tests/__init__.py | 1 - target_mssql/tests/test_core.py | 65 +++--- 15 files changed, 419 insertions(+), 892 deletions(-) create mode 100644 data create mode 100644 singer_sdk/stream.py create mode 100644 singer_sdk/target.py delete mode 100644 singer_sdk_target/target_base.py delete mode 100644 singer_sdk_target/target_sink_base.py create mode 100644 target_mssql/sink.py delete mode 100644 target_mssql/tests/__init__.py diff --git a/.gitignore b/.gitignore index 45480ee..6cea06b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,8 @@ configexample.json + +#vim +*.swp + # Secrets and internal config files .secrets/* diff --git a/configexample.json b/configexample.json index d9b6a62..2692f2f 100644 --- a/configexample.json +++ b/configexample.json @@ -1,6 +1,6 @@ { "host": "localhost", - "port": 1521, + "port": 1433, "database": "temp_db", "user": "sa", "password": "Password" diff --git a/data b/data new file mode 100644 index 0000000..ee667b5 --- /dev/null +++ b/data @@ -0,0 +1,16 @@ +{"type": "SCHEMA", "stream": "employees", "schema": {"type": "object", "properties": {"id": {"type": ["string", "null"]}, "displayName": {"type": ["string", "null"]}, "firstName": {"type": ["string", "null"]}, "lastName": {"type": ["string", "null"]}, "gender": {"type": ["string", "null"]}, "jobTitle": {"type": ["string", "null"]}, "workPhone": {"type": ["string", "null"]}, "workPhoneExtension": {"type": ["string", "null"]}, "skypeUsername": {"type": ["string", "null"]}, "preferredName": {"type": ["string", "null"]}, "mobilePhone": {"type": ["string", "null"]}, "workEmail": {"type": ["string", "null"]}, "department": {"type": ["string", "null"]}, "location": {"type": ["string", "null"]}, "division": {"type": ["string", "null"]}, "linkedIn": {"type": ["string", "null"]}, "photoUploaded": {"type": ["boolean", "null"]}, "photoUrl": {"type": ["string", "null"]}, "canUploadPhoto": {"type": ["number", "null"]}}, "required": []}, "key_properties": ["id"]} +{"type": "RECORD", "stream": "employees", "record": {"id": "119", "displayName": "Alexa Aberdean", "firstName": "Alexa", "lastName": "Aberdean", "preferredName": null, "jobTitle": "CNC Machinist", "workPhone": null, "mobilePhone": null, "workEmail": "aaberdean@autoidm.com", "department": "Customer Experience", "location": "Kalamazoo", "division": "abc", "linkedIn": null, "workPhoneExtension": null, "photoUploaded": false, "photoUrl": "https://resources.bamboohr.com/images/photo_person_150x150.png", "canUploadPhoto": 1}, "time_extracted": "2021-04-15T19:16:24.878203Z"} +{"type": "STATE", "value": {"bookmarks": {"employees": {}}}} +{"type": "RECORD", "stream": "employees", "record": {"id": "120", "displayName": "Lisa Boyer", "firstName": "Lisa", "lastName": "Boyer", "preferredName": null, "jobTitle": "HR Generalist", "workPhone": null, "mobilePhone": null, "workEmail": null, 
"department": "Human Resources", "location": "Kalamazoo", "division": null, "linkedIn": null, "workPhoneExtension": null, "photoUploaded": false, "photoUrl": "https://resources.bamboohr.com/images/photo_person_150x150.png", "canUploadPhoto": 1}, "time_extracted": "2021-04-15T19:16:24.878919Z"} +{"type": "RECORD", "stream": "employees", "record": {"id": "117", "displayName": "Abby Lamar", "firstName": "Abby", "lastName": "Lamar", "preferredName": null, "jobTitle": "Customer Experience Champion", "workPhone": null, "mobilePhone": null, "workEmail": null, "department": "Customer Experience", "location": "Kalamazoo", "division": null, "linkedIn": null, "workPhoneExtension": null, "photoUploaded": false, "photoUrl": "https://resources.bamboohr.com/images/photo_person_150x150.png", "canUploadPhoto": 1}, "time_extracted": "2021-04-15T19:16:24.879049Z"} +{"type": "RECORD", "stream": "employees", "record": {"id": "111", "displayName": "Patty Mae", "firstName": "Patty", "lastName": "Mae", "preferredName": null, "jobTitle": "Executive", "workPhone": null, "mobilePhone": null, "workEmail": "b@autoidm.com", "department": "Leadership", "location": "Kalamazoo", "division": null, "linkedIn": null, "workPhoneExtension": null, "photoUploaded": false, "photoUrl": "https://resources.bamboohr.com/images/photo_person_150x150.png", "canUploadPhoto": 1}, "time_extracted": "2021-04-15T19:16:24.879185Z"} +{"type": "RECORD", "stream": "employees", "record": {"id": "122", "displayName": "Mickey Mouse", "firstName": "Mickey", "lastName": "Mouse", "preferredName": null, "jobTitle": "CNC Programmer", "workPhone": null, "mobilePhone": null, "workEmail": null, "department": "Engineering", "location": "Kalamazoo", "division": null, "linkedIn": null, "workPhoneExtension": null, "photoUploaded": false, "photoUrl": "https://resources.bamboohr.com/images/photo_person_150x150.png", "canUploadPhoto": 1}, "time_extracted": "2021-04-15T19:16:24.879338Z"} +{"type": "RECORD", "stream": "employees", "record": {"id": "115", "displayName": "Elon Musky", "firstName": "Elon", "lastName": "Musky", "preferredName": null, "jobTitle": "Process Engineer", "workPhone": null, "mobilePhone": null, "workEmail": null, "department": "Engineering", "location": "Kalamazoo", "division": null, "linkedIn": null, "workPhoneExtension": null, "photoUploaded": false, "photoUrl": "https://resources.bamboohr.com/images/photo_person_150x150.png", "canUploadPhoto": 1}, "time_extracted": "2021-04-15T19:16:24.879454Z"} +{"type": "RECORD", "stream": "employees", "record": {"id": "116", "displayName": "Jeff Razer", "firstName": "Jeff", "lastName": "Razer", "preferredName": null, "jobTitle": "Customer Experience Champion", "workPhone": null, "mobilePhone": null, "workEmail": null, "department": "Customer Experience", "location": "Kalamazoo", "division": null, "linkedIn": null, "workPhoneExtension": null, "photoUploaded": false, "photoUrl": "https://resources.bamboohr.com/images/photo_person_150x150.png", "canUploadPhoto": 1}, "time_extracted": "2021-04-15T19:16:24.879566Z"} +{"type": "RECORD", "stream": "employees", "record": {"id": "113", "displayName": "Tyler Sawyer", "firstName": "Tyler", "lastName": "Sawyer", "preferredName": null, "jobTitle": "Marketing Analyst", "workPhone": "5555555555", "mobilePhone": null, "workEmail": null, "department": "Sales/Marketing", "location": "Kalamazoo", "division": null, "linkedIn": null, "workPhoneExtension": "1", "photoUploaded": false, "photoUrl": "https://resources.bamboohr.com/images/photo_person_150x150.png", 
"canUploadPhoto": 1}, "time_extracted": "2021-04-15T19:16:24.879670Z"} +{"type": "RECORD", "stream": "employees", "record": {"id": "110", "displayName": "Sam Smith", "firstName": "Sam", "lastName": "Smith", "preferredName": null, "jobTitle": "Sales Representative", "workPhone": null, "mobilePhone": "1234567891", "workEmail": "ssmith@autoidm.com", "department": "Sales/Marketing", "location": "Kalamazoo", "division": null, "linkedIn": null, "workPhoneExtension": null, "photoUploaded": false, "photoUrl": "https://resources.bamboohr.com/images/photo_person_150x150.png", "canUploadPhoto": 1}, "time_extracted": "2021-04-15T19:16:24.879805Z"} +{"type": "RECORD", "stream": "employees", "record": {"id": "114", "displayName": "Tim Teebow", "firstName": "Tim", "lastName": "Teebow", "preferredName": null, "jobTitle": "Network Engineer", "workPhone": null, "mobilePhone": null, "workEmail": null, "department": "Engineering", "location": "Kalamazoo", "division": null, "linkedIn": null, "workPhoneExtension": null, "photoUploaded": false, "photoUrl": "https://resources.bamboohr.com/images/photo_person_150x150.png", "canUploadPhoto": 1}, "time_extracted": "2021-04-15T19:16:24.879902Z"} +{"type": "RECORD", "stream": "employees", "record": {"id": "121", "displayName": "Jake Vicks", "firstName": "Jake", "lastName": "Vicks", "preferredName": null, "jobTitle": "CNC Machinist", "workPhone": null, "mobilePhone": null, "workEmail": null, "department": "Engineering", "location": "Kalamazoo", "division": null, "linkedIn": null, "workPhoneExtension": null, "photoUploaded": false, "photoUrl": "https://resources.bamboohr.com/images/photo_person_150x150.png", "canUploadPhoto": 1}, "time_extracted": "2021-04-15T19:16:24.879997Z"} +{"type": "STATE", "value": {"bookmarks": {"employees": {}}}} +{"type": "RECORD", "stream": "employees", "record": {"id": "109", "displayName": "Derek Visch", "firstName": "Derek", "lastName": "Visch", "preferredName": null, "jobTitle": "Executive", "workPhone": null, "mobilePhone": null, "workEmail": "dvisch@autoidm.com", "department": "Leadership", "location": "Kalamazoo", "division": null, "linkedIn": null, "workPhoneExtension": null, "photoUploaded": false, "photoUrl": "https://resources.bamboohr.com/images/photo_person_150x150.png", "canUploadPhoto": 1}, "time_extracted": "2021-04-15T19:16:24.880110Z"} +{"type": "RECORD", "stream": "employees", "record": {"id": "118", "displayName": "Lexi Weatherton", "firstName": "Lexi", "lastName": "Weatherton", "preferredName": null, "jobTitle": "CNC Machinist", "workPhone": null, "mobilePhone": null, "workEmail": null, "department": "Manufacturing", "location": "Kalamazoo", "division": null, "linkedIn": null, "workPhoneExtension": null, "photoUploaded": false, "photoUrl": "https://resources.bamboohr.com/images/photo_person_150x150.png", "canUploadPhoto": 1}, "time_extracted": "2021-04-15T19:16:24.880206Z"} diff --git a/poetry.lock b/poetry.lock index 5cb1f9a..7696440 100644 --- a/poetry.lock +++ b/poetry.lock @@ -36,17 +36,6 @@ category = "main" optional = false python-versions = "*" -[[package]] -name = "cffi" -version = "1.14.5" -description = "Foreign Function Interface for Python calling C code." 
-category = "main" -optional = false -python-versions = "*" - -[package.dependencies] -pycparser = "*" - [[package]] name = "chardet" version = "4.0.0" @@ -79,25 +68,6 @@ category = "dev" optional = false python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*" -[[package]] -name = "cryptography" -version = "3.4.7" -description = "cryptography is a package which provides cryptographic recipes and primitives to Python developers." -category = "main" -optional = false -python-versions = ">=3.6" - -[package.dependencies] -cffi = ">=1.12" - -[package.extras] -docs = ["sphinx (>=1.6.5,!=1.8.0,!=3.1.0,!=3.1.1)", "sphinx-rtd-theme"] -docstest = ["doc8", "pyenchant (>=1.6.11)", "twine (>=1.12.0)", "sphinxcontrib-spelling (>=4.0.1)"] -pep8test = ["black", "flake8", "flake8-import-order", "pep8-naming"] -sdist = ["setuptools-rust (>=0.11.4)"] -ssh = ["bcrypt (>=3.1.5)"] -test = ["pytest (>=6.0)", "pytest-cov", "pytest-subtests", "pytest-xdist", "pretend", "iso8601", "pytz", "hypothesis (>=1.11.4,!=3.79.2)"] - [[package]] name = "idna" version = "2.10" @@ -108,7 +78,7 @@ python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" [[package]] name = "importlib-metadata" -version = "3.10.1" +version = "4.0.1" description = "Read metadata from Python packages" category = "main" optional = false @@ -159,19 +129,6 @@ python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" [package.dependencies] pyparsing = ">=2.0.2" -[[package]] -name = "pendulum" -version = "1.2.0" -description = "Python datetimes made easy." -category = "main" -optional = false -python-versions = "*" - -[package.dependencies] -python-dateutil = "*" -pytzdata = "*" -tzlocal = "*" - [[package]] name = "pipelinewise-singer-python" version = "1.2.0" @@ -214,26 +171,13 @@ optional = false python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" [[package]] -name = "pycparser" -version = "2.20" -description = "C parser in Python" -category = "main" -optional = false -python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" - -[[package]] -name = "pyjwt" -version = "1.7.1" -description = "JSON Web Token implementation in Python" +name = "pymssql" +version = "2.1.5" +description = "DB-API interface to Microsoft SQL Server for Python. (new Cython-based version)" category = "main" optional = false python-versions = "*" -[package.extras] -crypto = ["cryptography (>=1.4)"] -flake8 = ["flake8", "flake8-import-order", "pep8-naming"] -test = ["pytest (>=4.0.1,<5.0.0)", "pytest-cov (>=2.6.0,<3.0.0)", "pytest-runner (>=4.2,<5.0.0)"] - [[package]] name = "pyparsing" version = "2.4.7" @@ -291,14 +235,6 @@ category = "main" optional = false python-versions = "*" -[[package]] -name = "pytzdata" -version = "2020.1" -description = "The Olson timezone database for Python." 
-category = "main" -optional = false -python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" - [[package]] name = "requests" version = "2.25.1" @@ -325,24 +261,6 @@ category = "main" optional = false python-versions = "*" -[[package]] -name = "singer-sdk" -version = "0.1.2" -description = "A framework for building Singer taps" -category = "main" -optional = false -python-versions = ">=3.6,<3.9" - -[package.dependencies] -backoff = "1.8.0" -click = ">=7.1.2,<8.0.0" -cryptography = ">=3.4.6,<4.0.0" -importlib-metadata = {version = "*", markers = "python_version < \"3.8\""} -pendulum = "1.2.0" -pipelinewise-singer-python = "1.2.0" -PyJWT = "1.7.1" -requests = ">=2.25.1,<3.0.0" - [[package]] name = "six" version = "1.15.0" @@ -367,17 +285,6 @@ category = "main" optional = false python-versions = "*" -[[package]] -name = "tzlocal" -version = "2.1" -description = "tzinfo object for the local timezone" -category = "main" -optional = false -python-versions = "*" - -[package.dependencies] -pytz = "*" - [[package]] name = "urllib3" version = "1.26.4" @@ -406,7 +313,7 @@ testing = ["pytest (>=4.6)", "pytest-checkdocs (>=1.2.3)", "pytest-flake8", "pyt [metadata] lock-version = "1.1" python-versions = "<3.9,>=3.6" -content-hash = "065cf556df00ee068fd3287cb2f75698eb0c695eb4d68fbc259f2187631846bb" +content-hash = "d7a0bf50c13c1e59f6c01b1acf51886457ac73033627cc38cb6623b7066a40fb" [metadata.files] atomicwrites = [ @@ -425,45 +332,6 @@ certifi = [ {file = "certifi-2020.12.5-py2.py3-none-any.whl", hash = "sha256:719a74fb9e33b9bd44cc7f3a8d94bc35e4049deebe19ba7d8e108280cfd59830"}, {file = "certifi-2020.12.5.tar.gz", hash = "sha256:1a4995114262bffbc2413b159f2a1a480c969de6e6eb13ee966d470af86af59c"}, ] -cffi = [ - {file = "cffi-1.14.5-cp27-cp27m-macosx_10_9_x86_64.whl", hash = "sha256:bb89f306e5da99f4d922728ddcd6f7fcebb3241fc40edebcb7284d7514741991"}, - {file = "cffi-1.14.5-cp27-cp27m-manylinux1_i686.whl", hash = "sha256:34eff4b97f3d982fb93e2831e6750127d1355a923ebaeeb565407b3d2f8d41a1"}, - {file = "cffi-1.14.5-cp27-cp27m-manylinux1_x86_64.whl", hash = "sha256:99cd03ae7988a93dd00bcd9d0b75e1f6c426063d6f03d2f90b89e29b25b82dfa"}, - {file = "cffi-1.14.5-cp27-cp27m-win32.whl", hash = "sha256:65fa59693c62cf06e45ddbb822165394a288edce9e276647f0046e1ec26920f3"}, - {file = "cffi-1.14.5-cp27-cp27m-win_amd64.whl", hash = "sha256:51182f8927c5af975fece87b1b369f722c570fe169f9880764b1ee3bca8347b5"}, - {file = "cffi-1.14.5-cp27-cp27mu-manylinux1_i686.whl", hash = "sha256:43e0b9d9e2c9e5d152946b9c5fe062c151614b262fda2e7b201204de0b99e482"}, - {file = "cffi-1.14.5-cp27-cp27mu-manylinux1_x86_64.whl", hash = "sha256:cbde590d4faaa07c72bf979734738f328d239913ba3e043b1e98fe9a39f8b2b6"}, - {file = "cffi-1.14.5-cp35-cp35m-macosx_10_9_x86_64.whl", hash = "sha256:5de7970188bb46b7bf9858eb6890aad302577a5f6f75091fd7cdd3ef13ef3045"}, - {file = "cffi-1.14.5-cp35-cp35m-manylinux1_i686.whl", hash = "sha256:a465da611f6fa124963b91bf432d960a555563efe4ed1cc403ba5077b15370aa"}, - {file = "cffi-1.14.5-cp35-cp35m-manylinux1_x86_64.whl", hash = "sha256:d42b11d692e11b6634f7613ad8df5d6d5f8875f5d48939520d351007b3c13406"}, - {file = "cffi-1.14.5-cp35-cp35m-win32.whl", hash = "sha256:72d8d3ef52c208ee1c7b2e341f7d71c6fd3157138abf1a95166e6165dd5d4369"}, - {file = "cffi-1.14.5-cp35-cp35m-win_amd64.whl", hash = "sha256:29314480e958fd8aab22e4a58b355b629c59bf5f2ac2492b61e3dc06d8c7a315"}, - {file = "cffi-1.14.5-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:3d3dd4c9e559eb172ecf00a2a7517e97d1e96de2a5e610bd9b68cea3925b4892"}, - {file = 
"cffi-1.14.5-cp36-cp36m-manylinux1_i686.whl", hash = "sha256:48e1c69bbacfc3d932221851b39d49e81567a4d4aac3b21258d9c24578280058"}, - {file = "cffi-1.14.5-cp36-cp36m-manylinux1_x86_64.whl", hash = "sha256:69e395c24fc60aad6bb4fa7e583698ea6cc684648e1ffb7fe85e3c1ca131a7d5"}, - {file = "cffi-1.14.5-cp36-cp36m-manylinux2014_aarch64.whl", hash = "sha256:9e93e79c2551ff263400e1e4be085a1210e12073a31c2011dbbda14bda0c6132"}, - {file = "cffi-1.14.5-cp36-cp36m-win32.whl", hash = "sha256:58e3f59d583d413809d60779492342801d6e82fefb89c86a38e040c16883be53"}, - {file = "cffi-1.14.5-cp36-cp36m-win_amd64.whl", hash = "sha256:005a36f41773e148deac64b08f233873a4d0c18b053d37da83f6af4d9087b813"}, - {file = "cffi-1.14.5-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:2894f2df484ff56d717bead0a5c2abb6b9d2bf26d6960c4604d5c48bbc30ee73"}, - {file = "cffi-1.14.5-cp37-cp37m-manylinux1_i686.whl", hash = "sha256:0857f0ae312d855239a55c81ef453ee8fd24136eaba8e87a2eceba644c0d4c06"}, - {file = "cffi-1.14.5-cp37-cp37m-manylinux1_x86_64.whl", hash = "sha256:cd2868886d547469123fadc46eac7ea5253ea7fcb139f12e1dfc2bbd406427d1"}, - {file = "cffi-1.14.5-cp37-cp37m-manylinux2014_aarch64.whl", hash = "sha256:35f27e6eb43380fa080dccf676dece30bef72e4a67617ffda586641cd4508d49"}, - {file = "cffi-1.14.5-cp37-cp37m-win32.whl", hash = "sha256:9ff227395193126d82e60319a673a037d5de84633f11279e336f9c0f189ecc62"}, - {file = "cffi-1.14.5-cp37-cp37m-win_amd64.whl", hash = "sha256:9cf8022fb8d07a97c178b02327b284521c7708d7c71a9c9c355c178ac4bbd3d4"}, - {file = "cffi-1.14.5-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:8b198cec6c72df5289c05b05b8b0969819783f9418e0409865dac47288d2a053"}, - {file = "cffi-1.14.5-cp38-cp38-manylinux1_i686.whl", hash = "sha256:ad17025d226ee5beec591b52800c11680fca3df50b8b29fe51d882576e039ee0"}, - {file = "cffi-1.14.5-cp38-cp38-manylinux1_x86_64.whl", hash = "sha256:6c97d7350133666fbb5cf4abdc1178c812cb205dc6f41d174a7b0f18fb93337e"}, - {file = "cffi-1.14.5-cp38-cp38-manylinux2014_aarch64.whl", hash = "sha256:8ae6299f6c68de06f136f1f9e69458eae58f1dacf10af5c17353eae03aa0d827"}, - {file = "cffi-1.14.5-cp38-cp38-win32.whl", hash = "sha256:b85eb46a81787c50650f2392b9b4ef23e1f126313b9e0e9013b35c15e4288e2e"}, - {file = "cffi-1.14.5-cp38-cp38-win_amd64.whl", hash = "sha256:1f436816fc868b098b0d63b8920de7d208c90a67212546d02f84fe78a9c26396"}, - {file = "cffi-1.14.5-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:1071534bbbf8cbb31b498d5d9db0f274f2f7a865adca4ae429e147ba40f73dea"}, - {file = "cffi-1.14.5-cp39-cp39-manylinux1_i686.whl", hash = "sha256:9de2e279153a443c656f2defd67769e6d1e4163952b3c622dcea5b08a6405322"}, - {file = "cffi-1.14.5-cp39-cp39-manylinux1_x86_64.whl", hash = "sha256:6e4714cc64f474e4d6e37cfff31a814b509a35cb17de4fb1999907575684479c"}, - {file = "cffi-1.14.5-cp39-cp39-manylinux2014_aarch64.whl", hash = "sha256:158d0d15119b4b7ff6b926536763dc0714313aa59e320ddf787502c70c4d4bee"}, - {file = "cffi-1.14.5-cp39-cp39-win32.whl", hash = "sha256:afb29c1ba2e5a3736f1c301d9d0abe3ec8b86957d04ddfa9d7a6a42b9367e396"}, - {file = "cffi-1.14.5-cp39-cp39-win_amd64.whl", hash = "sha256:f2d45f97ab6bb54753eab54fffe75aaf3de4ff2341c9daee1987ee1837636f1d"}, - {file = "cffi-1.14.5.tar.gz", hash = "sha256:fd78e5fee591709f32ef6edb9a015b4aa1a5022598e36227500c8f4e02328d9c"}, -] chardet = [ {file = "chardet-4.0.0-py2.py3-none-any.whl", hash = "sha256:f864054d66fd9118f2e67044ac8981a54775ec5b67aed0441892edb553d21da5"}, {file = "chardet-4.0.0.tar.gz", hash = "sha256:0d6f53a15db4120f2b08c94f11e7d93d2c911ee118b6b30a04ec3ee8310179fa"}, @@ -479,27 +347,13 @@ 
colorama = [ {file = "colorama-0.4.4-py2.py3-none-any.whl", hash = "sha256:9f47eda37229f68eee03b24b9748937c7dc3868f906e8ba69fbcbdd3bc5dc3e2"}, {file = "colorama-0.4.4.tar.gz", hash = "sha256:5941b2b48a20143d2267e95b1c2a7603ce057ee39fd88e7329b0c292aa16869b"}, ] -cryptography = [ - {file = "cryptography-3.4.7-cp36-abi3-macosx_10_10_x86_64.whl", hash = "sha256:3d8427734c781ea5f1b41d6589c293089704d4759e34597dce91014ac125aad1"}, - {file = "cryptography-3.4.7-cp36-abi3-macosx_11_0_arm64.whl", hash = "sha256:8e56e16617872b0957d1c9742a3f94b43533447fd78321514abbe7db216aa250"}, - {file = "cryptography-3.4.7-cp36-abi3-manylinux2010_x86_64.whl", hash = "sha256:37340614f8a5d2fb9aeea67fd159bfe4f5f4ed535b1090ce8ec428b2f15a11f2"}, - {file = "cryptography-3.4.7-cp36-abi3-manylinux2014_aarch64.whl", hash = "sha256:240f5c21aef0b73f40bb9f78d2caff73186700bf1bc6b94285699aff98cc16c6"}, - {file = "cryptography-3.4.7-cp36-abi3-manylinux2014_x86_64.whl", hash = "sha256:1e056c28420c072c5e3cb36e2b23ee55e260cb04eee08f702e0edfec3fb51959"}, - {file = "cryptography-3.4.7-cp36-abi3-win32.whl", hash = "sha256:0f1212a66329c80d68aeeb39b8a16d54ef57071bf22ff4e521657b27372e327d"}, - {file = "cryptography-3.4.7-cp36-abi3-win_amd64.whl", hash = "sha256:de4e5f7f68220d92b7637fc99847475b59154b7a1b3868fb7385337af54ac9ca"}, - {file = "cryptography-3.4.7-pp36-pypy36_pp73-manylinux2010_x86_64.whl", hash = "sha256:26965837447f9c82f1855e0bc8bc4fb910240b6e0d16a664bb722df3b5b06873"}, - {file = "cryptography-3.4.7-pp36-pypy36_pp73-manylinux2014_x86_64.whl", hash = "sha256:eb8cc2afe8b05acbd84a43905832ec78e7b3873fb124ca190f574dca7389a87d"}, - {file = "cryptography-3.4.7-pp37-pypy37_pp73-manylinux2010_x86_64.whl", hash = "sha256:7ec5d3b029f5fa2b179325908b9cd93db28ab7b85bb6c1db56b10e0b54235177"}, - {file = "cryptography-3.4.7-pp37-pypy37_pp73-manylinux2014_x86_64.whl", hash = "sha256:ee77aa129f481be46f8d92a1a7db57269a2f23052d5f2433b4621bb457081cc9"}, - {file = "cryptography-3.4.7.tar.gz", hash = "sha256:3d10de8116d25649631977cb37da6cbdd2d6fa0e0281d014a5b7d337255ca713"}, -] idna = [ {file = "idna-2.10-py2.py3-none-any.whl", hash = "sha256:b97d804b1e9b523befed77c48dacec60e6dcb0b5391d57af6a65a312a90648c0"}, {file = "idna-2.10.tar.gz", hash = "sha256:b307872f855b18632ce0c21c5e45be78c0ea7ae4c15c828c20788b26921eb3f6"}, ] importlib-metadata = [ - {file = "importlib_metadata-3.10.1-py3-none-any.whl", hash = "sha256:2ec0faae539743ae6aaa84b49a169670a465f7f5d64e6add98388cc29fd1f2f6"}, - {file = "importlib_metadata-3.10.1.tar.gz", hash = "sha256:c9356b657de65c53744046fa8f7358afe0714a1af7d570c00c3835c2d724a7c1"}, + {file = "importlib_metadata-4.0.1-py3-none-any.whl", hash = "sha256:d7eb1dea6d6a6086f8be21784cc9e3bcfa55872b52309bc5fad53a8ea444465d"}, + {file = "importlib_metadata-4.0.1.tar.gz", hash = "sha256:8c501196e49fb9df5df43833bdb1e4328f64847763ec8a50703148b73784d581"}, ] iniconfig = [ {file = "iniconfig-1.1.1-py2.py3-none-any.whl", hash = "sha256:011e24c64b7f47f6ebd835bb12a743f2fbe9a26d4cecaa7f53bc4f35ee9da8b3"}, @@ -513,16 +367,6 @@ packaging = [ {file = "packaging-20.9-py2.py3-none-any.whl", hash = "sha256:67714da7f7bc052e064859c05c595155bd1ee9f69f76557e21f051443c20947a"}, {file = "packaging-20.9.tar.gz", hash = "sha256:5b327ac1320dc863dca72f4514ecc086f31186744b84a230374cc1fd776feae5"}, ] -pendulum = [ - {file = "pendulum-1.2.0-cp27-cp27m-manylinux1_i686.whl", hash = "sha256:e467bec4ec56436654499f52789b11d9e15b2bbbb0696a4ee4b6b654fe72d66f"}, - {file = "pendulum-1.2.0-cp27-cp27m-manylinux1_x86_64.whl", hash = 
"sha256:ce6c764ba2acaf185a2e4bb02220ac986ad371ea6e7a9d9e42ad882c6d788e0a"}, - {file = "pendulum-1.2.0-cp35-cp35m-manylinux1_i686.whl", hash = "sha256:925ff1a3094d3517928a5ffca92cf0b0c9016f289133129433c9be0eede937ec"}, - {file = "pendulum-1.2.0-cp35-cp35m-manylinux1_x86_64.whl", hash = "sha256:93a5a66cea7222e8388fcb52947ff2e230e7194815baa43b5f85c7e56611b515"}, - {file = "pendulum-1.2.0-cp36-cp36m-macosx_10_11_x86_64.whl", hash = "sha256:a97e3ed9557ac0c5c3742f21fa4d852d7a050dd9b1b517e993aebef2dd2eea52"}, - {file = "pendulum-1.2.0-cp36-cp36m-manylinux1_i686.whl", hash = "sha256:ce67c0777a9f7340e5c6884faf19e571772fe0969c978044bbae601bd0f70873"}, - {file = "pendulum-1.2.0-cp36-cp36m-manylinux1_x86_64.whl", hash = "sha256:1d7eb559133e3aa0e2f62fe44e143398c05debce6fabca95732225a58700bec3"}, - {file = "pendulum-1.2.0.tar.gz", hash = "sha256:641140a05f959b37a177866e263f6f53a53b711fae6355336ee832ec1a59da8a"}, -] pipelinewise-singer-python = [ {file = "pipelinewise-singer-python-1.2.0.tar.gz", hash = "sha256:8ba501f9092dbd686cd5792ecf6aa97c2d25c225e9d8b2875dcead0f5738898c"}, {file = "pipelinewise_singer_python-1.2.0-py3-none-any.whl", hash = "sha256:156f011cba10b1591ae37c5510ed9d21639258c1377cc00c07d9f7e9a3ae27fb"}, @@ -535,13 +379,36 @@ py = [ {file = "py-1.10.0-py2.py3-none-any.whl", hash = "sha256:3b80836aa6d1feeaa108e046da6423ab8f6ceda6468545ae8d02d9d58d18818a"}, {file = "py-1.10.0.tar.gz", hash = "sha256:21b81bda15b66ef5e1a777a21c4dcd9c20ad3efd0b3f817e7a809035269e1bd3"}, ] -pycparser = [ - {file = "pycparser-2.20-py2.py3-none-any.whl", hash = "sha256:7582ad22678f0fcd81102833f60ef8d0e57288b6b5fb00323d101be910e35705"}, - {file = "pycparser-2.20.tar.gz", hash = "sha256:2d475327684562c3a96cc71adf7dc8c4f0565175cf86b6d7a404ff4c771f15f0"}, -] -pyjwt = [ - {file = "PyJWT-1.7.1-py2.py3-none-any.whl", hash = "sha256:5c6eca3c2940464d106b99ba83b00c6add741c9becaec087fb7ccdefea71350e"}, - {file = "PyJWT-1.7.1.tar.gz", hash = "sha256:8d59a976fb773f3e6a39c85636357c4f0e242707394cadadd9814f5cbaa20e96"}, +pymssql = [ + {file = "pymssql-2.1.5-cp27-cp27m-macosx_10_14_x86_64.whl", hash = "sha256:cff8e775fb6294effeb716735bfd7707e79a2a79b617d0f1984bd574f26bda65"}, + {file = "pymssql-2.1.5-cp27-cp27m-manylinux1_i686.whl", hash = "sha256:c47c093cc4dc60e3356458c8e2935bab3834cea1f94a66c8ca62a5af2f060d64"}, + {file = "pymssql-2.1.5-cp27-cp27m-manylinux1_x86_64.whl", hash = "sha256:658b4ea09050c85c6be09e1371335198b9441d2b5b08ef4f0b250ee4e5e8afc3"}, + {file = "pymssql-2.1.5-cp27-cp27m-win32.whl", hash = "sha256:f36392e1874445d7cb67b928686ad424b0b3980282512b21f640828ad3adf968"}, + {file = "pymssql-2.1.5-cp27-cp27m-win_amd64.whl", hash = "sha256:4fd4991eee848a4fd7d0b19a24fe49b508633881e221004652ab15a7e4cfe041"}, + {file = "pymssql-2.1.5-cp27-cp27mu-manylinux1_i686.whl", hash = "sha256:cfd9ae0484056e46b981b7c3893ddb620ccd52f48349bada78cb141192dfbfbe"}, + {file = "pymssql-2.1.5-cp27-cp27mu-manylinux1_x86_64.whl", hash = "sha256:36539e42e8bb33018a05f9bd524b5a76286132ab7c82bfe9b60c4169d460fdf5"}, + {file = "pymssql-2.1.5-cp35-cp35m-manylinux1_i686.whl", hash = "sha256:18b6550a02b34e88134b4b70eedcc6982036e459b0c91c7dd248bb1926287264"}, + {file = "pymssql-2.1.5-cp35-cp35m-manylinux1_x86_64.whl", hash = "sha256:c7a715c0b2b3a37462a9cf7972ed9ef0be98b2c64aebd549359f08af7f53b9a9"}, + {file = "pymssql-2.1.5-cp35-cp35m-win32.whl", hash = "sha256:e4741c6ec0483dcadb8a63077a7ceb17f263d9815ea842fed6663508c8852d7f"}, + {file = "pymssql-2.1.5-cp35-cp35m-win_amd64.whl", hash = 
"sha256:3e077455a11fcb4cb8705cb3ae83236b8e130df9fd4186c707d638e8e43f9646"}, + {file = "pymssql-2.1.5-cp36-cp36m-macosx_10_14_x86_64.whl", hash = "sha256:0ff55a944ee7506a5e9aef7b40f0cddabc0b61f9ba13d716bff5a308923b8111"}, + {file = "pymssql-2.1.5-cp36-cp36m-manylinux1_i686.whl", hash = "sha256:3bdbeca64af7856923b7f84ed3355e2fd00bb1b897877b0bd4a74ec638801d52"}, + {file = "pymssql-2.1.5-cp36-cp36m-manylinux1_x86_64.whl", hash = "sha256:dfc764a5a07197d742da34a593578295e9f8b64bb035c07e0981961672e18c85"}, + {file = "pymssql-2.1.5-cp36-cp36m-win32.whl", hash = "sha256:4f6d4434c29b846f491f5236daf06175f1652953d1d162be0f1b2b037bcf9a8d"}, + {file = "pymssql-2.1.5-cp36-cp36m-win_amd64.whl", hash = "sha256:aad5a1218691f83a16bab6dcfa24abf9da796abf5bf168a41972fe1cf93b3e37"}, + {file = "pymssql-2.1.5-cp37-cp37m-macosx_10_14_x86_64.whl", hash = "sha256:1682ead549dcec31f3b8cc47da429572ea1c4b106cb4fa91df884f968123af93"}, + {file = "pymssql-2.1.5-cp37-cp37m-manylinux1_i686.whl", hash = "sha256:ec28c73afde96def469c581208903cf035923dc6313b6916f80cbcc71f9413d1"}, + {file = "pymssql-2.1.5-cp37-cp37m-manylinux1_x86_64.whl", hash = "sha256:fcf98e2c7cf18fa2fa09cdb7220849cd02e7b9481cb81ccdd8940da438f58d85"}, + {file = "pymssql-2.1.5-cp37-cp37m-win32.whl", hash = "sha256:1e8d8abab391559b70f5df97fb22fc1d9ea627edcb943e558bdc7d7f455f93e2"}, + {file = "pymssql-2.1.5-cp37-cp37m-win_amd64.whl", hash = "sha256:e19a59eb8115418c3debcc9b685b2138d0abe6c9cb8c00bc2e738eb744bc6bda"}, + {file = "pymssql-2.1.5-cp38-cp38-macosx_10_14_x86_64.whl", hash = "sha256:2108114e4cc34ebbb8031df3e5579320e7569d51cd5094c5ddc333bf749d09a0"}, + {file = "pymssql-2.1.5-cp38-cp38-manylinux1_i686.whl", hash = "sha256:3977b056c5db8d01e74d88417cbb48e3e8bf03ab09ca6ef53790d025eae543df"}, + {file = "pymssql-2.1.5-cp38-cp38-manylinux1_x86_64.whl", hash = "sha256:d0f8094330523b8e4763a6903151bc35069309ccb57c61f87eeaa910a34f5a35"}, + {file = "pymssql-2.1.5-cp38-cp38-win32.whl", hash = "sha256:10f9b5b033eb30a38f4b36144eb4583fd478fd30afa9d64cd9a1965d22740446"}, + {file = "pymssql-2.1.5-cp38-cp38-win_amd64.whl", hash = "sha256:557719b3ebc4617543de52eaadcdb6779f0c850e95b07be5f9775a2ef6a6c61f"}, + {file = "pymssql-2.1.5-cp39-cp39-manylinux1_i686.whl", hash = "sha256:04aab92d5a1a5d4e01a0797a939f103f02c0ef777bc8dcf1e952ed30dd1d43d4"}, + {file = "pymssql-2.1.5-cp39-cp39-manylinux1_x86_64.whl", hash = "sha256:70a5c67759254e982368c5b9ccfe076447a7fd545b8376eb62d60c3b85e3b94d"}, + {file = "pymssql-2.1.5.tar.gz", hash = "sha256:d60f5f90337399668e10ab6a23a1657f190c9585401eb96a5456261f7c414864"}, ] pyparsing = [ {file = "pyparsing-2.4.7-py2.py3-none-any.whl", hash = "sha256:ef9d7589ef3c200abe66653d3f1ab1033c3c419ae9b9bdb1240a85b024efc88b"}, @@ -562,10 +429,6 @@ pytz = [ {file = "pytz-2020.5-py2.py3-none-any.whl", hash = "sha256:16962c5fb8db4a8f63a26646d8886e9d769b6c511543557bc84e9569fb9a9cb4"}, {file = "pytz-2020.5.tar.gz", hash = "sha256:180befebb1927b16f6b57101720075a984c019ac16b1b7575673bea42c6c3da5"}, ] -pytzdata = [ - {file = "pytzdata-2020.1-py2.py3-none-any.whl", hash = "sha256:e1e14750bcf95016381e4d472bad004eef710f2d6417240904070b3d6654485f"}, - {file = "pytzdata-2020.1.tar.gz", hash = "sha256:3efa13b335a00a8de1d345ae41ec78dd11c9f8807f522d39850f2dd828681540"}, -] requests = [ {file = "requests-2.25.1-py2.py3-none-any.whl", hash = "sha256:c210084e36a42ae6b9219e00e48287def368a26d03a048ddad7bfee44f75871e"}, {file = "requests-2.25.1.tar.gz", hash = "sha256:27973dd4a904a4f13b263a19c866c13b92a39ed1c964655f025f3f8d3d75b804"}, @@ -589,10 +452,6 @@ simplejson = [ {file = 
"simplejson-3.11.1.win32-py3.4.exe", hash = "sha256:97cc43ef4cb18a2725f6e26d22b96f8ca50872a195bde32707dcb284f89c1d4d"}, {file = "simplejson-3.11.1.win32-py3.5.exe", hash = "sha256:c76d55d78dc8b06c96fd08c6cc5e2b0b650799627d3f9ca4ad23f40db72d5f6d"}, ] -singer-sdk = [ - {file = "singer-sdk-0.1.2.tar.gz", hash = "sha256:e51ad272835c527ef4f5c2deb87beaa85ea3afb974a766937fa81c7a22bfae8a"}, - {file = "singer_sdk-0.1.2-py3-none-any.whl", hash = "sha256:6cd3f222eefb097adf4e75e9aa05bbc7255a0d5b714c72b6ae15dcc7bf92929e"}, -] six = [ {file = "six-1.15.0-py2.py3-none-any.whl", hash = "sha256:8b74bedcbbbaca38ff6d7491d76f2b06b3592611af620f8426e82dddb04a5ced"}, {file = "six-1.15.0.tar.gz", hash = "sha256:30639c035cdb23534cd4aa2dd52c3bf48f06e5f4a941509c8bafd8ce11080259"}, @@ -606,10 +465,6 @@ typing-extensions = [ {file = "typing_extensions-3.7.4.3-py3-none-any.whl", hash = "sha256:7cb407020f00f7bfc3cb3e7881628838e69d8f3fcab2f64742a5e76b2f841918"}, {file = "typing_extensions-3.7.4.3.tar.gz", hash = "sha256:99d4073b617d30288f569d3f13d2bd7548c3a7e4c8de87db09a9d29bb3a4a60c"}, ] -tzlocal = [ - {file = "tzlocal-2.1-py2.py3-none-any.whl", hash = "sha256:e2cb6c6b5b604af38597403e9852872d7f534962ae2954c7f35efcb1ccacf4a4"}, - {file = "tzlocal-2.1.tar.gz", hash = "sha256:643c97c5294aedc737780a49d9df30889321cbe1204eac2c2ec6134035a92e44"}, -] urllib3 = [ {file = "urllib3-1.26.4-py2.py3-none-any.whl", hash = "sha256:2f4da4594db7e1e110a944bb1b551fdf4e6c136ad42e4234131391e21eb5b0df"}, {file = "urllib3-1.26.4.tar.gz", hash = "sha256:e7b021f7241115872f92f43c6508082facffbd1c048e3c6e2bb9c2a157e28937"}, diff --git a/pyproject.toml b/pyproject.toml index 3b6c37e..f622879 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -3,12 +3,16 @@ name = "target-mssql" version = "0.0.1" description = "`target-mssql` is Singer tap for MSSQL, built with the Singer SDK." 
authors = ["Derek Visch"] -license = "Apache 2.0" +license = "MIT" [tool.poetry.dependencies] python = "<3.9,>=3.6" +pipelinewise-singer-python = "1.2.0" +jsonschema = "3.2.0" +click = "^7.1.2" requests = "^2.25.1" -singer-sdk = "^0.1.0" +simplejson = "3.11.1" +pymssql = "2.1.5" [tool.poetry.dev-dependencies] pytest = "^6.1.2" diff --git a/singer_sdk/stream.py b/singer_sdk/stream.py new file mode 100644 index 0000000..42d9cb6 --- /dev/null +++ b/singer_sdk/stream.py @@ -0,0 +1,23 @@ +#Borrowed from https://gitlab.com/DouweM/target-lunch-money/-/blob/master/target_lunch_money/singer_sdk/stream.py +from jsonschema.validators import Draft4Validator +import singer +logger = singer.get_logger() + +class Stream: + #TODO as of right now without a schema things will fail + def __init__(self, target, name=None, schema=None, key_properties=None): + self.target = target + if name: + self.name = name + if schema: + self.schema = schema + self.validator = Draft4Validator(self.schema) + if key_properties: + self.key_properties = key_properties + + def process_record(self, record): + self.validator.validate(record) + self.persist_record(record) + + def persist_record(self, record): + raise NotImplementedError diff --git a/singer_sdk/target.py b/singer_sdk/target.py new file mode 100644 index 0000000..8055539 --- /dev/null +++ b/singer_sdk/target.py @@ -0,0 +1,134 @@ +#Borrowed from https://gitlab.com/DouweM/target-lunch-money/-/blob/master/target_lunch_money/singer_sdk/target.py +import io + +import click +import simplejson as json +import sys + +import singer + +logger = singer.get_logger() + + +class Target: + def __init__(self, config): + if isinstance(config, dict): + self.config = config + elif isinstance(config, str): + with open(config) as input_json: + self.config = json.load(input_json) + else: + self.config = {} + + self.state = None + + self.streams_in = {} + + def get_stream(self, name, schema, key_properties): + #try: + # stream = next(stream for stream in self.streams() if stream.name == name) + #except StopIteration: + # raise Exception("Unsupported stream {}".format(name)) + #TODO: This is a really silly way to do this + #except TypeError: + #TODO Deal with Existing streams at some point + stream = self.streamclass(target=self, name=name, schema=schema, key_properties=key_properties) + stream.schema_in = schema + stream.key_properties_in = key_properties + return stream + #return self.stream_class( + # target=self, name=name, schema=schema, key_properties=key_properties + #) + + def process_schema_message(self, message): + stream = message["stream"] + + self.streams_in[stream] = self.get_stream( + stream, message["schema"], message["key_properties"] + ) + + def process_record_message(self, message): + stream_name = message["stream"] + try: + stream = self.streams_in[stream_name] + except KeyError: + raise Exception( + "A record for stream {}" + "was encountered before a corresponding schema".format(stream_name) + ) + + stream.process_record(message["record"]) + self.state = None + + def process_state_message(self, message): + self.state = message["value"] + logger.debug("Set state to {}".format(self.state)) + + def process_messages(self, messages): + message_handlers = { + "SCHEMA": self.process_schema_message, + "RECORD": self.process_record_message, + "STATE": self.process_state_message, + } + + for raw_message in messages: + try: + message = singer.parse_message(raw_message).asdict() + except json.decoder.JSONDecodeError: + logger.error("Unable to parse:\n{}".format(raw_message)) + 
raise + + message_type = message["type"] + + try: + handler = message_handlers[message_type] + except KeyError: + logger.warning( + "Unknown message type {} in message {}".format( + message_type, message + ) + ) + continue + + handler(message) + + self.emit_state() + + def emit_state(self): + if self.state is None: + return + + line = json.dumps(self.state) + logger.debug("Emitting state {}".format(line)) + sys.stdout.write("{}\n".format(line)) + sys.stdout.flush() + + @classmethod + def cli(cls): + @click.option("--config") + @click.command() + def cli(config: str = None): + target = cls(config=config) + + input_messages = io.TextIOWrapper(sys.stdin.buffer, encoding="utf-8") + target.process_messages(input_messages) + logger.debug("Exiting normally") + + return cli + +#TODO Get rid of this class as it's not used +class MappingTarget(Target): + def stream_mapping(self, name): + return self.config.get("mapping", {}).get(name, {}) + + def get_stream(self, name, schema, key_properties): + try: + stream = next(stream for stream in self.streams() if stream.name_in == name) + except StopIteration: + raise Exception("Unsupported stream {}".format(name)) + + stream.schema_in = schema + stream.key_properties_in = key_properties + + return stream + diff --git a/singer_sdk_target/target_base.py b/singer_sdk_target/target_base.py deleted file mode 100644 index 958b02f..0000000 --- a/singer_sdk_target/target_base.py +++ /dev/null @@ -1,282 +0,0 @@ -"""TargetBase abstract class.""" - -import abc -import copy -import json - -import click -from singer_sdk.helpers import classproperty -from singer_sdk import helpers - -from typing import Any, Dict, Iterable, Optional, Type - -from singer_sdk.plugin_base import PluginBase -from singer_sdk.target_sink_base import TargetSinkBase - - -class TargetBase(PluginBase, metaclass=abc.ABCMeta): - """Abstract base class for targets.""" - - # Constructor - - default_sink_class: Type[TargetSinkBase] - _sinks: Dict[str, TargetSinkBase] = {} - - def __init__(self, config: Optional[Dict[str, Any]] = None,) -> None: - """Initialize the tap.""" - self.logger.info(f"Initializing '{self.name}' target...") - self._state = {} - self._flushed_state = {} - self._schemas = {} - super().__init__(config=config) - - def get_sink(self, stream_name: str) -> TargetSinkBase: - if stream_name in self._sinks: - return self._sinks[stream_name] - raise RuntimeError( - "Attempted to retrieve stream before initialization. " - "Please check that the upstream tap has sent the proper SCHEMA message." 
- ) - - def sink_exists(self, stream_name: str) -> bool: - return stream_name in self._sinks - - def init_sink(self, stream_name: str, schema: dict) -> TargetSinkBase: - self.logger.info(f"Initializing '{self.name}' target sink...") - self._sinks[stream_name] = self.default_sink_class( - target=self, stream_name=stream_name, schema=schema - ) - return self._sinks[stream_name] - - def process_lines(self, lines: Iterable[str], table_cache=None) -> None: - self.logger.info(f"Target '{self.name}' is listening for input from tap.") - line_counter = 0 - record_counter = 0 - state_counter = 0 - for line in lines: - line_counter += 1 - try: - line_dict = json.loads(line) - except json.decoder.JSONDecodeError: - self.logger.error("Unable to parse:\n{}".format(line)) - raise - if "type" not in line_dict: - raise Exception("Line is missing required key 'type': {}".format(line)) - record_type = line_dict["type"] - if record_type == "SCHEMA": - self.process_schema_message(line_dict) - elif record_type == "RECORD": - self.process_record_message(line_dict) - record_counter += 1 - elif record_type == "ACTIVATE_VERSION": - self.process_activate_version_message(line_dict) - elif record_type == "STATE": - self.process_state_message(line_dict) - state_counter += 1 - else: - raise Exception(f"Unknown message type {record_type} in message {o}") - self.logger.info( - f"Target '{self.name}' completed after {line_counter} lines of input " - f"({record_counter} records, {state_counter} state messages)." - ) - - def process_record_message(self, message_dict: dict) -> None: - if "stream" not in message_dict: - raise Exception(f"Line is missing required key 'stream': {message_dict}") - stream_name = message_dict["stream"] - if not self.sink_exists(stream_name): - raise Exception( - f"A record for stream '{stream_name}' was encountered before a " - "corresponding schema." 
- ) - stream = self.get_sink(stream_name) - record = message_dict["record"] - stream.process_record(record, message_dict) - if ( - stream._num_records_cached - >= self.default_sink_class.DEFAULT_BATCH_SIZE_ROWS - ): - # flush all streams, delete records if needed, reset counts and then emit current state - if self.get_config("flush_all_streams"): - streams_to_flush = self._sinks - else: - streams_to_flush = [stream] - for stream in streams_to_flush: - stream.flush_all() - - def process_schema_message(self, message_dict: dict) -> None: - if "stream" not in message_dict: - raise Exception(f"Line is missing required key 'stream': {message_dict}") - if "schema" not in message_dict: - raise Exception(f"Line is missing required key 'schema': {message_dict}") - - stream_name = message_dict["stream"] - new_schema = helpers._float_to_decimal(message_dict["schema"]) - new_schema = helpers._float_to_decimal(message_dict["schema"]) - - # Update and flush only if the the schema is new or different than - # the previously used version of the schema - if stream_name not in self._schemas: - self.init_sink(stream_name, new_schema) - else: - sink = self.get_sink(stream_name) - prev_schema = sink.schema - if prev_schema != new_schema: - # flush records from previous stream SCHEMA - # if same stream has been encountered again, it means the schema might have been altered - # so previous records need to be flushed - sink.flush_records() - if self._row_count.get(stream_name, 0) > 0: - self.flushed_state = self.flush_sinks(self._stream_to_sync) - - # emit latest encountered state - self.emit_state(self.flushed_state) - - # key_properties key must be available in the SCHEMA message. - if "key_properties" not in message_dict: - raise Exception("key_properties field is required") - - # Log based and Incremental replications on tables with no Primary Key - # cause duplicates when merging UPDATE events. - # Stop loading data by default if no Primary Key. - # - # If you want to load tables with no Primary Key: - # 1) Set ` 'primary_key_required': false ` in the target-snowflake config.json - # or - # 2) Use fastsync [postgres-to-snowflake, mysql-to-snowflake, etc.] - if ( - self.get_config("primary_key_required", True) - and len(message_dict["key_properties"]) == 0 - ): - self.logger.critical( - "Primary key is set to mandatory but not defined in " - f"the [{stream_name}] stream" - ) - raise Exception("key_properties field is required") - - self._key_properties[stream_name] = message_dict["key_properties"] - - if self.get_config("add_metadata_columns") or self.get_config( - "hard_delete" - ): - stream_to_sync[stream_name] = DbSync( - config, - add_metadata_columns_to_schema(message_dict), - table_cache, - ) - else: - stream_to_sync[stream_name] = DbSync( - config, message_dict, table_cache - ) - - stream_to_sync[stream_name].create_schema_if_not_exists() - stream_to_sync[stream_name].sync_table() - - self._row_count[stream_name] = 0 - self._total_row_count[stream_name] = 0 - - # pylint: disable=too-many-arguments - def flush_sinks(stream_to_sync, filter_streams=None): - """ - Flushes all buckets and resets records count to 0 as well as empties records to load list - - :param filter_streams: Keys of streams to flush from the streams dict. 
Default is every stream - :return: State dict with flushed positions - """ - parallelism = self.get_config("parallelism", DEFAULT_PARALLELISM) - max_parallelism = self.get_config("max_parallelism", DEFAULT_MAX_PARALLELISM) - # Parallelism 0 means auto parallelism: - # - # Auto parallelism trying to flush streams efficiently with auto defined number - # of threads where the number of threads is the number of streams that need to - # be loaded but it's not greater than the value of max_parallelism - if parallelism == 0: - n_streams_to_flush = len(streams.keys()) - if n_streams_to_flush > max_parallelism: - parallelism = max_parallelism - else: - parallelism = n_streams_to_flush - # Select the required streams to flush - if filter_streams: - streams_to_flush = filter_streams - else: - streams_to_flush = streams.keys() - # Single-host, thread-based parallelism - with parallel_backend("threading", n_jobs=parallelism): - Parallel()( - delayed(load_stream_batch)( - stream=stream, - records_to_load=streams[stream], - row_count=row_count, - db_sync=stream_to_sync[stream], - no_compression=self.get_config("no_compression"), - delete_rows=self.get_config("hard_delete"), - temp_dir=self.get_config("temp_dir"), - ) - for stream in streams_to_flush - ) - # reset flushed stream records to empty to avoid flushing same records - for stream in streams_to_flush: - streams[stream] = {} - # Update flushed streams - if filter_streams: - # update flushed_state position if we have state information for the stream - if state is not None and stream in state.get("bookmarks", {}): - # Create bookmark key if not exists - if "bookmarks" not in flushed_state: - flushed_state["bookmarks"] = {} - # Copy the stream bookmark from the latest state - flushed_state["bookmarks"][stream] = copy.deepcopy( - state["bookmarks"][stream] - ) - # If we flush every bucket use the latest state - else: - flushed_state = copy.deepcopy(state) - # Return with state message with flushed positions - return flushed_state - - def process_activate_version_message(self, message_dict: dict) -> None: - self.logger.debug("ACTIVATE_VERSION message") - - def process_state_message(self, message_dict: dict) -> None: - self.logger.debug(f"Setting state to {message_dict['value']}") - state = message_dict["value"] - # Initially set flushed state - sink = self.get_sink(message_dict["stream"]) - sink.flush_all() - if not self._flushed_state: - self._flushed_state = copy.deepcopy(state) - - def handle_cli_args(self, args, cwd, environ) -> None: - """Take necessary action in response to a CLI command.""" - pass - - @classproperty - def cli(cls): - """Execute standard CLI handler for taps.""" - - @click.option("--version", is_flag=True) - @click.option("--about", is_flag=True) - @click.option("--format") - @click.option("--config") - @click.command() - def cli( - version: bool = False, - about: bool = False, - discover: bool = False, - config: str = None, - state: str = None, - catalog: str = None, - format: str = None, - ): - """Handle command line execution.""" - if version: - cls.print_version() - return - if about: - cls.print_about(format) - return - target = cls(config=config, state=state) - target.process_lines() - - return cli diff --git a/singer_sdk_target/target_sink_base.py b/singer_sdk_target/target_sink_base.py deleted file mode 100644 index 9774d86..0000000 --- a/singer_sdk_target/target_sink_base.py +++ /dev/null @@ -1,285 +0,0 @@ -"""Abstract base class for loading a single singer stream to its target.""" - -import abc -import datetime -import 
json -import sys -import re -from logging import Logger -import collections -from singer_sdk.plugin_base import PluginBase -from typing import Dict, Iterable, Optional - -from jsonschema import Draft4Validator, FormatChecker - -from dateutil import parser - -import inflection - - -class RecordFlattener: - """Flattens hierarchical records into 2-dimensional ones.""" - - sep: str - max_level: Optional[int] - - def __init__(self, sep: str = "__", max_level: int = None): - """Initialize flattener.""" - self.sep = sep - self.max_level = max_level - - def flatten_key(self, k, parent_key): - """Return a flattened version of the key.""" - full_key = parent_key + [k] - inflected_key = full_key.copy() - reducer_index = 0 - while len(self.sep.join(inflected_key)) >= 255 and reducer_index < len( - inflected_key - ): - reduced_key = re.sub( - r"[a-z]", "", inflection.camelize(inflected_key[reducer_index]) - ) - inflected_key[reducer_index] = ( - reduced_key - if len(reduced_key) > 1 - else inflected_key[reducer_index][0:3] - ).lower() - reducer_index += 1 - return self.sep.join(inflected_key) - - def flatten_record(self, d, flatten_schema=None, parent_key=[], level=0): - """Return a flattened version of the record.""" - items = [] - for k, v in d.items(): - new_key = self._flatten_key(k, parent_key) - if isinstance(v, collections.MutableMapping) and level < self.max_level: - items.extend( - self._flatten_record( - v, - flatten_schema, - parent_key + [k], - sep=self.sep, - level=level + 1, - ).items() - ) - else: - items.append( - ( - new_key, - json.dumps(v) - if self._should_json_dump_value(k, v, flatten_schema) - else v, - ) - ) - return dict(items) - - @staticmethod - def _should_json_dump_value(key, value, flatten_schema=None) -> bool: - if isinstance(value, (dict, list)): - return True - if ( - flatten_schema - and key in flatten_schema - and "type" in flatten_schema[key] - and set(flatten_schema[key]["type"]) == {"null", "object", "array"} - ): - return True - return False - - -class TargetSinkBase(metaclass=abc.ABCMeta): - """Abstract base class for target streams.""" - - DEFAULT_BATCH_SIZE_ROWS = 100000 - DEFAULT_PARALLELISM = 0 # 0 The number of threads used to flush tables - DEFAULT_MAX_PARALLELISM = 16 - # Don't use more than this number of threads by default when flushing streams in - # parallel - - APPEND_SDC_METADATA_COLS = True - - DATETIME_ERROR_TREATMENT = "MAX" - MAX_FLATTEN_DEPTH = 0 - - # max timestamp/datetime supported, used to reset all invalid dates that are beyond this value - MAX_TIMESTAMP = "9999-12-31 23:59:59.999999" - MAX_TIME = "23:59:59.999999" - - logger: Logger - schema: Dict - stream_name: str - validator: Draft4Validator - flattener: Optional[RecordFlattener] - - # TODO: Evaluate whether to keep PK-dedupe algorithm or switch to list/queue. 
- _records_cache: Dict[str, Dict] = {} - - _num_records_cached: int = 0 - _total_records_read: int = 0 - _dupe_records_received: int = 0 - _flushed_state: dict = {} - _cache_state: dict = {} - - def __init__(self, target: PluginBase, stream_name: str, schema: Dict,) -> None: - """Initialize target stream.""" - self.logger = target.logger - self.logger.info("DEBUG: Initializing target sink...") - self.schema = schema - self.stream_name = stream_name - # self.logger: logging.Logger = target.logger - self.flattener = RecordFlattener(max_level=self.MAX_FLATTEN_DEPTH) - self.validator = Draft4Validator(schema, format_checker=FormatChecker()) - - def process_record(self, record: Dict, message: Dict): - """Process record.""" - self.logger.info("DEBUG: Processing record...") - self.validate_record(record) - primary_key_string = self.record_primary_key_string(record) - if not primary_key_string: - primary_key_string = "RID-{}".format(self._num_records_cached) - # increment row count only when a new PK is encountered in the current batch - if primary_key_string not in self._records_cache: - self._num_records_cached += 1 - self._total_records_read += 1 - else: - self._dupe_records_received += 1 - if self.APPEND_SDC_METADATA_COLS: - record = self.add_metadata_values_to_record(record, message) - self._records_cache[primary_key_string] = record - - @staticmethod - def add_metadata_values_to_record(record: dict, message: dict) -> Dict: - """Populate metadata _sdc columns from incoming record message. - - The location of the required attributes are fixed in the stream. - """ - record["_sdc_extracted_at"] = message.get("time_extracted") - record["_sdc_batched_at"] = datetime.datetime.now().isoformat() - record["_sdc_deleted_at"] = record.get("_sdc_deleted_at") - return record - - def record_primary_key_string(self, record): - """Return a string representing the primary key.""" - if len(self.schema.get("key_properties", [])) == 0: - return None - if self.flattener: - flattened = self.flattener.flatten_record( - record, self.flatten_schema, max_level=self.data_flattening_max_level - ) - try: - key_props = [str(flattened[p]) for p in self.schema["key_properties"]] - except Exception as exc: - self.logger.error( - "Cannot find {} primary key(s) in record: {}".format( - self.schema["key_properties"], flattened - ) - ) - raise exc - return ",".join(key_props) - - def emit_state(self): - """Emit the stream's latest state.""" - if self._flushed_state: - line = json.dumps(self._flushed_state) - self.logger.info(f"Emitting state {line}") - sys.stdout.write(f"{line}\n") - sys.stdout.flush() - - def _get_datelike_property_type( - self, property_key: str, property_schema: Dict - ) -> Optional[str]: - """Return one of 'date-time', 'time', or 'date' if property is date-like. - - Otherwise return None. 
- """ - if "anyOf" in property_schema: - for type_dict in property_schema["anyOf"]: - if "string" in type_dict["type"] and type_dict.get("format", None) in { - "date-time", - "time", - "date", - }: - return type_dict["format"] - if "string" in property_schema["type"] and property_schema.get( - "format", None - ) in {"date-time", "time", "date"}: - return property_schema["format"] - return None - - def validate_record(self, record: Dict) -> Dict: - """Validate or repair the record.""" - self.validate_timestamps_in_record( - record=record, schema=self.schema, treatment="null" - ) - return record - - def validate_timestamps_in_record( - self, record: Dict, schema: Dict, treatment: str - ) -> None: - """ - Confirm or repair date or timestamp values in record. - - Goes through every field that is of type date/datetime/time and if its value is - out of range, resets it to MAX value accordingly - - Args: - record: record containing properties and values - schema: json schema that has types of each property - """ - # creating this internal function to avoid duplicating code and too many nested blocks. - def reset_new_value(record: Dict, key: str, format: str): - try: - parser.parse(record[key]) - except Exception as ex1: - msg = f"Could not parse value '{record[key]}' for field '{key}'." - if treatment is None or treatment.lower() == "error": - raise ValueError(msg) - if treatment.lower() == "max": - self.logger.warning(f"{msg}. Replacing with MAX value.\n{ex1}\n") - record[key] = ( - self.MAX_TIMESTAMP if format != "time" else self.MAX_TIME - ) - return - if treatment.lower() == "null": - self.logger.warning(f"{msg}. Replacing with NULL.\n{ex1}\n") - return - - for key in record.keys(): - datelike_type = self._get_datelike_property_type( - key, schema["properties"][key] - ) - if datelike_type: - reset_new_value(record, key, datelike_type) - - def flush_all(self) -> dict: - """Call 'flush_records' to save all pending records. - - Returns the latest state. - """ - # NOTE: This may not be thread safe; same stream should not be processed in - # parallel without proper locking. - new_state, records, num_records = ( - self._cache_state, - self._records_cache, - self._num_records_cached, - ) - self._records_cache, self._num_records_cached = {}, 0 - self.flush_records(records.values(), num_records) - self._flushed_state = new_state - self.emit_state() - return self._flushed_state - - # Abstract methods: - - @abc.abstractmethod - def flush_records( - self, records_to_load: Iterable[Dict], expected_row_count: Optional[int] - ) -> dict: - """Abstract method which deals with flushing queued records to the target. - - Returns the latest state. - """ - - @property - def records(self): - raise NotImplementedError("Targets sinks do not support the 'records' property") diff --git a/target-mssql.sh b/target-mssql.sh index eafaf0d..cfeb593 100755 --- a/target-mssql.sh +++ b/target-mssql.sh @@ -1,5 +1,7 @@ #!/bin/sh +unset VIRTUAL_ENV + # This simple script allows you to test your tap from any directory, while still taking # advantage of the poetry-managed virtual environment. 
# Adapted from: https://github.com/python-poetry/poetry/issues/2179#issuecomment-668815276
@@ -8,5 +10,5 @@
 STARTDIR=$(pwd)
 TOML_DIR=$(dirname "$0")
 cd "$TOML_DIR" || exit
-poetry install
+poetry install 1>&2
 poetry run target-mssql $*
diff --git a/target_mssql/sink.py b/target_mssql/sink.py
new file mode 100644
index 0000000..6bf13f2
--- /dev/null
+++ b/target_mssql/sink.py
@@ -0,0 +1,11 @@
+
+from typing import Dict, Iterable, Optional
+from singer_sdk.target_sink_base import TargetSinkBase
+from singer_sdk_target import TargetBase
+
+class MSSQLTargetSink(TargetSinkBase):
+
+    def flush_records(self, records_to_load: Iterable[Dict], expected_row_count: Optional[int]):
+        raise NotImplementedError("Not ready yet dude")
+
+
diff --git a/target_mssql/streams.py b/target_mssql/streams.py
index e3f7377..be36f71 100644
--- a/target_mssql/streams.py
+++ b/target_mssql/streams.py
@@ -1,65 +1,133 @@
 """Stream class for target-mssql."""
 from pathlib import Path
 from typing import Any, Dict, Optional, Union, List, Iterable
-from singer_sdk.streams import Stream
-from singer_sdk.typing import (
-    ArrayType,
-    BooleanType,
-    DateTimeType,
-    IntegerType,
-    NumberType,
-    ObjectType,
-    PropertiesList,
-    Property,
-    StringType,
-)
-
+from singer_sdk.stream import Stream
+import pymssql
 SCHEMAS_DIR = Path(__file__).parent / Path("./schemas")
+#TODO: Use logging
+#TODO: Opening a database connection on a per-stream basis seems like a no-go
 class MSSQLStream(Stream):
-    """Stream class for MSSQL streams."""
-
-    def get_records(self, partition: Optional[dict] = None) -> Iterable[dict]:
-        """Return a generator of row-type dictionary objects.
-
-        The optional `partition` argument is used to identify a specific slice of the
-        stream if partitioning is required for the stream. Most implementations do not
-        require partitioning and should ignore the `partitions` argument.
-        """
-        # TODO: Write logic to extract data from the upstream source.
-        # rows = mysource.getall()
-        # for row in rows:
-        #     yield row.to_dict()
-        raise NotImplementedError("The method is not yet implemented (TODO)")
+
+    """Stream class for MSSQL streams."""
+    def __init__(self, conn, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.conn = conn
+        #TODO: Turn off autocommit and deal with batching
+        self.conn.autocommit(True)
+        #TODO Think about the right way to handle this when restructuring classes re https://pymssql.readthedocs.io/en/stable/pymssql_examples.html#important-note-about-cursors
+        self.cursor = self.conn.cursor()
+        self.table_handler()
-# TODO: - Override `UsersStream` and `GroupsStream` with your own stream definition.
-#       - Copy-paste as many times as needed to create multiple stream types.
-class UsersStream(MSSQLStream):
-    name = "users"
+    def table_handler(self):
+        #TODO it is not safe to assume you can truncate a table in this situation
+
+        #TODO How do we know all of the data is through and we are ready to drop and merge data into the table?
+
+        ddl = self.schema_to_temp_table_ddl(self.schema)
+        self.sql_runner(ddl)
+
+    #TODO error handling. If there's no key_properties entry what kind of failure do we want?
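+    # Hypothetical illustration, not generated output: for the sample "employees"
+    # schema in the `data` file above, the method below builds DDL shaped like:
+    #   DROP TABLE IF EXISTS employees_temp;
+    #   CREATE TABLE employees_temp (id VARCHAR(450) NOT NULL PRIMARY KEY,
+    #     displayName VARCHAR(MAX), photoUploaded BIT, canUploadPhoto INT, ...);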
+    def schema_to_temp_table_ddl(self, schema) -> str:
+        #TODO Can't assume this is an INT always
+        primary_key = self.key_properties[0]
+        table_name = self.name
+        columns_types = {}
-    primary_keys = ["id"]
-    replication_key = None
-    # Optionally, you may also use `schema_filepath` in place of `schema`:
-    # schema_filepath = SCHEMAS_DIR / "users.json"
-    schema = PropertiesList(
-        Property("name", StringType),
-        Property("id", StringType),
-        Property("age", IntegerType),
-        Property("email", StringType),
-        Property("street", StringType),
-        Property("city", StringType),
-        Property("state", StringType),
-        Property("zip", StringType),
-    ).to_dict()
+        #TODO Need to be using named parameters for SQL to avoid potential injection, and to be clean
+        #TODO temp needs to be dealt with
+        #TODO messy
+        sql = f"DROP TABLE IF EXISTS {table_name}_temp; CREATE TABLE {table_name}_temp ("
+        #TODO can you assume only 1 primary key?
+        pk_type = self.ddl_json_to_mssqlmapping(self.schema["properties"][primary_key])
+        pk_type = pk_type.replace("MAX", "450") #TODO hacky hacky
+        sql += f"{primary_key} {pk_type} NOT NULL PRIMARY KEY"
+        properties = self.schema["properties"]
+        #print(properties)
+        json_to_column_type = {}
+        properties.pop(primary_key, None)
+        for name, shape in properties.items():
+            mssqltype = self.ddl_json_to_mssqlmapping(shape)
+            sql += f", {name} {mssqltype}"
+            #print(self.json_to_mssqlmapping(shape))
+
+        #CREATE TABLE {stream_name}_temp (
+        #    ID int NOT NULL PRIMARY KEY,
+        #    displayName varchar(max),
+        #)
+        sql += ");"
+        return sql
+
+    #TODO clean up / make methods like this static
+    #TODO what happens with multiple non-null types
+    def ddl_json_to_mssqlmapping(self, shape: dict) -> str:
+        #Take the first non-null JSON type so shapes like ["null", "string"] still map
+        jsontype: str = next((t for t in shape["type"] if t != "null"), "null")
+        mssqltype: Optional[str] = None
+        if jsontype == "string": mssqltype = "VARCHAR(MAX)"
+        elif jsontype == "number": mssqltype = "INT" #TODO JSON numbers are floats; is INT always the right choice?
+        elif jsontype == "boolean": mssqltype = "BIT"
+        #not tested
+        elif jsontype == "null": raise NotImplementedError
+        elif jsontype == "array": raise NotImplementedError
+        elif jsontype == "object": raise NotImplementedError
+        else: raise NotImplementedError
+
+        return mssqltype
+
+    def data_json_to_mssqlmapping(self, data) -> str:
+        if type(data) == str: returnvalue = "'" + data.replace("'", "''") + "'" #escape embedded single quotes
+        #Could have imported NoneType instead but meh
+        elif data is None: returnvalue = "NULL"
+        #TODO clean this up a bit, expressions in python?
+        elif type(data) == bool:
+            if data: returnvalue = "1"
+            else: returnvalue = "0"
+        elif type(data) == int: returnvalue = f"{data}"
+        else: raise NotImplementedError(f"Data Type: {data}")
+        return returnvalue
+
+    #TODO when this is batched, how do you make sure the column ordering stays the same (a dataclass, probably)
+    #Columns is separate due to data not necessarily having all of the correct columns
+    def record_to_dml(self, table_name: str, data: dict) -> str:
+        column_list = ",".join(data.keys())
+        sql = f"INSERT INTO {table_name} ({column_list})"
+        #TODO could make this more pythonic with a list comprehension or the collections module
+        canonical_data = []
+        for rec in data.values():
+            canonical_data.append(self.data_json_to_mssqlmapping(rec))
+        datalist = ",".join(canonical_data)
+        sql += f" VALUES ({datalist})"
+        return sql
+    def sql_runner(self, sql):
+        print(f"Running SQL: {sql}")
+        self.cursor.execute(sql)

-class GroupsStream(MSSQLStream):
-    name = "groups"
-    primary_keys = ["id"]
-    replication_key = "modified"
-    schema = PropertiesList(
-        Property("name", StringType),
-        Property("id", StringType),
-        Property("modified", DateTimeType),
-    ).to_dict()
+    #def tempdb_to_actualdb_sql(self, temp_db_name, actual_db_name)
+    #def tempdb_drop_sql(self, tempdb_name)
+    #def start_transaction(self) -> str:
+    #def complete_transaction(self)
+    def persist_record(self, record):
+        #print(f"would have persisted: {record}")
+        #print(f"name: {self.name} , key_properties: {self.key_properties}, schema: {self.schema}")
+        #TODO shouldn't manually generate the table name here
+        dml = self.record_to_dml(table_name=f"{self.name}_temp", data=record)
+        #print(dml)
+        self.sql_runner(dml)
+
+    #def flush_stream(self)
+    #    sql = tempdb_to_actualdb_sql(temp_db_name, actual_db_name)
+    #    sql_runner(sql)
+    #    drop_tempdb = tempdb_drop_sql(temp_db_name)
+    #    sql_runner(drop_tempdb)
+    #    sql_runner(complete_transaction())
diff --git a/target_mssql/target.py b/target_mssql/target.py
index 9f1c443..8e30491 100644
--- a/target_mssql/target.py
+++ b/target_mssql/target.py
@@ -1,57 +1,43 @@
 """MSSQL target class."""
-
 from pathlib import Path
 from typing import List
-
-from singer_sdk import Tap, Stream
-from singer_sdk.typing import (
-    ArrayType,
-    BooleanType,
-    DateTimeType,
-    IntegerType,
-    NumberType,
-    ObjectType,
-    PropertiesList,
-    Property,
-    StringType,
-)
-
-# TODO: Import your custom stream types here:
-from target_mssql.streams import (
-    MSSQLStream,
-    UsersStream,
-    GroupsStream,
-)
-
-
-# TODO: Compile a list of custom stream types here
-# OR rewrite discover_streams() below with your custom logic.
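[Editor's note] Regarding the named-parameters TODO and the manual quoting in data_json_to_mssqlmapping/record_to_dml above: pymssql's DB-API paramstyle is "pyformat", so record values could be bound with %(name)s placeholders instead of being spliced into the SQL string. A minimal sketch of that alternative, illustrative only and not part of this patch; note that table and column identifiers cannot be bound as parameters and would still need separate validation:

def record_to_parameterized_dml(table_name: str, data: dict):
    #Only the values are bound; identifiers are still interpolated and must be trusted
    column_list = ", ".join(data.keys())
    placeholders = ", ".join(f"%({key})s" for key in data.keys())
    return f"INSERT INTO {table_name} ({column_list}) VALUES ({placeholders})", data

#usage: sql, params = record_to_parameterized_dml(f"{self.name}_temp", record)
#       self.cursor.execute(sql, params)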
-STREAM_TYPES = [
-    UsersStream,
-    GroupsStream,
-]
-
-# TODO: Update from Tap to Target
-class TargetMSSQL(Tap):
-    """MSSQL tap class."""
-
-    name = "target-mssql"
-
-    # TODO: Update this section with the actual config values you expect:
-    config_jsonschema = PropertiesList(
-        Property("host", StringType, required=True),
-        Property("user", StringType, required=True),
-        Property("port", IntegerType, required=False, default=1521),
-        Property("password", StringType, required=True),
-        Property("database", StringType, required=True),
-    ).to_dict()
-
-
-    def discover_streams(self) -> List[Stream]:
-        """Return a list of discovered streams."""
-        return [stream_class(tap=self) for stream_class in STREAM_TYPES]
-
-
+from singer_sdk.target import Target
+from .streams import MSSQLStream
+import pymssql
+
+#STREAM_TYPES = [
+#    MSSQLStream,
+#]
+class TargetMSSQL(Target):
+    """MSSQL target class."""
+    name = "target-mssql"
+
+    def __init__(self, config, *args, **kwargs):
+        super().__init__(config, *args, **kwargs)
+        #print(self.config)
+        assert self.config["host"]
+        self.conn = pymssql.connect(server=self.config["host"],
+                user=self.config["user"],
+                password=self.config["password"],
+                database=self.config["database"],
+                port=self.config["port"])
+    # TODO: Update this section with the actual config values you expect:
+    #config_jsonschema = PropertiesList(
+    #    Property("host", StringType, required=True),
+    #    Property("user", StringType, required=True),
+    #    Property("port", IntegerType, required=False, default=1521),
+    #    Property("password", StringType, required=True),
+    #    Property("database", StringType, required=True),
+    #).to_dict()
+
+    #TODO not a fan of streams not being required by the base Target class here, since the base class relies on it
+    def streams(self):
+        return self.streamslist
+        #return [stream_class(target=self) for stream_class in STREAM_TYPES]
+
+    #TODO this is a silly way to do this
+    def streamclass(self, *args, **kwargs):
+        return MSSQLStream(conn=self.conn, *args, **kwargs)
 # CLI Execution:
-cli = TargetMSSQL.cli
+cli = TargetMSSQL.cli()
diff --git a/target_mssql/tests/__init__.py b/target_mssql/tests/__init__.py
deleted file mode 100644
index b092302..0000000
--- a/target_mssql/tests/__init__.py
+++ /dev/null
@@ -1 +0,0 @@
-"""Test suite for target-mssql."""
diff --git a/target_mssql/tests/test_core.py b/target_mssql/tests/test_core.py
index 2f468d9..9140166 100644
--- a/target_mssql/tests/test_core.py
+++ b/target_mssql/tests/test_core.py
@@ -1,8 +1,8 @@
 """Tests standard tap features using the built-in SDK tests library."""

 import datetime
-
-from singer_sdk.testing import get_standard_tap_tests
+import pytest
+#from singer_sdk.testing import get_standard_tap_tests

 from target_mssql.target import TargetMSSQL

 #Config from Docker File Setup
@@ -17,37 +17,38 @@

 # Run standard built-in tap tests from the SDK:
-def test_standard_tap_tests():
-    """Run standard tap tests from the SDK."""
-    tests = get_standard_tap_tests(
-        TargetMSSQL,
-        config=SAMPLE_CONFIG
-    )
-    for test in tests:
-        test()
+# def test_standard_tap_tests():
+#     """Run standard tap tests from the SDK."""
+#     tests = get_standard_tap_tests(
+#         TargetMSSQL,
+#         config=SAMPLE_CONFIG
+#     )
+#     for test in tests:
+#         test()

 # TODO: State of the MSSQL database needs to be handled before going too far here
-def testdata_to_mssql(tapdata):
-    target = TargetMSSQL(config=SAMPLE_TARGET_MSSQL_CONFIG)
-    target.process_lines(tapdata); #test data
+def testdata_to_mssql(source_data):
+    target = TargetMSSQL(config=SAMPLE_CONFIG)
+    
target.process_messages(source_data); #test data -source_data="""/ +@pytest.fixture +def source_data(): + data = """\ {"type": "SCHEMA", "stream": "employees", "schema": {"type": "object", "properties": {"id": {"type": ["string", "null"]}, "displayName": {"type": ["string", "null"]}, "firstName": {"type": ["string", "null"]}, "lastName": {"type": ["string", "null"]}, "gender": {"type": ["string", "null"]}, "jobTitle": {"type": ["string", "null"]}, "workPhone": {"type": ["string", "null"]}, "workPhoneExtension": {"type": ["string", "null"]}, "skypeUsername": {"type": ["string", "null"]}, "preferredName": {"type": ["string", "null"]}, "mobilePhone": {"type": ["string", "null"]}, "workEmail": {"type": ["string", "null"]}, "department": {"type": ["string", "null"]}, "location": {"type": ["string", "null"]}, "division": {"type": ["string", "null"]}, "linkedIn": {"type": ["string", "null"]}, "photoUploaded": {"type": ["boolean", "null"]}, "photoUrl": {"type": ["string", "null"]}, "canUploadPhoto": {"type": ["number", "null"]}}, "required": []}, "key_properties": ["id"]} -{"type": "RECORD", "stream": "employees", "record": {"id": "119", "displayName": "Alexa Aberdean", "firstName": "Alexa", "lastName": "Aberdean", "preferredName": null, "jobTitle": "CNC Machinist", "workPhone": null, "mobilePhone": null, "workEmail": "aaberdean@autoidm.com", "department": "Customer Experience", "location": "Kalamazoo", "division": "abc", "linkedIn": null, "workPhoneExtension": null, "photoUploaded": false, "photoUrl": "https://resources.bamboohr.com/images/photo_person_150x150.png", "canUploadPhoto": 1}, "time_extracted": "2021-04-15T19:16:24.878203Z"} -{"type": "STATE", "value": {"bookmarks": {"employees": {}}}} -{"type": "RECORD", "stream": "employees", "record": {"id": "120", "displayName": "Lisa Boyer", "firstName": "Lisa", "lastName": "Boyer", "preferredName": null, "jobTitle": "HR Generalist", "workPhone": null, "mobilePhone": null, "workEmail": null, "department": "Human Resources", "location": "Kalamazoo", "division": null, "linkedIn": null, "workPhoneExtension": null, "photoUploaded": false, "photoUrl": "https://resources.bamboohr.com/images/photo_person_150x150.png", "canUploadPhoto": 1}, "time_extracted": "2021-04-15T19:16:24.878919Z"} -{"type": "RECORD", "stream": "employees", "record": {"id": "117", "displayName": "Abby Lamar", "firstName": "Abby", "lastName": "Lamar", "preferredName": null, "jobTitle": "Customer Experience Champion", "workPhone": null, "mobilePhone": null, "workEmail": null, "department": "Customer Experience", "location": "Kalamazoo", "division": null, "linkedIn": null, "workPhoneExtension": null, "photoUploaded": false, "photoUrl": "https://resources.bamboohr.com/images/photo_person_150x150.png", "canUploadPhoto": 1}, "time_extracted": "2021-04-15T19:16:24.879049Z"} -{"type": "RECORD", "stream": "employees", "record": {"id": "111", "displayName": "Patty Mae", "firstName": "Patty", "lastName": "Mae", "preferredName": null, "jobTitle": "Executive", "workPhone": null, "mobilePhone": null, "workEmail": "b@autoidm.com", "department": "Leadership", "location": "Kalamazoo", "division": null, "linkedIn": null, "workPhoneExtension": null, "photoUploaded": false, "photoUrl": "https://resources.bamboohr.com/images/photo_person_150x150.png", "canUploadPhoto": 1}, "time_extracted": "2021-04-15T19:16:24.879185Z"} -{"type": "RECORD", "stream": "employees", "record": {"id": "122", "displayName": "Mickey Mouse", "firstName": "Mickey", "lastName": "Mouse", "preferredName": null, "jobTitle": "CNC 
Programmer", "workPhone": null, "mobilePhone": null, "workEmail": null, "department": "Engineering", "location": "Kalamazoo", "division": null, "linkedIn": null, "workPhoneExtension": null, "photoUploaded": false, "photoUrl": "https://resources.bamboohr.com/images/photo_person_150x150.png", "canUploadPhoto": 1}, "time_extracted": "2021-04-15T19:16:24.879338Z"} -{"type": "RECORD", "stream": "employees", "record": {"id": "115", "displayName": "Elon Musky", "firstName": "Elon", "lastName": "Musky", "preferredName": null, "jobTitle": "Process Engineer", "workPhone": null, "mobilePhone": null, "workEmail": null, "department": "Engineering", "location": "Kalamazoo", "division": null, "linkedIn": null, "workPhoneExtension": null, "photoUploaded": false, "photoUrl": "https://resources.bamboohr.com/images/photo_person_150x150.png", "canUploadPhoto": 1}, "time_extracted": "2021-04-15T19:16:24.879454Z"} -{"type": "RECORD", "stream": "employees", "record": {"id": "116", "displayName": "Jeff Razer", "firstName": "Jeff", "lastName": "Razer", "preferredName": null, "jobTitle": "Customer Experience Champion", "workPhone": null, "mobilePhone": null, "workEmail": null, "department": "Customer Experience", "location": "Kalamazoo", "division": null, "linkedIn": null, "workPhoneExtension": null, "photoUploaded": false, "photoUrl": "https://resources.bamboohr.com/images/photo_person_150x150.png", "canUploadPhoto": 1}, "time_extracted": "2021-04-15T19:16:24.879566Z"} -{"type": "RECORD", "stream": "employees", "record": {"id": "113", "displayName": "Tyler Sawyer", "firstName": "Tyler", "lastName": "Sawyer", "preferredName": null, "jobTitle": "Marketing Analyst", "workPhone": "5555555555", "mobilePhone": null, "workEmail": null, "department": "Sales/Marketing", "location": "Kalamazoo", "division": null, "linkedIn": null, "workPhoneExtension": "1", "photoUploaded": false, "photoUrl": "https://resources.bamboohr.com/images/photo_person_150x150.png", "canUploadPhoto": 1}, "time_extracted": "2021-04-15T19:16:24.879670Z"} -{"type": "RECORD", "stream": "employees", "record": {"id": "110", "displayName": "Sam Smith", "firstName": "Sam", "lastName": "Smith", "preferredName": null, "jobTitle": "Sales Representative", "workPhone": null, "mobilePhone": "1234567891", "workEmail": "ssmith@autoidm.com", "department": "Sales/Marketing", "location": "Kalamazoo", "division": null, "linkedIn": null, "workPhoneExtension": null, "photoUploaded": false, "photoUrl": "https://resources.bamboohr.com/images/photo_person_150x150.png", "canUploadPhoto": 1}, "time_extracted": "2021-04-15T19:16:24.879805Z"} -{"type": "RECORD", "stream": "employees", "record": {"id": "114", "displayName": "Tim Teebow", "firstName": "Tim", "lastName": "Teebow", "preferredName": null, "jobTitle": "Network Engineer", "workPhone": null, "mobilePhone": null, "workEmail": null, "department": "Engineering", "location": "Kalamazoo", "division": null, "linkedIn": null, "workPhoneExtension": null, "photoUploaded": false, "photoUrl": "https://resources.bamboohr.com/images/photo_person_150x150.png", "canUploadPhoto": 1}, "time_extracted": "2021-04-15T19:16:24.879902Z"} -{"type": "RECORD", "stream": "employees", "record": {"id": "121", "displayName": "Jake Vicks", "firstName": "Jake", "lastName": "Vicks", "preferredName": null, "jobTitle": "CNC Machinist", "workPhone": null, "mobilePhone": null, "workEmail": null, "department": "Engineering", "location": "Kalamazoo", "division": null, "linkedIn": null, "workPhoneExtension": null, "photoUploaded": false, "photoUrl": 
"https://resources.bamboohr.com/images/photo_person_150x150.png", "canUploadPhoto": 1}, "time_extracted": "2021-04-15T19:16:24.879997Z"} -{"type": "STATE", "value": {"bookmarks": {"employees": {}}}} -{"type": "RECORD", "stream": "employees", "record": {"id": "109", "displayName": "Derek Visch", "firstName": "Derek", "lastName": "Visch", "preferredName": null, "jobTitle": "Executive", "workPhone": null, "mobilePhone": null, "workEmail": "dvisch@autoidm.com", "department": "Leadership", "location": "Kalamazoo", "division": null, "linkedIn": null, "workPhoneExtension": null, "photoUploaded": false, "photoUrl": "https://resources.bamboohr.com/images/photo_person_150x150.png", "canUploadPhoto": 1}, "time_extracted": "2021-04-15T19:16:24.880110Z"} -{"type": "RECORD", "stream": "employees", "record": {"id": "118", "displayName": "Lexi Weatherton", "firstName": "Lexi", "lastName": "Weatherton", "preferredName": null, "jobTitle": "CNC Machinist", "workPhone": null, "mobilePhone": null, "workEmail": null, "department": "Manufacturing", "location": "Kalamazoo", "division": null, "linkedIn": null, "workPhoneExtension": null, "photoUploaded": false, "photoUrl": "https://resources.bamboohr.com/images/photo_person_150x150.png", "canUploadPhoto": 1}, "time_extracted": "2021-04-15T19:16:24.880206Z"} -{"type": "STATE", "value": {"bookmarks": {"employees": {}}}} -""" - + {"type": "RECORD", "stream": "employees", "record": {"id": "119", "displayName": "Alexa Aberdean", "firstName": "Alexa", "lastName": "Aberdean", "preferredName": null, "jobTitle": "CNC Machinist", "workPhone": null, "mobilePhone": null, "workEmail": "aaberdean@autoidm.com", "department": "Customer Experience", "location": "Kalamazoo", "division": "abc", "linkedIn": null, "workPhoneExtension": null, "photoUploaded": false, "photoUrl": "https://resources.bamboohr.com/images/photo_person_150x150.png", "canUploadPhoto": 1}, "time_extracted": "2021-04-15T19:16:24.878203Z"} + {"type": "STATE", "value": {"bookmarks": {"employees": {}}}} + {"type": "RECORD", "stream": "employees", "record": {"id": "120", "displayName": "Lisa Boyer", "firstName": "Lisa", "lastName": "Boyer", "preferredName": null, "jobTitle": "HR Generalist", "workPhone": null, "mobilePhone": null, "workEmail": null, "department": "Human Resources", "location": "Kalamazoo", "division": null, "linkedIn": null, "workPhoneExtension": null, "photoUploaded": false, "photoUrl": "https://resources.bamboohr.com/images/photo_person_150x150.png", "canUploadPhoto": 1}, "time_extracted": "2021-04-15T19:16:24.878919Z"} + {"type": "RECORD", "stream": "employees", "record": {"id": "117", "displayName": "Abby Lamar", "firstName": "Abby", "lastName": "Lamar", "preferredName": null, "jobTitle": "Customer Experience Champion", "workPhone": null, "mobilePhone": null, "workEmail": null, "department": "Customer Experience", "location": "Kalamazoo", "division": null, "linkedIn": null, "workPhoneExtension": null, "photoUploaded": false, "photoUrl": "https://resources.bamboohr.com/images/photo_person_150x150.png", "canUploadPhoto": 1}, "time_extracted": "2021-04-15T19:16:24.879049Z"} + {"type": "RECORD", "stream": "employees", "record": {"id": "111", "displayName": "Patty Mae", "firstName": "Patty", "lastName": "Mae", "preferredName": null, "jobTitle": "Executive", "workPhone": null, "mobilePhone": null, "workEmail": "b@autoidm.com", "department": "Leadership", "location": "Kalamazoo", "division": null, "linkedIn": null, "workPhoneExtension": null, "photoUploaded": false, "photoUrl": 
"https://resources.bamboohr.com/images/photo_person_150x150.png", "canUploadPhoto": 1}, "time_extracted": "2021-04-15T19:16:24.879185Z"} + {"type": "RECORD", "stream": "employees", "record": {"id": "122", "displayName": "Mickey Mouse", "firstName": "Mickey", "lastName": "Mouse", "preferredName": null, "jobTitle": "CNC Programmer", "workPhone": null, "mobilePhone": null, "workEmail": null, "department": "Engineering", "location": "Kalamazoo", "division": null, "linkedIn": null, "workPhoneExtension": null, "photoUploaded": false, "photoUrl": "https://resources.bamboohr.com/images/photo_person_150x150.png", "canUploadPhoto": 1}, "time_extracted": "2021-04-15T19:16:24.879338Z"} + {"type": "RECORD", "stream": "employees", "record": {"id": "115", "displayName": "Elon Musky", "firstName": "Elon", "lastName": "Musky", "preferredName": null, "jobTitle": "Process Engineer", "workPhone": null, "mobilePhone": null, "workEmail": null, "department": "Engineering", "location": "Kalamazoo", "division": null, "linkedIn": null, "workPhoneExtension": null, "photoUploaded": false, "photoUrl": "https://resources.bamboohr.com/images/photo_person_150x150.png", "canUploadPhoto": 1}, "time_extracted": "2021-04-15T19:16:24.879454Z"} + {"type": "RECORD", "stream": "employees", "record": {"id": "116", "displayName": "Jeff Razer", "firstName": "Jeff", "lastName": "Razer", "preferredName": null, "jobTitle": "Customer Experience Champion", "workPhone": null, "mobilePhone": null, "workEmail": null, "department": "Customer Experience", "location": "Kalamazoo", "division": null, "linkedIn": null, "workPhoneExtension": null, "photoUploaded": false, "photoUrl": "https://resources.bamboohr.com/images/photo_person_150x150.png", "canUploadPhoto": 1}, "time_extracted": "2021-04-15T19:16:24.879566Z"} + {"type": "RECORD", "stream": "employees", "record": {"id": "113", "displayName": "Tyler Sawyer", "firstName": "Tyler", "lastName": "Sawyer", "preferredName": null, "jobTitle": "Marketing Analyst", "workPhone": "5555555555", "mobilePhone": null, "workEmail": null, "department": "Sales/Marketing", "location": "Kalamazoo", "division": null, "linkedIn": null, "workPhoneExtension": "1", "photoUploaded": false, "photoUrl": "https://resources.bamboohr.com/images/photo_person_150x150.png", "canUploadPhoto": 1}, "time_extracted": "2021-04-15T19:16:24.879670Z"} + {"type": "RECORD", "stream": "employees", "record": {"id": "110", "displayName": "Sam Smith", "firstName": "Sam", "lastName": "Smith", "preferredName": null, "jobTitle": "Sales Representative", "workPhone": null, "mobilePhone": "1234567891", "workEmail": "ssmith@autoidm.com", "department": "Sales/Marketing", "location": "Kalamazoo", "division": null, "linkedIn": null, "workPhoneExtension": null, "photoUploaded": false, "photoUrl": "https://resources.bamboohr.com/images/photo_person_150x150.png", "canUploadPhoto": 1}, "time_extracted": "2021-04-15T19:16:24.879805Z"} + {"type": "RECORD", "stream": "employees", "record": {"id": "114", "displayName": "Tim Teebow", "firstName": "Tim", "lastName": "Teebow", "preferredName": null, "jobTitle": "Network Engineer", "workPhone": null, "mobilePhone": null, "workEmail": null, "department": "Engineering", "location": "Kalamazoo", "division": null, "linkedIn": null, "workPhoneExtension": null, "photoUploaded": false, "photoUrl": "https://resources.bamboohr.com/images/photo_person_150x150.png", "canUploadPhoto": 1}, "time_extracted": "2021-04-15T19:16:24.879902Z"} + {"type": "RECORD", "stream": "employees", "record": {"id": "121", "displayName": 
"Jake Vicks", "firstName": "Jake", "lastName": "Vicks", "preferredName": null, "jobTitle": "CNC Machinist", "workPhone": null, "mobilePhone": null, "workEmail": null, "department": "Engineering", "location": "Kalamazoo", "division": null, "linkedIn": null, "workPhoneExtension": null, "photoUploaded": false, "photoUrl": "https://resources.bamboohr.com/images/photo_person_150x150.png", "canUploadPhoto": 1}, "time_extracted": "2021-04-15T19:16:24.879997Z"} + {"type": "STATE", "value": {"bookmarks": {"employees": {}}}} + {"type": "RECORD", "stream": "employees", "record": {"id": "109", "displayName": "Derek Visch", "firstName": "Derek", "lastName": "Visch", "preferredName": null, "jobTitle": "Executive", "workPhone": null, "mobilePhone": null, "workEmail": "dvisch@autoidm.com", "department": "Leadership", "location": "Kalamazoo", "division": null, "linkedIn": null, "workPhoneExtension": null, "photoUploaded": false, "photoUrl": "https://resources.bamboohr.com/images/photo_person_150x150.png", "canUploadPhoto": 1}, "time_extracted": "2021-04-15T19:16:24.880110Z"} + {"type": "RECORD", "stream": "employees", "record": {"id": "118", "displayName": "Lexi Weatherton", "firstName": "Lexi", "lastName": "Weatherton", "preferredName": null, "jobTitle": "CNC Machinist", "workPhone": null, "mobilePhone": null, "workEmail": null, "department": "Manufacturing", "location": "Kalamazoo", "division": null, "linkedIn": null, "workPhoneExtension": null, "photoUploaded": false, "photoUrl": "https://resources.bamboohr.com/images/photo_person_150x150.png", "canUploadPhoto": 1}, "time_extracted": "2021-04-15T19:16:24.880206Z"} + {"type": "STATE", "value": {"bookmarks": {"employees": {}}}}""" + return data