diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..66dbf51
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+*.egg-info
+*.pyc
diff --git a/COPYING b/COPYING
new file mode 100644
index 0000000..3701087
--- /dev/null
+++ b/COPYING
@@ -0,0 +1,14 @@
+Copyright 2017, California Institute of Technology.
+ ALL RIGHTS RESERVED.
+ U.S. Government Sponsorship acknowledged.
+
+Any commercial use must be negotiated with the Office of Technology
+Transfer at the California Institute of Technology.
+
+This software may be subject to U.S. export control laws and
+regulations. By accepting this document, the user agrees to comply
+with all applicable U.S. export laws and regulations.
+
+User has the responsibility to obtain export licenses, or other export
+authority as may be required before exporting such information to
+foreign countries or providing access to foreign persons.
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..0cbb939
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,201 @@
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner.
For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. 
The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. 
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "{}"
+      replaced with your own identifying information. (Don't include
+      the brackets!) The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright 2017 California Institute of Technology.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..f3af7c3
--- /dev/null
+++ b/README.md
@@ -0,0 +1,16 @@
+Object-Store Abstraction K Architecture (Osaka)
+===============================================
+
+Osaka is an object-storage abstraction system that lets the user push generic file objects to various backend storage systems. The backend is selected based on the scheme element of the file's URI. Currently, one Osaka endpoint must exist on the local file system.
+
+| Schemes | Push Implemented | Pull Implemented | Tested | Notes |
+| ------- | ---------------- | ---------------- |--------|---------------------------------|
+| file://,none | Yes | Yes | Yes | |
+| s3://,s3s:// | Yes | Yes | Yes | |
+| dav://,davs:// | Yes | Yes | Yes | |
+| http://,https://| Yes | Yes | Yes | |
+| azure://,azures:// | No | No | No | Needs reimplementation; no dev Azure instance exists |
+| ftp://,sftp:// | No | No | No | Needs reimplementation |
+| rsync:// | No | No | No | Not a backend, but a transfer mechanism |
+
+
diff --git a/archived_history.txt b/archived_history.txt
new file mode 100644
index 0000000..4294c99
--- /dev/null
+++ b/archived_history.txt
@@ -0,0 +1,748 @@
+commit 6065bfae6925eadb041f89ac227eaee740f4ffa9
+Merge: ea4f0f4 4a3caa8
+Author: Gerald Manipon
+Date: Tue Oct 3 10:11:22 2017 -0700
+
+    Merge pull request #3 from hysds-org/mstarch-ftp
+
+    mstarch: Adding in FTP pull
+
+commit 4a3caa8f2cba12be63ff462ba90f2a42c8d449d2
+Author: M Starch
+Date: Mon Sep 11 23:21:12 2017 +0000
+
+    mstarch: better lock-file test, better FTP
+
+commit 27810e2fa0dee1c59a5c001ad57f59bcc33c648b
+Author: M Starch
+Date: Mon Sep 11 19:17:19 2017 +0000
+
+    mstarch: better lock file naming, listing, and ftp handler
+
+commit ea4f0f43e847de552bc4dfa2b1358fb744421dfd
+Author: gmanipon
+Date: Thu Aug 24 18:17:12 2017 +0000
+
+    append file handle after being defined
+
+    https://github.jpl.nasa.gov/hysds-org/osaka/issues/1
+
+commit 2fa7c10cf1d1c474e957e901d00c2fa8cb6e9ec8
+Author: M Starch
+Date: Thu Aug 17 16:38:10 2017 +0000
+
+    mstarch: fixing no-clobber flag in put, and syntax error
+
+commit 6303415e65a354b33dbd4c6519d188185281a2b8
+Merge: b3fb406 1b68906
+Author: M Starch
+Date: Thu Aug 17 16:32:08 2017 +0000
+
+    Merge branch 'grfn' of https://github.jpl.nasa.gov/hysds-org/osaka-object-store-abstraction
+commit b3fb406001a28675d85524187328c76c145a9e22 +Author: M Starch +Date: Thu Aug 17 16:31:56 2017 +0000 + + mstarch: more robust file removal + +commit 1b689060954050f91c84f6247f5392bc5ea19fd6 +Author: M Starch +Date: Thu Aug 17 16:31:31 2017 +0000 + + mstarch: fixing osaka-s3 temp file removal + +commit 82f8183e12aad82f9c509501aa4f57f5b8a72d60 +Author: M Starch +Date: Fri Aug 11 23:23:30 2017 +0000 + + mstarch: basic no clobber + +commit 3a8aad9223d7329a6a885ffcabe0fb2332157619 +Author: M Starch +Date: Wed Aug 2 16:24:52 2017 +0000 + + mstarch: fixing S3 unit tests + +commit 5e9e728c33146d13418a73fbb1ea81ec76cc5a8b +Author: M Starch +Date: Tue Aug 1 19:58:07 2017 +0000 + + mstarch: fixing unit test test-file copy + +commit 185bd0180ab6b11156f5262e106475cf6235561a +Author: M Starch +Date: Tue Aug 1 19:49:20 2017 +0000 + + mstarch: fixing bug in s3 cache + +commit c4cbcd3b25194183cccd93238232683404ffebd4 +Author: M Starch +Date: Sat Jul 29 00:21:03 2017 +0000 + + mstarch: adding in gs size + +commit dc3374a951eebd2408855e239efc63e57f2d45f4 +Author: M Starch +Date: Sat Jul 29 00:16:21 2017 +0000 + + mstarch: fixing tests and adding in size + +commit 90a4f615275c1a38dea870e6a8dc8ea6a9a0f17d +Author: M Starch +Date: Fri Jul 28 22:43:52 2017 +0000 + + mstarch: fixed test assert + +commit 84048cb6af5d066ce8a22f37cd4f14286727f3bb +Author: M Starch +Date: Fri Jul 28 22:43:25 2017 +0000 + + mstarch: better test name directory + +commit 295a9a6056b5a14b32b570dd82caa979d8db1d79 +Author: M Starch +Date: Fri Jul 28 22:42:58 2017 +0000 + + mstarch: tests with better names + +commit cf7ca28e5d415f4c1c3a2fea35b42e003f67237b +Author: M Starch +Date: Fri Jul 28 21:47:00 2017 +0000 + + mstarch: adding in cooperation and test + +commit 43fef66a7fe9b435cb334a3f12be262ca0e422ba +Author: M Starch +Date: Mon Jul 24 22:57:43 2017 +0000 + + mstarch: fixing wacky urls for https://github.jpl.nasa.gov/hysds-org/general/issues/410 + +commit 51dee49aac8e2da4857d09dfbf1467668ccf5186 +Author: M Starch +Date: Mon Jul 24 22:57:03 2017 +0000 + + mstarch: updated unit tests to be configurable + +commit 6ca93517c67f2579c91245c6b5ee2780ab7fb178 +Merge: 8602125 7d986a6 +Author: gmanipon +Date: Fri Jul 14 11:51:00 2017 -0700 + + Merge branch 'master' into oco2-fixes + +commit 7d986a65fd43cb5318cdb278fd9c13c8f6efc631 +Author: M Starch +Date: Fri Jul 14 00:25:26 2017 +0000 + + mstarch: fixing port masking for 443 as another standard port + +commit f6bffe05fa9bc82d370abdf81609213a7cbb7c29 +Author: M Starch +Date: Fri Jul 14 00:20:23 2017 +0000 + + mstarch: filtering our port 80 + +commit 86021256d34cbf64c2a947652b1805d793e9cde1 +Author: M Starch +Date: Wed Jun 7 09:25:05 2017 -0700 + + mstarch: fixing s3 download slowness and disconnects + +commit 595414327dacad4579707224329806a83a0a4175 +Author: M Starch +Date: Mon Mar 6 17:09:58 2017 +0000 + + mstarch: fixing timeout issue + +commit 0015823774b9a2663d7e1b9d21848258e5199b21 +Author: M Starch +Date: Mon Feb 6 19:17:23 2017 +0000 + + mstarch: osaka supports S3 encryption on a key-level + +commit dac755b7b698bc23db86d3cd1e8a72bda0f6c807 +Merge: 7052d3f d8f7197 +Author: M Starch +Date: Mon Feb 6 18:53:14 2017 +0000 + + Merge branch 'master' of https://github.jpl.nasa.gov/hysds-org/osaka-object-store-abstraction + +commit d8f7197e618eb8c23c9582404880faf79ac82ea3 +Author: gmanipon +Date: Mon Jan 30 22:18:46 2017 +0000 + + use explicitly passed profile_name for AWS creds + +commit 7052d3fec897a5eb65fb06222e241e008957d1fc +Merge: a952b77 861dbab +Author: M Starch +Date: Thu Jan 12 
21:08:22 2017 +0000 + + mstarch: merging GS and CLI support + +commit a952b77f60690c5c1eb50da100b506ce45b063be +Author: M Starch +Date: Thu Jan 12 21:04:41 2017 +0000 + + mstarch: removing faux support for HTTP rm + +commit be512b1354f89c179e9aea39552f18ef54a6e45c +Author: M Starch +Date: Thu Jan 12 01:03:56 2017 +0000 + + mstarch: constraining file:// uris + +commit 861dbab7382e5fc84f63d84d43ab435f59e1a92b +Author: gmanipon +Date: Tue Dec 27 22:40:02 2016 +0000 + + set correct protobuf version + +commit d3f3d1a8dfd38d4efad9424cf0775c9f9fa915fd +Author: gmanipon +Date: Tue Dec 27 21:52:53 2016 +0000 + + add support for google storage + +commit 440173bd1ba9cb54befb45153dc0056ecb8e03a9 +Author: M Starch +Date: Thu Dec 15 21:35:48 2016 +0000 + + mstarch: adding CLI, prevented logging-by-force, and added force parameter to get,transfer + +commit c5a45639c5a8fe2ace32f2e450f3a3a05ad311f7 +Author: M Starch +Date: Thu Dec 8 10:30:41 2016 -0800 + + mstarch: fixing S3 to use AWS profiles named after buckets + +commit 44374675cc07b119da085fe88a2a582350c5ece6 +Author: Michael Starch +Date: Mon Oct 31 23:16:13 2016 +0000 + + mstarch: untested retries functionality + +commit eb8b68365567dcd95c2ee3f8bb427d7bb350e37a +Author: M Starch +Date: Mon Oct 31 13:23:19 2016 -0700 + + mstarch: adding retries to Osaka as well as lock exposure + +commit 32c95812a5323eda37a253b28073b5f710442c1a +Author: Michael Starch +Date: Mon Oct 31 16:24:10 2016 +0000 + + mstarch: adding timeout unit tests + +commit 687b90e5b561821eb0ddf9c0d122ff26ef603404 +Author: Michael Starch +Date: Mon Oct 31 15:36:51 2016 +0000 + + mstarch: adding back in unit test stuff + +commit b437c5a89c0b5b4924f408e02a700e32f0f040df +Author: Michael Starch +Date: Thu Oct 27 01:29:20 2016 +0000 + + mstarch: working osaka webdav + +commit 349132b66b491b6373419f0b81f4ea8b7e7f9658 +Merge: 4df449a 51e2f57 +Author: Michael Starch +Date: Wed Oct 26 20:30:36 2016 +0000 + + Merge branch 'mstarch-dev-timeouts' of https://github.jpl.nasa.gov/hysds-org/osaka-object-store-abstraction into mstarch-dev-timeouts + +commit 4df449acc6d9fee9b84f744870ff3477581f578e +Author: Michael Starch +Date: Wed Oct 26 20:07:26 2016 +0000 + + mstarch: Oskak improvments + +commit 51e2f5797835c702dbe43d47c750b233b1796761 +Author: Michael Starch +Date: Tue Sep 13 18:08:25 2016 +0000 + + mstarch: fixing 2 minor osaka bugs + +commit 497f49709af9cb2a26fd130bb854f243856e945b +Author: M Starch +Date: Mon Sep 12 10:23:14 2016 -0700 + + mstarch: better tests + +commit 54330189be615df57a6e46aafd3662450a22e975 +Author: M Starch +Date: Tue Aug 30 09:28:57 2016 -0700 + + mstarch: splitting HTTP and WebDAV, fixing error 301 + +commit b7b0ec51c9158cb9f096c97940b9fed739614e87 +Author: Michael Starch +Date: Wed Aug 17 21:05:28 2016 +0000 + + mstarch: fixing timeout 'self' issue + +commit 302745769b2ba9d0d97735b38a441226d02cc97c +Author: M Starch +Date: Mon Jun 20 18:09:49 2016 +0000 + + mstarch: renamed parameter to watchdog + +commit e235a7126e509caf8aa1e9772b4e337f688eb7e3 +Author: M Starch +Date: Mon Jun 20 17:58:07 2016 +0000 + + mstarch: adding timeouts to requests calls + +commit d865e3172193bd051659f1daa1e285fc1f4e3261 +Author: M Starch +Date: Wed Jun 15 17:47:47 2016 +0000 + + mstarch: adding cleaner error message + +commit 9c197b15488ce8e906c84b965845416814f66deb +Author: M Starch +Date: Mon May 9 15:11:37 2016 -0700 + + mstarch: fixing authentication on http + +commit d2abb52fe98ec5a0e9220fef730af9abe40f9e3e +Author: M Starch +Date: Thu May 5 21:02:26 2016 +0000 + + mstarch: more logging, 
fixed ASF existence check + +commit b8769443e835877d16994143a4189e0e8a2bf675 +Author: M Starch +Date: Thu May 5 19:55:27 2016 +0000 + + mstarch: tested Osaka (minus OAuth) + +commit 899a578ffdf1824df072fbfde5c825809c8d7c90 +Author: M Starch +Date: Thu May 5 17:15:18 2016 +0000 + + mstarch: fixing minor http/dav issues with nonstandard http/dav services + +commit 0d31f50b9785c7ab21d2fe743dffafa2c68d80cc +Author: M Starch +Date: Wed May 4 19:50:04 2016 -0700 + + mstarch: rewrite of osaka codebase including point-to-point, file interlocks + +commit 02e5aa76cff81ec392e762c2bb4d35d20919db67 +Author: gmanipon +Date: Tue Mar 29 20:05:08 2016 +0000 + + recursive-http bug fix - check if directory needs to be made + +commit 2ca56fc1187080cc802ded410f482d2a5bcc215d +Merge: 73e6ab1 e10e592 +Author: gmanipon +Date: Tue Mar 29 16:42:53 2016 +0000 + + Merge branch 'mstarch-sftp' into dev + +commit 73e6ab1e8a4f6c3342098b3231683fad7c96c06e +Author: gmanipon +Date: Tue Mar 29 16:23:23 2016 +0000 + + create region-endpoint dict once + +commit 0c85872acff2c75d6ac8523c1934b459984d515d +Author: gmanipon +Date: Tue Mar 29 15:57:51 2016 +0000 + + create s3 session for region automatically; fix bug in downloading of single s3 file and entire directory + +commit de5695c3b5497d9df40925b87ffe6ade921e8512 +Merge: 3c01df0 634c779 +Author: gmanipon +Date: Tue Mar 29 14:55:15 2016 +0000 + + Merge branch 'recursive-http' into dev + +commit 3c01df0fffb0de75c8ea3527cd710ffa2aecd296 +Author: gmanipon +Date: Mon Mar 28 20:46:51 2016 +0000 + + detect endpoint url using botocore + +commit e10e592604571d0c832875be9612db2ae68cbf51 +Author: gmanipon +Date: Fri Mar 25 01:19:22 2016 +0000 + + mstarch: properly handles single files + +commit c94d9c4d4d239a752da033874f064763cb6290ad +Author: gmanipon +Date: Thu Mar 24 23:58:59 2016 +0000 + + mstarch: osaka now supports SFTP put + +commit a2fa1de8cefd9c1e1873830145fa26b90e1424d9 +Author: M Starch +Date: Thu Mar 24 15:40:36 2016 -0700 + + mstarch: osaka SFTP solution + +commit e3fd161d05bb7adfafa39823514e20c88d6e1cfa +Author: gmanipon +Date: Tue Mar 15 15:55:22 2016 +0000 + + overhaul s3 handler to use boto3; remove gof3r since boto3 supports multipart uploads/downloads + + Boto3 fixes an issue with multipart upload/download in boto. 
+ +commit 634c779256e7a27179682602989693e9e21656de +Author: M Starch +Date: Thu Mar 10 17:12:12 2016 -0800 + + mstarch: fixing put folder problem + +commit 35bbd55492d61bd7148ee8ffbfedf808ee7335eb +Author: M Starch +Date: Thu Mar 10 17:00:53 2016 -0800 + + mstarch: fixing several edge cases + +commit 4263a133f4446454370d9f6fc2cf6c4019bfc759 +Author: M Starch +Date: Thu Mar 10 16:46:31 2016 -0800 + + mstarch: adding more children abstractions + +commit 546bb259f4a693dcbb804421b4c04fefe358e7e9 +Author: M Starch +Date: Thu Mar 10 16:44:56 2016 -0800 + + mstarch: adding more children abstractions + +commit 6d5bff4656e2e8a7a7d98d6c155cc9b998895113 +Author: M Starch +Date: Thu Mar 10 16:35:59 2016 -0800 + + mstarch: fixing auth + +commit 8db61fdf9570aef15de1ccdcb34b3bfc529f700c +Author: M Starch +Date: Thu Mar 10 16:25:24 2016 -0800 + + mstarch: properly handles empty uploads + +commit 8d8e61e9310418b9ca393af9b99317d1f31543d2 +Author: M Starch +Date: Thu Mar 10 16:20:56 2016 -0800 + + mstarch: adding in recursive upload + +commit 3adaf8e806f43b3bd99790541c5dc04c6f05ccec +Author: M Starch +Date: Thu Mar 10 16:14:22 2016 -0800 + + mstarch: adding in recursive upload + +commit bbb5fad77e89396cf1d5f1ece965083c026371f7 +Author: M Starch +Date: Thu Mar 10 16:02:56 2016 -0800 + + mstarch: minor bug fixes + +commit 52d634325f33a04bb8b1d183916827aeb08b0659 +Author: M Starch +Date: Thu Mar 10 16:01:06 2016 -0800 + + mstarch: minor bug fixes + +commit 4df40fc66702ac964fe8b73cf6a9d3bddeb00056 +Author: M Starch +Date: Thu Mar 10 15:58:16 2016 -0800 + + mstarch: minor bug fixes + +commit fb87466cb53d21efb928d23e8df08fd577712e11 +Author: M Starch +Date: Thu Mar 10 15:23:11 2016 -0800 + + mstarch: minor bug fixes + +commit dc63fd98201bade722da4db0918b66badaf62dad +Author: M Starch +Date: Thu Mar 10 15:14:48 2016 -0800 + + mstarch: minor bug fixes + +commit aff1167ddcb4a095c5dbf9bf47f06eda7779d785 +Author: M Starch +Date: Thu Mar 10 15:11:45 2016 -0800 + + mstarch: fixed empty file, folder exists, and append name problem + +commit 2c651d421f389002bf1f72048e5b622f65dd4e8d +Author: M Starch +Date: Thu Mar 10 14:55:34 2016 -0800 + + mstarch: minor bug fixes + +commit 2085452853c998d6b63877af92a4e89c49460f57 +Author: M Starch +Date: Thu Mar 10 14:48:04 2016 -0800 + + mstarch: minor bug fixes + +commit b436ddc8f1d879af655bb758703a56591b904ff5 +Author: M Starch +Date: Thu Mar 10 14:45:01 2016 -0800 + + mstarch: minor bug fixes + +commit 429d0bd11380909e0d90af987d4f411b483685f5 +Author: M Starch +Date: Thu Mar 10 14:38:15 2016 -0800 + + mstarch: minor bug fixes + +commit 6c6d00d1c511fef2f27a8cabae06bf8e3788411d +Author: M Starch +Date: Thu Mar 10 14:29:22 2016 -0800 + + mstarch: minor bug fixes + +commit b1174f937da1b57137af784a684f5fc848813f3e +Author: M Starch +Date: Thu Mar 10 14:28:11 2016 -0800 + + mstarch: minor bug fixes + +commit 984a09fd3804ba3f907517fc782832880ffc0b12 +Author: M Starch +Date: Thu Mar 10 14:26:59 2016 -0800 + + mstarch: minor bug fixes + +commit 4b11accaafa0f239578bc5be065c254a52450eb8 +Author: M Starch +Date: Thu Mar 10 14:24:57 2016 -0800 + + mstarch: now attempts to parse http response + +commit f13f7d9f7659432ea7494dff51e7a94d3b416896 +Author: M Starch +Date: Thu Mar 10 14:02:29 2016 -0800 + + mstarch: minor bug fixes + +commit e57fd309309d886ae2954100000c6f8acddc5768 +Author: M Starch +Date: Thu Mar 10 13:57:34 2016 -0800 + + mstarch: minor bug fixes + +commit e35305f0235196030ae459a1f631f6dcb6b0e47b +Author: M Starch +Date: Thu Mar 10 13:55:18 2016 -0800 + + mstarch: changed to using 
classmethods + +commit aba38158f9440b7e07d078752f46da39a453977d +Author: M Starch +Date: Thu Mar 10 13:47:59 2016 -0800 + + mstarch: partially working recursive http GETs + +commit 880405d87165e832469a3d387d23fa6383e26f93 +Author: M Starch +Date: Thu Mar 10 10:26:49 2016 -0800 + + mstarch: rolling back osaka changes + +commit c06f7f82fed357d15f96994326514e357dcd6b27 +Author: M Starch +Date: Wed Mar 9 22:19:20 2016 +0000 + + mstarch: recursive webdav pulls + +commit 7c9fec72ffbef3e5278c26970b2ea9a12ee00e42 +Merge: b4354a9 1f078e8 +Author: M Starch +Date: Mon Dec 7 10:01:37 2015 -0800 + + Merge branch 'master' of github.jpl.nasa.gov:hysds-org/osaka-object-store-abstraction + +commit b4354a99970f4a34247073b31863ab400cf2792a +Author: M Starch +Date: Mon Dec 7 10:01:28 2015 -0800 + + mstarch: fixing log message + +commit 1f078e86bc19b7a310bfa39b82d136f583ff8971 +Author: gmanipon +Date: Sat Dec 5 02:39:47 2015 +0000 + + fix bug introduced by commit 14690639 + + A path in the current working directory without "./" causes an exception. + +commit 46c56907bda05c76f90b716988604a3b7b9e6fbd +Author: gmanipon +Date: Sat Dec 5 02:01:02 2015 +0000 + + add support for digest authentication + +commit 3d6c320c55195a5f512814db792ea44ac32f7fd5 +Author: gmanipon +Date: Thu Dec 3 16:05:38 2015 +0000 + + generalize osaka metrics + +commit e18a59a7f5917f4003b46ddbf6755a5bcd4a9caa +Author: M Starch +Date: Thu Nov 19 11:52:31 2015 -0800 + + mstarch: Oskaka now records metrics if requested + +commit a6c8a2fea03de8826268287c70df175f1142dfa8 +Author: M Starch +Date: Wed Nov 18 19:24:59 2015 -0800 + + mstarch: updating variable names and adding tests + +commit 14690639051a5a5d37a0880db2195a1b319e203d +Author: M Starch +Date: Wed Nov 18 19:21:14 2015 -0800 + + mstarch: osaka now creats downalod dir for http + +commit f46b4640e8f6773ce65b9094e5c6291129e461bf +Author: gmanipon +Date: Mon Oct 26 20:16:57 2015 +0000 + + remove normalization; fixed in hysds.product_ingest + +commit 2eb0705a1d67aa44dadaba8121940d99bcdc7627 +Author: gmanipon +Date: Mon Oct 26 16:10:31 2015 +0000 + + normalize urls otherwise some backends will generate unexpected results + +commit fc70c4d35a648addb4520eaef2eefb45389e2c9d +Author: gmanipon +Date: Mon Oct 26 15:08:33 2015 +0000 + + fix bug in handling US Standard + +commit 470e7fc2959428555ded5af407e7198d6fc549a1 +Author: gmanipon +Date: Sat Oct 24 18:04:49 2015 +0000 + + fix bug in oauth authentication + +commit ad4adb3c88e991cf2fbdb82debf81bd695edb547 +Author: gmanipon +Date: Wed Oct 14 17:24:15 2015 +0000 + + add .gitignore + +commit 69149aa5c9238c5971404a273bb29308cd5ef240 +Author: M Starch +Date: Tue Oct 13 13:46:28 2015 -0700 + + mstarch: working azure + +commit 6c515a31abee91e3b7363754fcdff6b4a3afb6b9 +Author: M Starch +Date: Tue Oct 13 12:44:42 2015 -0700 + + mstarch: fixes region override + +commit 8f7a0ad6452e23df02377f83855a103ff1d5aefd +Author: M Starch +Date: Tue Oct 13 12:37:59 2015 -0700 + + mstarch: fixed s3 region specification and tenative azure support + +commit f1a47b3939f4261e78e492ee1e2b3b3f3212fe32 +Author: Gerald Manipon +Date: Fri Oct 9 00:03:53 2015 +0000 + + mstarch: tested s3, file, and http + +commit 60cb82041f6acc63cacdd15628646a1303d240ce +Author: Gerald Manipon +Date: Thu Oct 8 15:52:48 2015 +0000 + + mstarch: fixes to http + +commit d53ac6f05c847af47cca75a658dc00745c19ea3c +Author: Gerald Manipon +Date: Tue Oct 6 23:34:15 2015 +0000 + + mstarch: fixing import + +commit b1a991138a2bf30c3dfb4865e4acf1f2268aa9f9 +Author: Gerald Manipon +Date: Tue Oct 6 22:53:45 2015 
+0000
+
+    mstarch: 'supported' functionality
+
+commit 0151b059eb61163bfdac9b1162bfcd60b0944ee0
+Author: Gerald Manipon
+Date: Tue Oct 6 22:24:52 2015 +0000
+
+    mstarch: adding requests import
+
+commit 12c8b8c9966e71cd57d11e7ef75c185f59727979
+Author: Gerald Manipon
+Date: Tue Oct 6 22:12:16 2015 +0000
+
+    mstarch: proper close if oauth is none
+
+commit 1ec6ce0d9dde6704ffc0276e5fa4aebe7f86ba9a
+Author: Gerald Manipon
+Date: Tue Oct 6 22:00:40 2015 +0000
+
+    mstarch: fixing http osaka module
+
+commit 91b13ec99a2d7915c9da67997d32be36ef5ce957
+Author: Gerald Manipon
+Date: Tue Oct 6 21:38:41 2015 +0000
+
+    mstarch: now using class methods to prevent the need for may objects (and debug messages)
+
+commit 1de365551b89b098d82cbe273e5666e46f944217
+Author: Gerald Manipon
+Date: Tue Oct 6 20:58:29 2015 +0000
+
+    mstarch: properly passing parameters
+
+commit 462d2735d442f5f07a93d69afd0af3859231d5f1
+Author: Gerald Manipon
+Date: Tue Oct 6 20:34:16 2015 +0000
+
+    mstarch: more osaka changes
+
+commit 418acd56cfa389766c75b8e3a45478db35034073
+Author: Gerald Manipon
+Date: Tue Oct 6 20:14:12 2015 +0000
+
+    mstarch: updates to disabled azure
+
+commit daa03c759c73bd18972e380dfc44ac5fc1ccdeef
+Author: Gerald Manipon
+Date: Tue Oct 6 20:13:23 2015 +0000
+
+    mstarch: updating osaka with tests, azure disabled
+
+commit 977d3f453b3786b9accdf2eaf300934fa4f72aa7
+Author: Gerald Manipon
+Date: Fri Sep 18 00:27:06 2015 +0000
+
+    mstarch: properly passing params
+
+commit 28b16dccaf664738b513bf6f3e3ef4760f642a11
+Author: M Starch
+Date: Thu Sep 17 09:39:05 2015 -0700
+
+    mstarch: initial implementation of the Osaka Object Store Framework
diff --git a/osaka/__init__.py b/osaka/__init__.py
new file mode 100644
index 0000000..0b0bf76
--- /dev/null
+++ b/osaka/__init__.py
@@ -0,0 +1,3 @@
+__version__ = "0.0.1"
+__url__ = "http://gitlab.jpl.nasa.gov:8000/browser/trunk/HySDS/hysds"
+__description__ = "Osaka (Object Store Abstraction K Architecture)"
diff --git a/osaka/__main__.py b/osaka/__main__.py
new file mode 100644
index 0000000..7346654
--- /dev/null
+++ b/osaka/__main__.py
@@ -0,0 +1,190 @@
+from __future__ import print_function
+import argparse
+import functools
+import logging
+import sys
+import traceback
+import osaka.base
+import osaka.main
+import osaka.utils
+
+def argparse_exception_wrapper(function, arg):
+    '''
+    A function wrapper that converts exceptions into an argparse-type exception
+    @param function - function to call (typically bound using functools.partial)
+    @param arg - argument to check
+    '''
+    try:
+        function(arg)
+    except Exception as e:
+        raise argparse.ArgumentTypeError(str(e))
+    return arg
+def get_main_parser():
+    ''' Gets the top-level parser '''
+    parser = argparse.ArgumentParser(prog="osaka", description="Mosura-CLI for Osaka")
+    parser.add_argument("-v", "--verbose", help="Enables verbose output",action="store_true",default=False)
+    parser.add_argument("-d", "--debug", help="Enables debug output",action="store_true",default=False)
+    subparsers = parser.add_subparsers(help='sub-command help',dest="cmd")
+    add_get_parser(subparsers)
+    add_put_parser(subparsers)
+    add_rm_parser(subparsers)
+    add_size_parser(subparsers)
+    add_list_parser(subparsers)
+    parser_ex = subparsers.add_parser("exists", description = "Check the existence of the specified source Osaka-URL",
+                                      help="'exists' sub-command help")
+    parser_ex.add_argument("source", type=functools.partial(argparse_exception_wrapper,osaka.base.StorageBase.getStorageBackend),
+                           help="The source Osaka-URL to check")
+    parser_ex = subparsers.add_parser("gojira", description = "RRRRRRAAAAAAAWWW...", help="RRRRRRAAAAAAAWWW...")
+    return parser
+def add_get_parser(subparsers):
+    ''' Osaka GET command line '''
+    parser_get = subparsers.add_parser("get", description = "Retrieves the specified Osaka-URL to specified destination",
+                                       help="'get' sub-command help")
+    parser_get.add_argument("-f", "--force", help="Forces a retrieval of the source Osaka-URL",action="store_true",default=False)
+    parser_get.add_argument("-x", "--no-coop", help="Refuses to cooperate with other osaka processes",action="store_true",default=False, dest="ncoop")
+    parser_get.add_argument("-n", "--no-clobber", help="Refuses to clobber existing destinations",action="store_true",default=False, dest="noclobber")
+    parser_get.add_argument("source", type=functools.partial(argparse_exception_wrapper,osaka.base.StorageBase.getStorageBackend),
+                            help="The source Osaka-URL to get")
+    parser_get.add_argument("destination", type=functools.partial(argparse_exception_wrapper,osaka.base.StorageBase.getStorageBackend),
+                            nargs="?",
+                            default="./",
+                            help="The destination Osaka-URL to place result. Defaults to CWD")
+def add_put_parser(subparsers):
+    ''' Osaka PUT command line '''
+    parser_put = subparsers.add_parser("put", description = "Puts the specified source Osaka-URL to specified destination Osaka-URL",
+                                       help="'put' sub-command help")
+    parser_put.add_argument("-x", "--no-coop", help="Refuses to cooperate with other osaka processes",action="store_true",default=False, dest="ncoop")
+    parser_put.add_argument("-n", "--no-clobber", help="Refuses to clobber existing destinations",action="store_true",default=False, dest="noclobber")
+    parser_put.add_argument("source", type=functools.partial(argparse_exception_wrapper,osaka.base.StorageBase.getStorageBackend),
+                            help="The source Osaka-URL to put")
+    parser_put.add_argument("destination", type=functools.partial(argparse_exception_wrapper,osaka.base.StorageBase.getStorageBackend),
+                            help="The destination Osaka-URL to place result")
+
+def add_rm_parser(subparsers):
+    ''' Osaka RM command line '''
+    parser_rm = subparsers.add_parser("rm", description = "Remove the specified source Osaka-URL",
+                                      help="'rm' sub-command help")
+    parser_rm.add_argument("source", type=functools.partial(argparse_exception_wrapper,osaka.base.StorageBase.getStorageBackend),
+                           help="The source Osaka-URL to remove")
+    parser_rm.add_argument("-f","--force", dest="unlock", help="Force unlock before removal",action='store_true',default=False)
+
+def add_size_parser(subparsers):
+    ''' Osaka size command line '''
+    parser_size = subparsers.add_parser("size", description = "Returns the size of the specified source Osaka-URL",
+                                        help="'size' sub-command help")
+    parser_size.add_argument("source", type=functools.partial(argparse_exception_wrapper,osaka.base.StorageBase.getStorageBackend),
+                             help="The source Osaka-URL to size")
+    parser_size.add_argument("-f","--force", dest="force", help="Force sizing for locked urls",action='store_true',default=False)
+
+def add_list_parser(subparsers):
+    ''' Osaka list command line '''
+    parser_ls = subparsers.add_parser("list", description = "Returns the recursive listing of the specified source Osaka-URL",
+                                      help="'list' sub-command help")
+    parser_ls.add_argument("source", type=functools.partial(argparse_exception_wrapper,osaka.base.StorageBase.getStorageBackend),
+                           help="The source Osaka-URL to list children")
+    parser_ls.add_argument("-f","--force", dest="force", help="Force listing for locked urls",action='store_true',default=False)
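These sub-parsers map one-to-one onto functions in osaka.main (the rm command is renamed to rmall in main() below). As a quick illustration, the entry point can also be driven programmatically; a minimal sketch, assuming an installed osaka package — the bucket, key, and paths are purely hypothetical:

```python
# Driving the CLI entry point from Python; equivalent to running
# "osaka -v get s3://some-bucket/some/key ./downloads/" from a shell.
from osaka.__main__ import main

main(["-v", "get", "s3://some-bucket/some/key", "./downloads/"])
main(["exists", "file:///tmp/some-product"])  # prints "Found: ..." or "Not Found: ..."
```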
+
+def main(args=None):
+    '''
+    Entry-point for the main osaka-cli
+    '''
+    parsed = get_main_parser().parse_args(args)
+    #Setup logging levels
+    if parsed.debug:
+        logging.basicConfig(level=logging.DEBUG)
+    elif parsed.verbose:
+        logging.basicConfig(level=logging.INFO)
+    else:
+        logging.basicConfig(level=logging.WARNING)
+    #Rename 'rm' command
+    parsed.cmd = "rmall" if parsed.cmd == "rm" else parsed.cmd
+    params = []
+    for item in ["source", "destination"]:
+        if item in parsed:
+            params.append(getattr(parsed,item))
+    kwargs = {}
+    for item in ["unlock","force", "ncoop", "noclobber"]:
+        if item in parsed:
+            kwargs[item] = getattr(parsed,item)
+    if parsed.cmd == "gojira":
+        gojira()
+        return 0
+    try:
+        fn = getattr(osaka.main,parsed.cmd)
+        ret = fn(*params,**kwargs)
+        if parsed.cmd == "exists":
+            text = "Found" if ret else "Not Found"
+            print("{0}: {1}".format(text,parsed.source))
+        elif parsed.cmd == "size":
+            sz, lb = osaka.utils.human_size(ret)
+            print("Size {0} {1}: {2}".format(sz, lb, parsed.source))
+        elif parsed.cmd == "list":
+            print("Listing for: {0}".format(parsed.source))
+            for listing in ret:
+                print("    {0}".format(listing))
+    except Exception as e:
+        print("[ERROR] An exception of type {0} occurred with message '{1}'".format(type(e).__name__,e))
+        if parsed.debug:
+            traceback.print_exc()
+        return -1
+    return 0
+def gojira():
+    '''
+    A Gojira easter egg
+    '''
+    moth = '''
+ '####++###+++++++++++++++++#############+++++++++++++++++++#################################+'
+ '############+++++++++##############++++++++++++++++++++####################################+'
+ +#####++######################+++++++++++###################################################+
+ ++################++############+++++++++++++##############################################+
+ '++++++++++++++####################++++++++++++++++######################################+ '
+ '++++++++++''+++++++++++++++#########+++++++++++++++++###############################+' ``````'++++++++++```+++++'
+ '########++++++++++++++++++++++##+++++++++++++++++++++++++++++++++##############+'' ```++++##++#########+++++++++'
+ ```'' '+#++##+++#########++####+++++++++++++++++++++++++++++++++++++++++#########+'' ''+++++++###+################++++++'
+ '++'++'' '+#++++++#+++######++++##+######+++++++++++++++++++++++++++++++++++####' ''+++++++#########################+++++++''
+ +++``````'' +++++++++++++++++++++++++++++++++++++++++++######+#####++++++++++++```+++#+++#######+#####################+++++++'
+ '+```;```;;''; '++++++''++++++++++++++++++######++++#+++++#++###+####++++++++++++########++##++++######################+++++++'
+ `````````;;;';;;; ''++++++++++####++++++++++++++++++++++++++++++++++##++++++++###########+++++###########################+++++''
+ `````````';;;;```'; '+++++++++++++++++++++++++++++++++++++++++++++++++++++######++++###################################+++++'
+ ;;;;;:;;;;;``````'; ;;'+++++++++++++++++++++++++++++++++++++++++++++++#####++++++########+#####++#++#++#############+++++'
+ ;````````````;;;;;;;;;;;;;;;`````````;;;'++++++++++++++++++++++++++++++++++++++++++######++++++++++++++++++++++++##++############++++''
+ ```;;;:;;;;```'';;;;;;;;;;;;;`````````++++++++#########################+++++##+####################+###+++#+###++#######+++#+''
+ ;;'+'';;'';;;;;;;;;;;::::;;;;;::;;:;;'++++++######################++++++++++++###################+#####++++++######++++'
+ ```;;;;;;::::::;;;;;:;;;;;;;;;::;++++++#################++++++++++############################++++######+++++'
```++';;::::```+;;;;'++:;;:;'++###############+####+++++###########################++++###+##+++' + ;```;;';;;;;;;;++;;;:;';+###################+######++++++#####+'+```+++++``````'' + ;:'+;;;;;:;++;;';;++'+++######################+++##+'+'';'+;::':,,:; + :::;+++++++'';;';;+########################'+##+'+#+''+#+''++';++;:';, + ;;:;;;;;``````'++++#######################++###+'+##+'+#+++##++#++'+'::;' + +';;```++++++#++++##################++#########+'++++'+#+++#+++#+++++';;,;'+;; + ''+++'+++++++++++########+#######################+++#+++##+++++++#+++#++++'+++++' + '+#+'+++####################################+###################################+' + ' '++######++#''++#####################+++######################+++++``` + '++ '+#######++ ' + '' ' ``````' + ' '' '' + '' + @@@ @@@ @@ @@@ @@@ @@ #@ + @@@ +@@@ +@ @@@ +@@@ @@ @@ @# + @@@ @#@@ @@ @@@ @#@@ @@ @@ @@ + @@@ @`@@ +@@@@+ '@@@@, @@ @@ @@@@ '@@@@+ +@' @@@ @,@@ +@@@@+ @@@@ @@+@@@ @@ @@ #@@@@ @+ + @@@+ @ @@ @@''@@ @@'+@@ @@ @@ @@@@ @@+'@@ @@ @@@+ @ @@ @@''@@ @@ @@@ @@# @@@@@ @@'#@# @@ + @@@@ @ @@ #@+ +@# @@+ @@ @@ @@ '@@ @@ @@@@ @ @@ #@+ +@# @@ @@' `@@ @@+ #@@ @@ + @@+@#@ @@ @@ @@ @@@@@ @@ @@ @@ @@@@@@ @@ @@+@#@ @@ @@ @@ @@ @@ @@ @@ @@@#@@ @@ + @@ @@# @@ +@# #@+ @@ @@ @@ @@ @@ @@ @@ @@ @@ @@# @@ +@# #@+ @@ @@ @@ @@ #@ +@@ @@ + @@ @@ @@ @@@@@@ @@@@@@ @@@@@@ @@ @@@@@@ @@ @@ @@ @@ @@@@@@ @@@ @@ @@ @@ #@@@@@@ @@ + @@ @@ @@ @@@@ @@@@' #@@+@@ @@ #@@##@ '@' @@ @@ @@ @@@@ @@@ @@ @@ @@ @@@ @@ '@+ + @@ @@ + +@ @# + @@ #@ +Osaka - Mosura - Special Thanks: The HySDS Team +''' + print(moth) + +if __name__ == "__main__": + ''' + Osaka CLI + ''' + sys.exit(main()) + diff --git a/osaka/base.py b/osaka/base.py new file mode 100644 index 0000000..2c783b5 --- /dev/null +++ b/osaka/base.py @@ -0,0 +1,113 @@ +''' +Created on Apr 27, 2016 + +@author: mstarch +''' +import urlparse +import osaka.utils + +class StorageBase(object): + ''' + Represents the base class for Osaka storage implementers + including basic functions. + ''' + def __init__(self,params={}): + ''' + Constructor + ''' + self.connect(params) + @classmethod + def getStorageBackend(clazz, uri): + ''' + Returns a subclass instance of this class that + can process the given URI type + @param uri: uri for which to get a backend + ''' + clazz.loadBackends() + searching = urlparse.urlparse(uri).scheme.rstrip("://") + try: + return clazz.map[searching]() + except Exception as e: + err = "Failed to get backend for {0}. 
Reason: {1}".format(searching,e)
+            osaka.utils.LOGGER.error(err)
+            raise osaka.utils.OsakaException(err)
+    @classmethod
+    def loadBackends(clazz):
+        '''
+        Loads the backends
+        '''
+        if "map" in clazz.__dict__:
+            return
+        clazz.map = {}
+        import osaka.storage.file
+        import osaka.storage.http
+        import osaka.storage.webdav
+        import osaka.storage.s3
+        import osaka.storage.gs
+        import osaka.storage.ftp
+        for cls in clazz.__subclasses__():
+            types = cls.getSchemes()
+            osaka.utils.LOGGER.debug("Found storage backend: {0} handling {1}".format(cls.__name__,types))
+            for scheme in types:
+                clazz.map[scheme] = cls
+        return clazz.map
+    def connect(self,params={}):
+        '''
+        Connects to the backend
+        '''
+        raise osaka.utils.OsakaException("{0} does not implement 'connect' call".format(type(self).__name__))
+    def get(self,uri):
+        '''
+        Gets the URI as a stream
+        @param uri: uri to get
+        '''
+        raise osaka.utils.OsakaException("{0} does not implement 'get' call".format(type(self).__name__))
+    def put(self,stream,uri):
+        '''
+        Puts a stream to a URI
+        @param stream: stream to upload
+        @param uri: uri to put
+        '''
+        raise osaka.utils.OsakaException("{0} does not implement 'put' call".format(type(self).__name__))
+    def exists(self,uri):
+        '''
+        Does the URI exist?
+        @param uri: uri to check
+        '''
+        raise osaka.utils.OsakaException("{0} does not implement 'exists' call".format(type(self).__name__))
+    def list(self,uri):
+        '''
+        List URI
+        @param uri: uri to list
+        '''
+        raise osaka.utils.OsakaException("{0} does not implement 'list' call".format(type(self).__name__))
+    def isComposite(self,uri):
+        '''
+        Detect if this uri is a composite uri (uri to collection of objects i.e. directory)
+        @param uri: uri to check
+        '''
+        raise osaka.utils.OsakaException("{0} does not implement 'isComposite' call".format(type(self).__name__))
+    def close(self):
+        '''
+        Close this backend
+        '''
+        raise osaka.utils.OsakaException("{0} does not implement 'close' call".format(type(self).__name__))
+    def rm(self,uri):
+        '''
+        Remove this uri from backend
+        @param uri: uri to remove
+        '''
+        raise osaka.utils.OsakaException("{0} does not implement 'rm' call".format(type(self).__name__))
+    def listAllChildren(self,uri):
+        '''
+        List all children of the current uri, including this URI if it is a non-composite
+        @param uri: uri to check
+        '''
+        osaka.utils.LOGGER.debug("{0} does not implement 'listAllChildren' call, attempting list-and-walk".format(type(self).__name__))
+        children = []
+        for entry in self.list(uri):
+            if self.isComposite(entry):
+                children.extend(self.listAllChildren(entry))
+            else:
+                children.append(entry)
+        return children
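getStorageBackend() is the single resolution point the rest of Osaka uses: loadBackends() imports the storage modules, then registers every StorageBase subclass under each scheme it reports via getSchemes(). A minimal sketch of that lookup, assuming the file backend defined later in this patch (the path is a placeholder):

```python
# Scheme-based backend resolution: urlparse extracts the scheme, and the
# class-level map picks the matching StorageBase subclass.
import osaka.base

uri = "file:///tmp/example.txt"  # hypothetical path
backend = osaka.base.StorageBase.getStorageBackend(uri)  # -> osaka.storage.file.File
backend.connect(uri, {})         # params dict may carry credentials for remote backends
print(backend.exists(uri))      # assumes the backend implements exists()
backend.close()
```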
diff --git a/osaka/cooperator.py b/osaka/cooperator.py
new file mode 100644
index 0000000..5c73695
--- /dev/null
+++ b/osaka/cooperator.py
@@ -0,0 +1,93 @@
+import copy
+import time
+from osaka.utils import OsakaException, CooperationNotPossibleException
+
+class Cooperator(object):
+    '''
+    A cooperator object used to facilitate cooperation between two different osaka instances
+    @mstarch
+    '''
+    def __init__(self, source, dlock, lockMetadata):
+        '''
+        Initialize a cooperator
+        '''
+        self.source = source
+        self.dlock = dlock
+        self.lockMetadata = lockMetadata
+        self.primary = False
+    def __enter__(self):
+        '''
+        Enter this cooperate block
+        '''
+        #Atomic lock-or-cooperate depends on each backend's atomicity of "put"
+        try:
+            lockMetadata = copy.copy(self.lockMetadata)
+            lockMetadata["source"] = self.source
+            self.dlock.lock(lockMetadata)
+            self.primary = True
+        except OsakaException as ose:
+            if not "Lock file already locked" in str(ose):
+                raise
+            self.whenLocked()
+        return self
+    def __exit__(self, exception_type, exception_value, traceback):
+        '''
+        Handle exceptions in the with block
+        @param exception_type: exception type
+        @param exception_value: exception value
+        @param traceback: traceback of exception
+        '''
+        if not self.primary:
+            return
+        elif not exception_value is None:
+            self.dlock.setLockMetadata("error", str(exception_type) + str(exception_value))
+        else:
+            self.dlock.unlock()
+    def whenLocked(self):
+        '''
+        What to do when a file is already locked
+        '''
+        ex_source = self.dlock.getLockMetadata("source")
+        error = self.dlock.getLockMetadata("error")
+        if ex_source is None:
+            raise CooperationNotPossibleException("No source specified. Cooperation not possible")
+        elif self.source != ex_source:
+            raise CooperationNotPossibleException("{0} differs from existing source {1}. Cooperation not possible"
+                                                  .format(self.source, ex_source))
+        elif not error is None:
+            raise OsakaException("Cooperation error: "+error)
+    def isPrimary(self):
+        '''
+        Is this the primary responsible for download
+        '''
+        return self.primary
+class Spinner(object):
+    '''
+    A class to spin on a download
+    @author mstarch
+    '''
+    def __init__(self, dlock, timeout, interval=0.5):
+        '''
+        Initialize this spinner
+        @param dlock: lock to spin on
+        '''
+        self.dlock = dlock
+        self.timeout = timeout
+        self.interval = interval
+    def spin(self):
+        '''
+        Will spin on this lock until download completes or error is detected
+        '''
+        tm = 0
+        while True:
+            error = self.dlock.getLockMetadata("error")
+            if not error is None:
+                raise OsakaException("Cooperation error: " + error)
+            elif not self.dlock.isLocked():
+                return
+            elif self.timeout != -1 and tm > self.timeout:
+                raise OsakaException("Timed out after {0} seconds".format(tm))
+            time.sleep(self.interval)
+            tm = tm + self.interval
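Cooperator and Spinner are two halves of the same protocol: whichever process wins the lock becomes the primary and performs the transfer, while the others wait for the lock to clear. A hedged sketch of that flow — the URIs are illustrative, and the real call sites live in osaka.transfer, which is not part of this diff:

```python
import osaka.lock
from osaka.cooperator import Cooperator, Spinner

source = "http://example.com/some-product"           # hypothetical source URI
dlock = osaka.lock.Lock("file:///tmp/some-product")  # lock guarding the destination

with Cooperator(source, dlock, {}) as coop:
    if coop.isPrimary():
        pass  # this process won the lock: perform the actual transfer here
if not coop.isPrimary():
    Spinner(dlock, timeout=-1).spin()  # -1: wait indefinitely for the primary
```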
diff --git a/osaka/lock.py b/osaka/lock.py
new file mode 100644
index 0000000..c6320ab
--- /dev/null
+++ b/osaka/lock.py
@@ -0,0 +1,156 @@
+from __future__ import print_function
+import os
+import json
+import socket
+import traceback
+#Py2k-3k imports
+try:
+    import urllib.parse as urlparse
+except ImportError as ime:
+    import urlparse as urlparse
+try:
+    import StringIO as io
+except:
+    import io
+#Osaka imports
+import osaka.base
+import osaka.utils
+#Lock file metadata basics
+try:
+    HOST_FQDN = socket.getfqdn()
+except:
+    osaka.utils.LOGGER.warning("Failed to get FQDN, setting to 'unknown'")
+    HOST_FQDN = "unknown"
+try:
+    PID = str(os.getpid())
+except:
+    osaka.utils.LOGGER.warning("Failed to get PID, setting to 'unknown'")
+    PID = 'unknown'
+INTERLOCK_NAME_TEMPLATE = "{0}.osaka.locked.json"
+
+WTF_CNT=0
+
+class Lock(object):
+    '''
+    A class to handle the lock file/URIs used by osaka. Simply hand it an
+    Osaka URI to begin.
+    @author mstarch
+    '''
+    def __init__(self, ouri, handle=None, params={}):
+        '''
+        Construct this lock object locking the given osaka-uri
+        @param ouri: osaka uri for lock-handling
+        @param handle: osaka handle for this lock file, if None specify params
+        @param params: parameters for handle creation, if {} or None specify handle
+        '''
+        self.ouri = ouri
+        self.handle = handle
+        self.params = params
+        self.luri = Lock.getLockUri(self.ouri)
+        self.secret = "Justin is an amazing person! Perhaps wax-sculpture worthy."
+        #self.lockExtras = {}
+        self.locked = False
+    def lock(self, lockMetadata={}):
+        '''
+        Lock the object represented by this file
+        @param lockMetadata: metadata to store in the lock file
+        '''
+        osaka.utils.LOGGER.debug("Locking {0} with {1}".format(self.ouri, self.luri))
+        tmp = {"pid":PID,"hostname":HOST_FQDN, "osaka-lock-secret":self.secret}
+        tmp.update(lockMetadata)
+        #tmp.update(self.lockExtras)
+        stream = io.StringIO(json.dumps(tmp))
+        with PermTemp(self.luri, self.handle, self.params) as handle:
+            #Not perfect exception
+            if handle.exists(self.luri):
+                raise osaka.utils.OsakaException("Lock file already locked")
+            handle.put(stream,self.luri)
+        self.locked = True
+    def unlock(self):
+        '''
+        Unlock the object represented by this object
+        '''
+        osaka.utils.LOGGER.debug("Unlocking {0} with {1}".format(self.ouri, self.luri))
+        with PermTemp(self.luri, self.handle, self.params) as handle:
+            handle.rm(self.luri)
+        self.locked = False
+    def getLockMetadata(self, field):
+        '''
+        Get a field out of the lock-uri metadata
+        @param field: field to read from the lock-uri
+        @return: lock-uri field
+        '''
+        osaka.utils.LOGGER.debug("Looking for {0} in lock-uri {1}".format(field, self.luri))
+        with PermTemp(self.luri, self.handle, self.params) as handle:
+            try:
+                filelike = handle.get(self.luri)
+                return json.load(filelike).get(field, None)
+            except Exception as e:
+                return None
+    def setLockMetadata(self, field, value):
+        '''
+        Set a field in the lock-uri metadata
+        @param field: field to write to the lock-uri
+        @param value: value to set
+        '''
+        osaka.utils.LOGGER.debug("Setting {0} in lock-uri {1}".format(field, self.luri))
+#        if not self.locked:
+#            self.lockExtras[field] = value
+#            return
+        with PermTemp(self.luri, self.handle, self.params) as handle:
+            filelike = handle.get(self.luri)
+            jsn = json.load(filelike)
+            jsn[field] = value
+            stream = io.StringIO(json.dumps(jsn))
+            handle.put(stream, self.luri)
+    def isLocked(self):
+        '''
+        Check to see if the URI is locked
+        '''
+        osaka.utils.LOGGER.info("Checking lock status of {0} in {1}".format(self.ouri, self.luri))
+        if self.locked:
+            return self.locked
+        try:
+            return self.getLockMetadata("osaka-lock-secret") == self.secret
+        except:
+            return False
+
+    @staticmethod
+    def getLockUri(ouri):
+        '''
+        Gets the lockfile uri from the given ouri
+        @param ouri: osaka-uri to wrap with the lock
+        '''
+        parsed = urlparse.urlparse(ouri)
+        parsed = parsed._replace(path=INTERLOCK_NAME_TEMPLATE.format(parsed.path.rstrip("/")))
+        return parsed.geturl()
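The lock file lives next to the object it guards, named via INTERLOCK_NAME_TEMPLATE. A minimal sketch of the life cycle, assuming the file backend (the path is a placeholder):

```python
import osaka.lock

lock = osaka.lock.Lock("file:///tmp/some-product")  # lock file: /tmp/some-product.osaka.locked.json
lock.lock({"note": "transfer in progress"})  # raises OsakaException if already locked
print(lock.isLocked())                       # True: the stored secret matches ours
lock.unlock()                                # removes the lock file
```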
+class PermTemp(object):
+    '''
+    A permanent or temporary wrapper for a handle. Allows the "with" clause to close if and only
+    if we made a temporary handle
+    @author mstarch
+    '''
+    def __init__(self, luri, handle=None, params={}):
+        '''
+        Initialize a new perm-temp object
+        '''
+        self.luri = luri
+        if handle is None:
+            self.close = True
+            osaka.utils.LOGGER.debug("Opening handler for lock-uri {0}".format(self.luri))
+            self.handle = osaka.base.StorageBase.getStorageBackend(self.luri)
+            self.handle.connect(self.luri, params)
+            return
+        self.handle = handle
+        self.close = False
+    def __enter__(self):
+        '''
+        Enter function
+        '''
+        return self.handle
+    def __exit__(self, exception_type, exception_value, traceback):
+        '''
+        Exit function
+        '''
+        if self.close:
+            self.handle.close()
diff --git a/osaka/main.py b/osaka/main.py
new file mode 100644
index 0000000..290654c
--- /dev/null
+++ b/osaka/main.py
@@ -0,0 +1,152 @@
+import osaka.base
+import osaka.lock
+import osaka.utils
+import osaka.transfer
+
+def put(path,url,params={},measure=False,output="./pge_metrics.json",lockMetadata={},retries=0, ncoop=False, noclobber=False):
+    '''
+    Put a file up to given url based on its scheme
+    @param path: path to read file from (locally)
+    @param url: url to put file at
+    @param params: (optional)parameters to hand to backend, like passwords/usernames
+    @param measure: (optional)take transfer metrics, True/False
+    @param output: (optional)output file to place metrics in
+    @param lockMetadata: (optional)metadata to place in Osaka lock-out file
+    @param retries: (optional)number of times to retry
+    '''
+    osaka.utils.LOGGER.info("Running backwards-compatible 'Osaka Put' from {0} to {1}".format(path,url))
+    transfer(path, url, params, measure, output, lockMetadata,retries=retries, ncoop=ncoop, noclobber=noclobber)
+def get(url,path,params={},measure=False,output="./pge_metrics.json",lockMetadata={},retries=0,force=False, ncoop=False, noclobber=False):
+    '''
+    Get a URL to the given path based on scheme
+    @param url: url to grab file from
+    @param path: path to save file to (locally)
+    @param params: (optional)parameters to hand to backend, like passwords/usernames
+    @param measure: (optional)take transfer metrics, True/False
+    @param output: (optional)output file to place metrics in
+    @param lockMetadata: (optional)metadata to place in Osaka lock-out file
+    @param retries: (optional)number of times to retry
+    '''
+    osaka.utils.LOGGER.info("Running backwards-compatible 'Osaka Get' from {0} to {1}".format(url,path))
+    transfer(url, path, params, measure, output, lockMetadata,retries=retries,force=force, ncoop=ncoop, noclobber=noclobber)
+def transfer(source,dest,params={},measure=False,output="./pge_metrics.json",lockMetadata={},retries=0,force=False, ncoop=False, noclobber=False):
+    '''
+    Transfer from one point to another
+    @param source: source URI to transfer from
+    @param dest: destination URI to transfer to
+    @param params: (optional)parameters to hand to backend, like passwords/usernames
+    @param measure: (optional)take transfer metrics, True/False
+    @param output: (optional)output file to place metrics in
+    @param lockMetadata: (optional)metadata to place in Osaka lock-out file
+    @param retries: (optional)number of times to retry
+    '''
+    osaka.utils.LOGGER.info("Running new-style 'Osaka Transfer' from {0} to {1}".format(source,dest))
+    #if source.startswith("rsync://") or dest.startswith("rsync://"):
+    #    rsync(source,dest,params,measure,output,lockMetadata)
+    transfer = osaka.transfer.Transferer()
+    transfer.transfer(source,dest,params=params,measure=measure,metricsOutput=output,lockMetadata=lockMetadata,retries=retries,force=force,ncoop=ncoop,noclobber=noclobber)
+# def rsync(source,dest,params={},measure=False,output="./pge_metrics.json",lockMetadata={}):
+#     '''
+#     Rsync from one backend to a valid rsync location
+#     @param source: osaka-source URI to transfer from
+#     @param dest: rsync destination URI to transfer to
+#     @param params: (optional)parameters to hand to osaka-backend, like passwords/usernames
+#     @param measure: (optional)take transfer metrics, True/False
+#     @param output: (optional)output file to place metrics in
+#     @param lockMetadata: (optional)metadata to place in Osaka lock-out file
+#     @param retries: (optional)number of times to retry
+#     '''
+#     rsyncer = osaka.rsyncer.Rsyncer()
+#     rsyncer.transfer(source,dest,params,measure,output,lockMetadata)
+def rmall(url,params={},unlock=False,retries=0):
+    '''
+    Remove a URL, recursively
+    @param url: url to remove
+    @param params: (optional)parameters to hand to backend, like passwords/usernames
+    @param retries: (optional)number of times to retry
+    '''
+    osaka.utils.LOGGER.info("Removing {0}".format(url))
+    transfer = osaka.transfer.Transferer()
+    transfer.remove(url,params,unlock=unlock,retries=retries)
+def isLocked(url,params={}):
+    '''
+    Is the URL locked?
+    @param url: url to check
+    @param params: (optional)parameters to hand to backend, like passwords/usernames
+    '''
+    osaka.utils.LOGGER.info("Checking lock status of {0}".format(url))
+    transfer = osaka.transfer.Transferer()
+    return transfer.isLocked(url,params=params)
+def exists(url,params={}):
+    '''
+    Checks the existence of a url
+    @param url: url to check
+    @param params: params to pass-in
+    '''
+    backend = osaka.base.StorageBase.getStorageBackend(url)
+    backend.connect(url,params)
+    return backend.exists(url)
+def size(url, params={}, retries=0, force=False):
+    '''
+    Check the size of an object
+    '''
+    uri = url.rstrip("/")
+    osaka.utils.LOGGER.info("Sizing URI {0}".format(uri))
+    handle = osaka.base.StorageBase.getStorageBackend(uri)
+    lock = osaka.lock.Lock(uri, handle)
+    for retry in range(0,retries+1):
+        try:
+            handle.connect(uri,params)
+            if not force and lock.isLocked():
+                error = "URI {0} has not completed previous transfer. Will not continue.".format(uri)
+                osaka.utils.LOGGER.error(error)
+                raise osaka.utils.OsakaException(error)
+            def size_it(item):
+                ''' Size one item '''
+                osaka.utils.LOGGER.debug("Sizing specific item {0}".format(item))
+                return handle.size(item)
+            return sum(osaka.utils.product_composite_iterator(uri, handle, size_it))
+        except Exception as e:
+            osaka.utils.LOGGER.warning("Exception occurred, retrying({0}): {1}".format(retry+1,e))
+        finally:
+            handle.close()
+    else:
+        raise
+def list(url, params={}, retries=0, force=False):
+    return getChildren(url, params=params, retries=retries, force=force)
+def getChildren(url, params={}, retries=0, force=False):
+    '''
+    List all children of an object
+    '''
+    uri = url.rstrip("/")
+    osaka.utils.LOGGER.info("Get all children URI {0}".format(uri))
+    handle = osaka.base.StorageBase.getStorageBackend(uri)
+    lock = osaka.lock.Lock(uri, handle)
+    for retry in range(0,retries+1):
+        try:
+            handle.connect(uri,params)
+            if not force and lock.isLocked():
+                error = "URI {0} has not completed previous transfer. Will not continue.".format(uri)
Will not continue.".format(uri) + osaka.utils.LOGGER.error(error) + raise osaka.utils.OsakaException(error) + def identity(item): + ''' Size one item ''' + return item + return osaka.utils.product_composite_iterator(uri, handle, identity) + except Exception as e: + osaka.utils.LOGGER.warning("Exception occurred, retrying({0}): {1}".format(retry+1,e)) + finally: + handle.close() + else: + raise + +def supported(url): + ''' + Check to see if the url's scheme is supported + @param url: url whose scheme to check + ''' + osaka.utils.LOGGER.info("Checking for backend supporting {0}".format(url)) + try: + return not osaka.base.StorageBase.getStorageBackend(url) is None + except osaka.utils.OsakaException: + return False diff --git a/osaka/storage/__init__.py b/osaka/storage/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/osaka/storage/az.py b/osaka/storage/az.py new file mode 100644 index 0000000..c30dbe6 --- /dev/null +++ b/osaka/storage/az.py @@ -0,0 +1,100 @@ +import os.path +import urlparse +from azure.storage.blob import BlobService +from osaka.utils import get_container_and_path,walk,OsakaException + +''' +Azure storage connection services +@author starchmd +''' +class Azure(object): + ''' + A class used to connect to the Azure storage and + upload/download files using blob storage + ''' + def __init__(self,params={}): + ''' + Constructor for the Azure object + + ''' + if "user" in params: + self.user = params["user"] + else: + self.user = None + if "key" in params: + self.key = params["key"] + else: + self.key = None + def connect(self, host, port, user, password, secure): + ''' + Connect to the Azure service with given user and key + @param user - username to use to connect to + @param key - key to use to connect + ''' + kwargs = {} + err = None + if not host is None: + kwargs["host_base"] = "."+host + if not user is None: + kwargs["account_name"] = user + elif not self.user is None: + kwargs["account_name"] = self.user + if not password is None: + kwargs["account_key"] = password + elif not self.key is None: + kwargs["account_key"] = self.key + kwargs["protocol"] = "https" if secure else "http" + try: + self.service = BlobService(**kwargs) + except Exception as e: + err = e.message + self.service = None + if self.service is None: + raise OsakaException("Failed to connect to Azure:"+("" if err is None else err)) + @classmethod + def getSchemes(clazz): + ''' + Returns a list of schemes this handler handles + Note: handling the scheme of another handler produces unknown results + @returns list of handled schemes + ''' + return ["azure","azures"] + def close(self): + ''' + Close this service + ''' + pass + def put(self, path, url): + ''' + Put a file up to the cloud + @param path - path to upload + @param url - path in cloud to upload too + ''' + if os.path.isdir(path): + return walk(self.put,path,url) + cont,blob = get_container_and_path(urlparse.urlparse(url).path) + self.service.create_container(cont) + self.service.put_block_blob_from_path(cont,blob,path) + return True + def get(self, url, dest): + ''' + Get file(s) from the cloud + @param url - url on cloud to pull down (on cloud) + @param dest - dest to download too + ''' + cont,blob = get_container_and_path(urlparse.urlparse(url).path) + for b in self.service.list_blobs(cont,prefix=blob): + destination=os.path.join(dest,os.path.relpath(b.name,blob)) if blob != b.name else dest + if not os.path.exists(os.path.dirname(destination)): + os.mkdir(os.path.dirname(destination)) + 
self.service.get_blob_to_path(cont,b.name,destination) + return True + def rm(self,url): + ''' + Remove this url and all children urls + @param url - url to remove + ''' + cont,blob = get_container_and_path(urlparse.urlparse(url).path) + for b in self.service.list_blobs(cont,prefix=blob): + self.service.delete_blob(cont,b.name) + return True diff --git a/osaka/storage/example.py b/osaka/storage/example.py new file mode 100644 index 0000000..2d8ad32 --- /dev/null +++ b/osaka/storage/example.py @@ -0,0 +1,69 @@ +from __future__ import print_function +''' +Example of a storage handler used with Osaka. +parsed from the url: + ://[:@]:/ + +@author starchmd +''' +class Example(object): + ''' + Example Osaka handler + ''' + def __init__(self,params={}): + ''' + Constructor: + 1. All parameters are in the params map + 2. Remember, this is called often + and called long before "connect" + ''' + print("Init the example handler") + def connect(self,host,port,user,password,secure): + ''' + Connect to this storage medium. All data is parsed out of the url and may be None + scheme: + @param host - may be None, host to connect to + implementor must handle defaulting + @param port - may be None, port to connect to + implementor must handle a None port + @param user - may be None, user to connect as + implementor must handle a None user + @param password - may be None, password to connect with + implementor must handle a None password + ''' + print("Connecting to example handler:",host,port,user,password,secure) + @classmethod + def getSchemes(clazz): + ''' + Returns a list of schemes this handler handles + Note: handling the scheme of another handler produces unknown results + @returns list of handled schemes + ''' + return ["example","examples"] + def put(self,path,url): + ''' + Put the given path to the given url + @param path - local path of file to put + @param url - url to put file/folder to + ''' + print("Putting:",path,"to",url) + return False + def get(self,url,dest): + ''' + Get the url (file/folder) to local path + @param url - url to get file/folder from + @param path - path to place fetched files + ''' + print("Getting:",url,"to",dest) + return False + def close(self): + ''' + Close this connection + ''' + print("Closing backend") + def rm(self,url): + ''' + Remove this file/folder + ''' + print("Removing product at:",url) + return False diff --git a/osaka/storage/file.py b/osaka/storage/file.py new file mode 100644 index 0000000..abd2614 --- /dev/null +++ b/osaka/storage/file.py @@ -0,0 +1,159 @@ +import urlparse +import shutil +import os +import re +import datetime + +import osaka.utils +import osaka.base + + +''' +File handling using local moves and/or fabric + +@author starchmd +''' +class File(osaka.base.StorageBase): + ''' + File handling for put/gets + ''' + def __init__(self): + ''' + Constructor + ''' + self.files = [] + def connect(self,uri,params={}): + ''' + Connects to the backend + ''' + osaka.utils.LOGGER.debug("Opening file handler") + @staticmethod + def getSchemes(): + ''' + Returns a list of schemes this handler handles + Note: handling the scheme of another handler produces unknown results + @returns list of handled schemes + ''' + return ["","file"] + def get(self,uri): + ''' + Gets the URI (file) as a steam + @param uri: uri to get + ''' + if uri.startswith("file:") and not uri.startswith("file:///"): + raise Exception("Non-absolute paths and non-null hostnames not supported with 'file://' schemed uris") + osaka.utils.LOGGER.debug("Getting stream from URI: 
{0}".format(uri)) + fh = open(urlparse.urlparse(uri).path,"r") + self.files.append(fh) + return fh + def put(self,stream,uri): + ''' + Puts a stream to a URI as a steam + @param stream: stream to upload + @param uri: uri to put + ''' + if uri.startswith("file:") and not uri.startswith("file:///"): + raise Exception("Non-absolute paths and non-null hostnames not supported with 'file://' schemed uris") + osaka.utils.LOGGER.debug("Putting stream to URI: {0}".format(uri)) + path = urlparse.urlparse(uri).path + try: + os.makedirs(os.path.dirname(path)) + except Exception as e: + osaka.utils.LOGGER.debug("Exception while creating directories {0}".format(e)) + with open(path,"w") as out: + shutil.copyfileobj(stream,out) + return osaka.utils.get_disk_usage(urlparse.urlparse(uri).path) + def size(self,uri): + ''' + Size the URI (file) as a steam + @param uri: uri to get + ''' + if uri.startswith("file:") and not uri.startswith("file:///"): + raise Exception("Non-absolute paths and non-null hostnames not supported with 'file://' schemed uris") + osaka.utils.LOGGER.debug("Getting stream from URI: {0}".format(uri)) + return os.path.getsize(urlparse.urlparse(uri).path) + def exists(self,uri): + ''' + Does the URI exist? + @param uri: uri to check + ''' + if uri.startswith("file:") and not uri.startswith("file:///"): + raise Exception("Non-absolute paths and non-null hostnames not supported with 'file://' schemed uris") + exts = os.path.exists(urlparse.urlparse(uri).path) + osaka.utils.LOGGER.debug("Does URI {0} exist: {1}".format(uri,exts)) + return exts + def list(self,uri): + ''' + List URI + @param uri: uri to list + ''' + if uri.startswith("file:") and not uri.startswith("file:///"): + raise Exception("Non-absolute paths and non-null hostnames not supported with 'file://' schemed uris") + tmp = urlparse.urlparse(uri).path + if os.path.exists(tmp) and not os.path.isdir(tmp): + return [tmp] + return [os.path.join(uri,item) for item in os.listdir(tmp)] + def isComposite(self,uri): + ''' + Detect if this uri is a composite uri (uri to collection of objects i.e. directory) + @param uri: uri to list + ''' + if uri.startswith("file:") and not uri.startswith("file:///"): + raise Exception("Non-absolute paths and non-null hostnames not supported with 'file://' schemed uris") + isDir = os.path.isdir(urlparse.urlparse(uri).path) + osaka.utils.LOGGER.debug("Is URI {0} a directory: {1} {2}".format(uri,isDir,self.exists(uri))) + return isDir + def close(self): + ''' + Close this backend + ''' + osaka.utils.LOGGER.debug("Closing file handler") + for fh in self.files: + try: + fh.close() + except: + osaka.utils.LOGGER.debug("Failed to close file-handle for: {0}".format(fh.name)) + def rm(self,uri): + ''' + Remove this uri from backend + @param uri: uri to remove + ''' + if uri.startswith("file:") and not uri.startswith("file:///"): + raise Exception("Non-absolute paths and non-null hostnames not supported with 'file://' schemed uris") + path = urlparse.urlparse(uri).path + if os.path.isdir(path): + shutil.rmtree(path) + else: + os.remove(path) +class FileHandlerConversion(): + ''' + This class allows a user to create a file-based approach to handling streams for backends that + cannot handle their streams independently. 
+    '''
+    def __init__(self,stream):
+        '''
+        Initialize the class, accepting the stream and creating the temp file if needed
+        @param stream: stream to process
+        '''
+        self.filename = getattr(stream,"name",None)
+        self.handler = None
+        #If the stream is not a file, make a temporary file out of it
+        if self.filename is None or not os.path.exists(self.filename):
+            self.handler = File()
+            self.filename = "/tmp/osaka-temporary-"+datetime.datetime.utcnow().strftime("%Y%m%d%H%M%S.%f")
+            self.handler.connect(self.filename)
+            self.handler.put(stream,self.filename)
+    def __enter__(self):
+        '''
+        Return the filename, whether real or temporary
+        '''
+        return self.filename
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        '''
+        Close the temporary file if it was created
+        @param exc_type: unused
+        @param exc_val: unused
+        @param exc_tb: unused
+        '''
+        if not self.handler is None:
+            self.handler.rm(self.filename)
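FileHandlerConversion is the bridge used by backends whose client libraries only accept real file paths (gs.py and webdav.py below both lean on it). A minimal sketch of the context-manager behavior, assuming an in-memory stream:

    # Sketch: stage an in-memory stream to disk for a file-only API.
    from StringIO import StringIO
    from osaka.storage.file import FileHandlerConversion

    stream = StringIO("some product bytes")
    with FileHandlerConversion(stream) as filename:
        # 'filename' is a real on-disk path here; if a temporary file was
        # created, it is removed automatically when the block exits.
        print(filename)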
diff --git a/osaka/storage/ftp.py b/osaka/storage/ftp.py
new file mode 100644
index 0000000..401821b
--- /dev/null
+++ b/osaka/storage/ftp.py
@@ -0,0 +1,128 @@
+'''
+An Osaka backend that connects to ftp
+@author mstarch
+'''
+import re
+import urlparse
+import datetime
+import os.path
+import json
+import osaka.base
+import osaka.utils
+import osaka.storage.file
+
+import netrc
+import ftplib
+
+class FTP(osaka.base.StorageBase):
+    '''
+    An FTP connection class used to connect to FTP servers
+    '''
+    def __init__(self):
+        '''
+        Constructor
+        '''
+        self.tmpfiles = []
+    def connect(self,uri,params={}):
+        '''
+        Connects to the backend, given the URI
+        @param uri - ftp uri to connect to
+        @param params - optional, parameters for the connection
+        '''
+        parsed = urlparse.urlparse(uri)
+        netloc = parsed.netloc
+        username = parsed.username
+        password = parsed.password
+        #If we were not supplied a username or password, seek it in the netrc
+        if username is None or password is None:
+            rchandle = netrc.netrc()
+            username, account, password = rchandle.authenticators(netloc)
+        osaka.utils.LOGGER.info("Logging into {0} with ({1})".format(netloc, username))
+        self.ftp = ftplib.FTP(netloc, username, password)
+        #self.ftp.login()
+        osaka.utils.LOGGER.debug("Successfully logged in")
+    @staticmethod
+    def getSchemes():
+        '''
+        Returns a list of schemes this handler handles
+        Note: handling the scheme of another handler produces unknown results
+        @returns list of handled schemes
+        '''
+        return ["ftp"]
+    def get(self,uri):
+        '''
+        Gets the URI (ftp file) as a stream
+        @param uri: uri to get
+        '''
+        osaka.utils.LOGGER.debug("Getting stream from URI: {0}".format(uri))
+        filename = urlparse.urlparse(uri).path
+        fname = "/tmp/osaka-ftp-"+str(datetime.datetime.now())
+        with open(fname, "w") as tmpf:
+            self.ftp.retrbinary('RETR %s' % filename, tmpf.write)
+        fh = open(fname,"r+b")
+        self.tmpfiles.append(fh)
+        fh.seek(0)
+        return fh
+    def put(self,stream,uri):
+        '''
+        Puts a stream to a URI as a stream
+        @param stream: stream to upload
+        @param uri: uri to put
+        '''
+        raise osaka.utils.OsakaException("Not implemented; implementation deferred")
+    def exists(self,uri):
+        '''
+        Does the URI exist?
+        @param uri: uri to check
+        '''
+        osaka.utils.LOGGER.debug("Does URI {0} exist?".format(uri))
+        #A key exists if it has some children
+        return len(self.listAllChildren(uri)) > 0
+    def list(self,uri):
+        '''
+        List URI
+        @param uri: uri to list
+        '''
+        parsed = urlparse.urlparse(uri)
+        filename = parsed.path
+        listing = self.ftp.nlst(filename)
+        base = parsed.netloc if parsed.netloc.endswith("/") else parsed.netloc + "/"
+        listing = [parsed.scheme + "://" + base + item.lstrip("/") for item in listing]
+        return listing
+    def isComposite(self,uri):
+        '''
+        Detect if this uri is a composite uri (uri to a collection of objects i.e. directory)
+        @param uri: uri to list
+        '''
+        osaka.utils.LOGGER.debug("Is URI {0} a directory".format(uri))
+        children = self.list(uri)
+        if len(children) == 0 or (len(children) == 1 and children[0] == uri):
+            return False
+        return True
+    def close(self):
+        '''
+        Close this backend
+        '''
+        osaka.utils.LOGGER.debug("Closing ftp handler")
+        for fh in self.tmpfiles:
+            try:
+                fh.close()
+            except:
+                osaka.utils.LOGGER.debug("Failed to close temporary file-handle for: {0}".format(fh.name))
+            try:
+                os.remove(fh.name)
+            except:
+                osaka.utils.LOGGER.debug("Failed to remove temporary file-handle for: {0}".format(fh.name))
+    def size(self,uri):
+        '''
+        Size this uri from backend
+        @param uri: uri to size
+        '''
+        filename = urlparse.urlparse(uri).path
+        #Delegate to the FTP SIZE command (the previous self.size call recursed forever)
+        return self.ftp.size(filename)
+    def rm(self,uri):
+        '''
+        Remove this uri from backend
+        @param uri: uri to remove
+        '''
+        raise osaka.utils.OsakaException("Not implemented; implementation deferred")
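connect() above falls back to ~/.netrc when the URI itself carries no credentials; netrc.authenticators() returns a (login, account, password) triple. A sketch with a hypothetical host:

    # ~/.netrc entry the fallback would consume (hypothetical host):
    #   machine ftp.example.com
    #   login anonymous
    #   password user@example.com
    import netrc

    rchandle = netrc.netrc()
    login, account, password = rchandle.authenticators("ftp.example.com")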
diff --git a/osaka/storage/gs.py b/osaka/storage/gs.py
new file mode 100644
index 0000000..c3495cb
--- /dev/null
+++ b/osaka/storage/gs.py
@@ -0,0 +1,190 @@
+import re
+from google.cloud import storage
+from google.cloud.exceptions import Conflict, Forbidden
+import urlparse
+import datetime
+import os.path
+from StringIO import StringIO
+
+import osaka.base
+import osaka.utils
+import osaka.storage.file
+
+'''
+google storage connection service
+@author starchmd,gmanipon
+'''
+
+class GS(osaka.base.StorageBase):
+    '''
+    Handles GS file copies
+    '''
+    def __init__(self):
+        '''
+        Constructor
+        '''
+        self.files = []
+
+    def connect(self,uri,params={}):
+        '''
+        Connects to the backend
+        @param uri - gs uri for resource
+        @param params - optional, may contain: location
+        '''
+        osaka.utils.LOGGER.debug("Opening GS handler")
+        uri = re.compile("^gs").sub("http",uri)
+        parsed = urlparse.urlparse(uri)
+        session_kwargs = {}
+        kwargs = {}
+        check_host = parsed.hostname if not "location" in params else params["location"]
+        if not parsed.hostname is None:
+            kwargs["endpoint_url"] = "%s://%s" % (parsed.scheme, parsed.hostname)
+            if not parsed.port is None:
+                kwargs["endpoint_url"] = "%s:%s" % (kwargs["endpoint_url"], parsed.port)
+        if not parsed.username is None:
+            session_kwargs["gcp_access_key_id"] = parsed.username
+        elif "gcp_access_key_id" in params:
+            session_kwargs["gcp_access_key_id"] = params["gcp_access_key_id"]
+        if not parsed.password is None:
+            session_kwargs["gcp_secret_access_key"] = parsed.password
+        elif "gcp_secret_access_key" in params:
+            session_kwargs["gcp_secret_access_key"] = params["gcp_secret_access_key"]
+        kwargs["use_ssl"] = parsed.scheme == "https"
+
+        container,key = osaka.utils.get_container_and_path(parsed.path)
+        session_kwargs["profile_name"] = container
+        #Note: the kwargs parsed above are currently unused; storage.Client()
+        #picks up credentials from the environment (application default credentials)
+        self.gs = storage.Client()
+
+    @staticmethod
+    def getSchemes():
+        '''
+        Returns a list of schemes this handler handles
+        Note: handling the scheme of another handler produces unknown results
+        @returns list of handled schemes
+        '''
+        return ["gs"]
+
+    def get(self,uri):
+        '''
+        Gets the URI as a stream
+        @param uri: uri to get
+        '''
+        osaka.utils.LOGGER.debug("Getting stream from URI: {0}".format(uri))
+        container,key = osaka.utils.get_container_and_path(urlparse.urlparse(uri).path)
+        bucket = self.bucket(container,create=False)
+        blob = bucket.blob(key)
+        stream = StringIO(blob.download_as_string())
+        return stream
+
+    def put(self,stream,uri):
+        '''
+        Puts a stream to a URI as a stream
+        @param stream: stream to upload
+        @param uri: uri to put
+        '''
+        osaka.utils.LOGGER.debug("Putting stream to URI: {0}".format(uri))
+        container,key = osaka.utils.get_container_and_path(urlparse.urlparse(uri).path)
+        bucket = self.bucket(container)
+        blob = bucket.blob(key)
+        with osaka.storage.file.FileHandlerConversion(stream) as fn:
+            blob.upload_from_filename(fn)
+        return blob.size
+
+    def size(self, uri):
+        '''
+        Get the size of this object
+        '''
+        osaka.utils.LOGGER.debug("Getting size from URI: {0}".format(uri))
+        container,key = osaka.utils.get_container_and_path(urlparse.urlparse(uri).path)
+        bucket = self.bucket(container,create=False)
+        #get_blob fetches the blob's metadata, so size is populated
+        blob = bucket.get_blob(key)
+        return blob.size
+
+    def listAllChildren(self,uri):
+        '''
+        List all children of the current uri
+        @param uri: uri to check
+        '''
+        osaka.utils.LOGGER.debug("Running list all children")
+        parsed = urlparse.urlparse(uri)
+        container,key = osaka.utils.get_container_and_path(parsed.path)
+        bucket = self.bucket(container,create=False)
+        collection = bucket.list_blobs(prefix=key)
+        uriBase = parsed.scheme+"://"+parsed.hostname + (":"+str(parsed.port) if not parsed.port is None else "")
+        return [ uriBase +"/"+ container + "/" + item.name for item in collection if item.name == key or item.name.startswith(key+"/")]
+
+    def exists(self,uri):
+        '''
+        Does the URI exist?
+        @param uri: uri to check
+        '''
+        osaka.utils.LOGGER.debug("Does URI {0} exist?".format(uri))
+        #A key exists if it has some children
+        return len(self.listAllChildren(uri)) > 0
+
+    def list(self,uri):
+        '''
+        List URI
+        @param uri: uri to list
+        '''
+        depth = len(uri.rstrip("/").split("/"))
+        return [item for item in self.listAllChildren(uri) if len(item.rstrip("/").split("/")) == (depth + 1)]
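The item.name == key or item.name.startswith(key+"/") filter in listAllChildren keeps sibling keys that merely share a character prefix out of the listing; a small illustration with hypothetical key names:

    # Why the "/" guard matters: a bare startswith(key) would also match siblings.
    key = "products/test"
    names = ["products/test", "products/test/file-1", "products/test-other/file-2"]
    children = [n for n in names if n == key or n.startswith(key + "/")]
    # children == ["products/test", "products/test/file-1"]; "test-other" is excluded.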
+    def isComposite(self,uri):
+        '''
+        Detect if this uri is a composite uri (uri to a collection of objects i.e. directory)
+        @param uri: uri to list
+        '''
+        osaka.utils.LOGGER.debug("Is URI {0} a directory".format(uri))
+        children = self.listAllChildren(uri)
+        if len(children) == 0 or (len(children) == 1 and children[0] == uri):
+            return False
+        return True
+
+    def close(self):
+        '''
+        Close this backend
+        '''
+        osaka.utils.LOGGER.debug("Closing GS handler")
+
+    def rm(self,uri):
+        '''
+        Remove this uri from backend
+        @param uri: uri to remove
+        '''
+        container,key = osaka.utils.get_container_and_path(urlparse.urlparse(uri).path)
+        bucket = self.bucket(container,create=False)
+        bucket.delete_blob(key)
+
+    def getKeysWithPrefixURI(self,uri):
+        '''
+        Keys with prefix of given URI
+        @param uri: prefix URI
+        '''
+        parsed = urlparse.urlparse(uri)
+        container,key = osaka.utils.get_container_and_path(parsed.path)
+        bucket = self.bucket(container,create=False)
+        collection = bucket.list_blobs(prefix=key)
+        return [item.bucket_name + "/" + item.name for item in collection]
+
+    def bucket(self,bucket,create=True):
+        '''
+        Gets the given bucket or makes it
+        @param bucket - name of bucket to find
+        '''
+        try:
+            return self.gs.create_bucket(bucket)
+        except (Conflict, Forbidden):
+            return self.gs.get_bucket(bucket)
diff --git a/osaka/storage/http.py b/osaka/storage/http.py
new file mode 100644
index 0000000..71b1d37
--- /dev/null
+++ b/osaka/storage/http.py
@@ -0,0 +1,242 @@
+from __future__ import print_function
+import os
+import re
+import urlparse
+import requests
+
+from requests.auth import HTTPBasicAuth,HTTPDigestAuth
+
+import osaka.utils
+import osaka.base
+
+requests.packages.urllib3.disable_warnings()
+'''
+HTTP Handler
+
+This Osaka backend uses requests to handle HTTP requests.
+
+@author starchmd
+'''
+class HTTP(osaka.base.StorageBase):
+    '''
+    Http and WebDav handling backends
+    '''
+    BLOCK_SIZE=4096
+    HREF_RE = re.compile(r'href\s*=\s*"([^"]+)"')
+
+    def __init__(self):
+        '''
+        Constructor
+        '''
+        pass
+    def connect(self,uri,params={}):
+        '''
+        Connects to the backend
+        '''
+        self.timeout = 1800.0 if not "timeout" in params else params["timeout"]
+        parsed = urlparse.urlparse(uri)
+        user = parsed.username
+        password = parsed.password
+        osaka.utils.LOGGER.debug("Opening HTTP handler")
+        if user is None or password is None:
+            tmp = self.getNetRCCredentials(uri)
+            user = None if tmp is None else tmp["user"]
+            password = None if tmp is None else tmp["password"]
+        if "oauth" in params and not params["oauth"] is None:
+            osaka.utils.LOGGER.info("Connecting to http using OAuth: {0}".format(params["oauth"]))
+            self.session = self.oauthSession(params["oauth"])
+        else:
+            #connectionUri = parsed.scheme + "://" +parsed.hostname+(":"+str(parsed.port) if not parsed.port is None else "")
+            osaka.utils.LOGGER.info("Connecting to http: {0} with user: {1}".format(uri,user))
+            self.session = self.standardSession(uri,user,password,self.timeout)
+
+    @staticmethod
+    def getSchemes():
+        '''
+        Returns a list of schemes this handler handles
+        Note: handling the scheme of another handler produces unknown results
+        @returns list of handled schemes
+        '''
+        return ["http","https"]
+    def get(self,uri,text=False):
+        '''
+        Gets the URI (file) as a stream
+        @param uri: uri to get
+        @param text: should we pull out text data instead of raw
+        '''
+        osaka.utils.LOGGER.debug("Getting stream from URI: {0} Timeout: {1}".format(uri,self.timeout))
+        response = 
self.session.get(uri,stream=True,verify=False,timeout=self.timeout) + response.raise_for_status() + if text: + return response.text + return response.raw + def put(self,stream,uri): + ''' + Puts a stream to a URI as a steam + @param stream: stream to upload + @param uri: uri to put + ''' + raise osaka.utils.OsakaException("Osaka HTTP does not support PUT requests") + def size(self,uri): + ''' + Get size try + @param uri: uri to check + ''' + osaka.utils.LOGGER.debug("Getting size of {0} exist? Timeout: {1}".format(uri,self.timeout)) + response = self.session.get(uri,stream=True,verify=False,timeout=self.timeout) + response.raise_for_status() + size = long(response.headers['content-length']) + response.close() + return size + def exists(self,uri): + ''' + Does the URI exist? + @param uri: uri to check + ''' + osaka.utils.LOGGER.debug("Does URI {0} exist? Timeout: {1}".format(uri,self.timeout)) + try: + ret = self.session.head(uri,timeout=self.timeout) + #Custom raise_for_status, to include any non-200 code forcing a different check + if ret.status_code != 200: + raise osaka.utils.OsakaException("Bad response for existence checking") + return True + except Exception as e: + pass + osaka.utils.LOGGER.debug("HEAD call not allowed, attempting get w/o read") + try: + text = self.get(uri,text=True) + if re.search("\s*(?: 0 + def list(self,uri): + ''' + List URI + @param uri: uri to list + ''' + depth = len(uri.rstrip("/").split("/")) + return [item for item in self.listAllChildren() if len(item.rstrip("/").split("/")) == (depth + 1)] + def isComposite(self,uri): + ''' + Detect if this uri is a composite uri (uri to collection of objects i.e. directory) + @param uri: uri to list + ''' + osaka.utils.LOGGER.debug("Is URI {0} a directory".format(uri)) + children = self.listAllChildren(uri) + if len(children) == 0 or (len(children) == 1 and children[0] == uri): + return False + return True + def close(self): + ''' + Close this backend + ''' + osaka.utils.LOGGER.debug("Closing S3 handler") + for fh in self.tmpfiles: + try: + fh.close() + except: + osaka.utils.LOGGER.debug("Failed to close temporary file-handle for: {0}".format(fh.name)) + try: + os.remove(fh.name) + except: + osaka.utils.LOGGER.debug("Failed to remove temporary file-handle for: {0}".format(fh.name)) + + def size(self,uri): + ''' + Size this uri from backend + @param uri: uri to size + ''' + if uri in self.cache: + return self.cache[uri].size + container,key = osaka.utils.get_container_and_path(urlparse.urlparse(uri).path) + bucket = self.bucket(container,create=False) + obj = bucket.Object(key) + try: + return obj.content_length + except Exception as exc: + if "Invalid length for parameter Key, value: 0" in str(exc): + return 0 + raise + def rm(self,uri): + ''' + Remove this uri from backend + @param uri: uri to remove + ''' + container,key = osaka.utils.get_container_and_path(urlparse.urlparse(uri).path) + bucket = self.bucket(container,create=False) + obj = bucket.Object(key) + obj.delete() + + def getKeysWithPrefixURI(self,uri): + ''' + Keys with prefix of given URI + @param uri: prefix URI + ''' + parsed = urlparse.urlparse(uri) + container,key = osaka.utils.get_container_and_path(parsed.path) + bucket = self.bucket(container,create=False) + collection = bucket.objects.filter(Prefix=key) + return [item.bucket_name + "/" + item.key for item in collection] + def bucket(self,bucket,create=True): + ''' + Gets the given bucket or makes it + @param bucket - name of bucket to find + ''' + b = self.s3.Bucket(bucket) + exists = True + try: 
+            self.s3.meta.client.head_bucket(Bucket=bucket)
+        except botocore.exceptions.ClientError as e:
+            error_code = int(e.response['Error']['Code'])
+            if error_code == 404:
+                exists = False
+        if exists is False and create:
+            loc = re.sub("s3.([^.]+)\..*","\g<1>",self.s3.host)
+            if loc == 'amazonaws': loc = '' # handle us-east-1
+            b = self.s3.create_bucket(Bucket=bucket, CreateBucketConfiguration={'LocationConstraint': loc})
+        return b
diff --git a/osaka/storage/sftp.py b/osaka/storage/sftp.py
new file mode 100644
index 0000000..91d3927
--- /dev/null
+++ b/osaka/storage/sftp.py
@@ -0,0 +1,109 @@
+from __future__ import print_function
+
+import os
+import os.path
+import stat
+import urlparse
+import paramiko
+'''
+A backend used to handle sftp using paramiko
+
+@author starchmd
+'''
+class SFTP(object):
+    '''
+    SFTP handling for Osaka
+    '''
+    def __init__(self,params={}):
+        '''
+        Constructor
+        '''
+        self.keyfile = params["keyfile"] if "keyfile" in params else None
+    def connect(self,host=None,port=None,user=None,password=None,secure=False):
+        '''
+        Connect to this storage medium. All data is parsed out of the url and may be None
+        @param host - may be None, host to connect to; implementor must handle defaulting
+        @param port - may be None, port to connect to; implementor must handle a None port
+        @param user - may be None, user to connect as; implementor must handle a None user
+        @param password - may be None, password to connect with; implementor must handle a None password
+        '''
+        self.client = paramiko.client.SSHClient()
+        self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+        self.client.connect(host,port=22 if port is None else int(port),username=user,password=password,key_filename=self.keyfile,timeout=15)
+        self.sftp = self.client.open_sftp()
+
+    @classmethod
+    def getSchemes(clazz):
+        '''
+        Returns a list of schemes this handler handles
+        Note: handling the scheme of another handler produces unknown results
+        @returns list of handled schemes
+        '''
+        return ["sftp"]
+    def put(self,path,url):
+        '''
+        Put the given path to the given url
+        @param path - local path of file/folder to put
+        @param url - url to put file/folder to
+        '''
+        rpath = urlparse.urlparse(url).path.lstrip("/")
+        print("Uploading:",path)
+        if not os.path.isdir(path):
+            print("As file")
+            try:
+                self.sftp.mkdir(os.path.dirname(rpath))
+            except IOError as e:
+                pass
+            dest = rpath
+            try:
+                if stat.S_ISDIR(self.sftp.stat(rpath).st_mode) != 0:
+                    dest = os.path.join(rpath,os.path.basename(path))
+            except:
+                pass
+            return self.upload(path,dest)
+        print("As Dir")
+        try:
+            self.sftp.mkdir(rpath)
+        except IOError as e:
+            pass
+        for dirpath, dirname, filenames in os.walk(path):
+            extra = os.path.relpath(dirpath,os.path.dirname(path))
+            try:
+                self.sftp.mkdir(os.path.join(rpath,extra))
+            except IOError as e:
+                pass
+            for filename in filenames:
+                self.upload(os.path.join(dirpath,filename),os.path.join(rpath,extra,filename))
+    def upload(self,path,rpath):
+        '''
+        Uploads a file to remote path
+        @param path - path to upload
+        @param rpath - remote path to upload to
+        '''
+        self.sftp.put(path,rpath)
+        return True
+    def get(self,url,path):
+        '''
+        Get the url (file/folder) to local path
+        @param url - url to get file/folder from
+        @param path - path to place fetched files
+        '''
+        rpath = urlparse.urlparse(url).path
+        self.sftp.get(rpath,path)
+    def rm(self,url):
+        '''
+        Remove the item
+        @param url - url to remove
+        '''
+        rpath = urlparse.urlparse(url).path
+        self.sftp.remove(rpath)
+    def close(self):
+        '''
+        Close this connection
+        '''
+        self.client.close()
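Because key-based authentication only enters through the params handed to the constructor, a caller supplies the keyfile up front; a minimal direct-use sketch (host, user, and paths are hypothetical):

    # Hypothetical sketch: direct use of the SFTP handler with a private key.
    from osaka.storage.sftp import SFTP

    handler = SFTP(params={"keyfile": "/home/user/.ssh/id_rsa"})
    handler.connect(host="host.example.com", user="user")
    handler.put("./products/my-product", "sftp://host.example.com/data/work/my-product")
    handler.close()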
diff --git a/osaka/storage/webdav.py b/osaka/storage/webdav.py
new file mode 100644
index 0000000..5d2c6b2
--- /dev/null
+++ b/osaka/storage/webdav.py
@@ -0,0 +1,153 @@
+from __future__ import print_function
+import os
+import re
+import urlparse
+import requests
+import easywebdav
+import datetime
+
+from requests.auth import HTTPBasicAuth,HTTPDigestAuth
+
+import osaka.utils
+import osaka.base
+import osaka.storage.file
+import osaka.storage.http
+
+requests.packages.urllib3.disable_warnings()
+'''
+WebDav Handler
+
+Osaka handler for the webdav backend services.
+
+Note: easywebdav is not used in all functions, as it assumes that the "PROPFIND" command is permitted,
+and on some workers it is not. Thus, this backend uses the HTTP backend internally.
+
+@author starchmd
+'''
+class DAV(osaka.base.StorageBase):
+    '''
+    Http and WebDav handling backends
+    '''
+    def __init__(self):
+        '''
+        Constructor
+        '''
+        pass
+    def connect(self,uri,params={}):
+        '''
+        Connects to the backend
+        '''
+        osaka.utils.LOGGER.debug("Opening WebDav handler")
+        #Grab information out of the URI
+        username,password = osaka.utils.get_uri_username_and_password(uri)
+        scheme,host = osaka.utils.get_uri_scheme_and_hostname(uri)
+        #Setup webdav connection
+        self.webdav = easywebdav.connect(host, username=username, password=password, protocol=re.compile("^dav").sub("http",scheme), verify_ssl=False)
+        self.httpHandler = osaka.storage.http.HTTP()
+        self.httpHandler.connect(re.compile("^dav").sub("http",uri), params)
+    @staticmethod
+    def getSchemes():
+        '''
+        Returns a list of schemes this handler handles
+        Note: handling the scheme of another handler produces unknown results
+        @returns list of handled schemes
+        '''
+        return ["dav","davs"]
+    def get(self,uri,text=False):
+        '''
+        Gets the URI (file) as a stream
+        @param uri: uri to get
+        @param text: should we pull out text data instead of raw
+        '''
+        osaka.utils.LOGGER.debug("Getting stream from URI: {0} Note: Using HTTP GET".format(uri))
+        #Use the standard HTTP handler for getting the product
+        return self.httpHandler.get(re.compile("^dav").sub("http",uri),text=text)
+    def put(self,stream,uri):
+        '''
+        Puts a stream to a URI as a stream
+        @param stream: stream to upload
+        @param uri: uri to put
+        '''
+        osaka.utils.LOGGER.debug("Putting stream to URI: {0}".format(uri))
+        path = osaka.utils.get_uri_path(uri)
+        #Attempt to create the directories needed
+        try:
+            self.webdav.mkdirs(os.path.dirname(path))
+            self.webdav.delete(path)
+        except Exception as e:
+            osaka.utils.LOGGER.debug("Exception making directories and cleaning up existing product: {0}".format(e))
+        #Create a filename to handle this stream
+        with osaka.storage.file.FileHandlerConversion(stream) as fn:
+            #Handle zero-length separately from webdav
+            if os.path.getsize(fn) == 0:
+                self.httpHandler.session.put(re.compile("^dav").sub("http",uri),"",verify=False,timeout=self.httpHandler.timeout).raise_for_status()
+            else:
+                self.webdav.upload(fn,path)
+        #Get size for put item
+        response = self.httpHandler.session.head(re.compile("^dav").sub("http",uri),verify=False,timeout=self.httpHandler.timeout)
+        response.raise_for_status()
+        return int(response.headers["Content-Length"])
+    def size(self, uri):
+        '''
+        Size of object
+        '''
+        osaka.utils.LOGGER.debug("Sizing URI: {0} Note: Using HTTP size".format(uri))
+        #Use the standard HTTP handler for sizing the product
+        return self.httpHandler.size(re.compile("^dav").sub("http",uri))
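Every method above normalizes the scheme before delegating, mapping dav to http and davs to https via the same regex substitution; a tiny illustration (hostname hypothetical):

    # The scheme rewrite used throughout the DAV backend.
    import re

    print(re.compile("^dav").sub("http", "davs://worker.example.com/data/work/"))
    # -> "https://worker.example.com/data/work/"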
+    def exists(self,uri):
+        '''
+        Does the URI exist?
+        @param uri: uri to check
+        '''
+        osaka.utils.LOGGER.debug("Does URI {0} exist?".format(uri))
+        try:
+            path = osaka.utils.get_uri_path(uri)
+            tmp = self.webdav.exists(path)
+            osaka.utils.LOGGER.debug("Does URI {0} exist? {1}".format(uri,tmp))
+            return tmp
+        except Exception as e:
+            pass
+        osaka.utils.LOGGER.debug("Failed to check existence using HEAD")
+        try:
+            text = self.httpHandler.get(re.compile("^dav").sub("http",uri),text=True)
+            if re.search("\s*(?:= 9, "Retry didn't occur enough {0} vs {1}".format(time1/time0, 10))
diff --git a/osaka/tests/test_timeout.py b/osaka/tests/test_timeout.py
new file mode 100644
index 0000000..affdb63
--- /dev/null
+++ b/osaka/tests/test_timeout.py
@@ -0,0 +1,75 @@
+import os
+import copy
+import subprocess
+import unittest
+import requests.exceptions
+
+import osaka.main
+import osaka.tests.util
+'''
+Created on Oct 31, 2016
+
+@author: mstarch
+'''
+class TimeoutTest(unittest.TestCase):
+    '''
+    A test that sets up high and low timeouts to ensure that the timeouts
+    work properly.
+    '''
+    def setUp(self):
+        '''
+        Setup method for the test case
+        '''
+        self.config = osaka.tests.util.load_test_config()
+        self.addCleanup(self.cleanup)
+        unittest.TestCase.setUp(self)
+        self.scratch = self.config.get("scratch_file","/tmp/osaka-unittest-scratch/")
+        self.worker = self.config.get("dav", {}).get("worker", None)
+        # A list of input objects from various locations
+        self.ins = [ self.config.get("dav",{}).get("test_input_urls",[])[0],
+                     self.config.get("http",{}).get("test_input_urls",[])[0]
+                   ]
+        # A list of output only locations
+        self.out = []
+        #Construct path to checked-in test cases
+        self.base = os.path.dirname(osaka.__file__)+"/../resources/objects/"
+        self.objects = [os.path.join(self.base,listing) for listing in os.listdir(self.base) if listing.startswith("test-")]
+        osaka.tests.util.scpWorkerObject(self,self.objects[1])
+        self.assertTrue(self.scratch.startswith("/tmp/osaka"),"Assertion Error: scratch space is un-safe")
+        #Clean up old temp directories and create new ones
+        try:
+            osaka.main.rmall(self.scratch, unlock=True)
+        except OSError as e:
+            if not str(e).startswith("[Errno 2]"):
+                raise
+        os.makedirs(self.scratch)
+    def cleanup(self):
+        '''
+        Cleanup existing directories
+        '''
+        try:
+            osaka.main.rmall(self.scratch, unlock=True)
+        except OSError as e:
+            if not str(e).startswith("[Errno 2]"):
+                raise
+        except Exception as e:
+            pass
+        return True
+    def test_timeout(self):
+        '''
+        Time out and ensure that an error is raised
+        '''
+        for obj in self.ins:
+            self.cleanup()
+            with self.assertRaises(requests.exceptions.Timeout):
+                osaka.main.get(obj,self.scratch, {"timeout":0.00000001})
+    def test_notimeout(self):
+        '''
+        Use a generous timeout and ensure that no error is raised
+        '''
+        for obj in self.ins:
+            self.cleanup()
+            try:
+                osaka.main.get(obj,self.scratch, {"timeout":1000})
+            except requests.exceptions.Timeout as te:
+                self.fail("Timeout received when not intended")
diff --git a/osaka/tests/test_transfer.py b/osaka/tests/test_transfer.py
new file mode 100644
index 0000000..689030e
--- /dev/null
+++ b/osaka/tests/test_transfer.py
@@ -0,0 +1,180 @@
+import re
+import os
+import copy
+import subprocess
+import unittest
+
+import osaka.main
+import osaka.tests.util
+
+#Turn off requests warning
+import requests
+requests.packages.urllib3.disable_warnings()
+'''
+Created on Aug 29, 2016
+
+@author: mstarch
+'''
+class TransferTest(unittest.TestCase):
+    '''
+    A test that exercises the standard transfer functions between all backends.
+ Performs the cross-product between the inputs and the outputs. + ''' + def setUp(self): + ''' + Setup method for the test case + ''' + self.config = osaka.tests.util.load_test_config() + self.addCleanup(self.cleanup) + unittest.TestCase.setUp(self) + self.scratch = self.config.get("scratch_file","/tmp/osaka-unittest-scratch/") + self.file = self.config.get("tmp_file", "/tmp/osaka-unittest-objects/") + self.worker = self.config.get("dav",{}).get("worker", None) + # A list of input objects from various locations + self.ins = [] + # A list of backends supporting both incoming and + # outgoing product, and thus can be setup internal + # to this test + self.inouts = [] + for section in self.config.values(): + try: + self.ins.extend(section.get("test_input_urls",[])) + self.inouts.extend(section.get("test_output_urls",[])) + except AttributeError as aee: + pass + # A list of output only locations + self.out = [] + #Construct path to checked-in test cases + self.base = os.path.dirname(osaka.__file__)+"/../resources/objects/" + self.objects = [os.path.join(self.base,listing) for listing in os.listdir(self.base) if listing.startswith("test-")] + osaka.tests.util.scpWorkerObject(self, self.objects[1]) + self.assertTrue(self.scratch.startswith("/tmp/osaka"),"Assertion Error: scratch space is un-safe") + #Clean up old temp directories and create new ones + try: + osaka.main.rmall(self.scratch, unlock=True) + except OSError as e: + if not str(e).startswith("[Errno 2]"): + raise + try: + osaka.main.rmall(self.file) + except OSError as e: + if not str(e).startswith("[Errno 2]"): + raise + os.makedirs(self.scratch) + os.makedirs(self.file) + def cleanup(self): + ''' + Cleanup existing directories + ''' + try: + osaka.main.rmall(self.scratch, unlock=True) + except OSError as e: + if not str(e).startswith("[Errno 2]"): + raise + try: + osaka.main.rmall(self.file) + except OSError as e: + if not str(e).startswith("[Errno 2]"): + raise + return True + def test_InOuts(self,callback=None): + ''' + A test running against all in-out capable backends + @param callback: additional code to run per-remote object (for bigger tests) + ''' + objs = [] + #For every input-output backend + for inout in self.inouts: + osaka.utils.LOGGER.info("Running In-Out Test for {0}".format(inout)) + objs = self.uploadInputObjects(inout) + #Test downloading + for remote in objs: + osaka.main.get(remote,self.scratch) + loc = os.path.join(self.scratch,os.path.basename(remote)) + self.assertTrue(self.checkObject(loc),"Downloaded product inconsistent with original product: {0}".format(loc)) + #Run submitted code, if supplied + if not callback is None: + callback(loc,remote) + #Cleanup external + osaka.main.rmall(remote) + osaka.main.rmall(loc) + + def test_InOutExists(self): + ''' + A test to ensure that, if a directory, re-transferring to existing location creates a child directory, and if + a file, it replaces the file. + ''' + scratch = os.path.join(self.scratch,"redownload") + os.makedirs(scratch) + def reupload(download,remote): + ''' + Re-uploads the supplied object and checks that + the dowloaded object is consistent with the tested behavior. 
+ @param download: downloaded object + @param remote: remote object + ''' + osaka.main.transfer(download, remote) + osaka.main.transfer(remote, scratch) + if not os.path.isdir(download): + self.checkObject(os.path.join(scratch,os.path.basename(remote))) + else: + self.checkObject(os.path.join(scratch,os.path.basename(remote),os.path.basename(remote))) + osaka.main.rmall(scratch) + #First run the in-out tests + self.test_InOuts(reupload) + def test_Criscross(self,extras=[]): + ''' + Tests the cross product of inputs and outputs to test every permutation of in to out. + @param extras: extra (external) files to add as inputs into the base criscross test + ''' + uploads = [] + noquery = re.compile("\?[^?]*") + #For every output backend + for inout in self.inouts: + try: + osaka.utils.LOGGER.info("Running Criscross Test for {0}".format(inout)) + uploads = self.uploadInputObjects(inout) + objs = copy.copy(uploads) + objs.extend(extras) + dest = os.path.join(inout,"output-objects") + #For each input, transfer it to the + for remote in objs: + final = os.path.join(dest,os.path.basename(remote)) + final = noquery.sub("", final) + osaka.main.transfer(remote,final) + osaka.main.transfer(final,self.scratch) + loc = noquery.sub("", os.path.join(self.scratch,os.path.basename(remote))) + self.assertTrue(self.checkObject(loc),"Downloaded product inconsistent with original product: {0}".format(loc)) + osaka.main.rmall(loc) + #Cleanup + osaka.main.rmall(dest) + finally: + try: + osaka.main.rmall(os.path.join(inout,"input-objects")) + except: + pass + def test_CriscrossWithExternal(self): + ''' + Tests the cross product of inputs and outputs to test every permutation of in to out. + Note: adds extra input only products + ''' + self.test_Criscross(self.ins) + def checkObject(self,obj): + ''' + Checks an object in against the original object contained in the Osaka module + @param obj - object to test (by name) against original + ''' + return subprocess.call(["diff","-r",os.path.join(self.base,os.path.basename(obj),obj), obj], stdout=osaka.tests.util.DEVNULL, stderr=osaka.tests.util.DEVNULL) == 0 + def uploadInputObjects(self,uriBase): + ''' + Upload all input products, safely, to end-point. 
Places them in directory "input-objects" + @param uriBase: base uri to upload to + @return: set of uploaded products + ''' + output = [] + dest = os.path.join(uriBase,"input-objects") + for obj in self.objects: + self.assertFalse(osaka.main.exists(os.path.join(dest,os.path.basename(obj))), "Destination {0} already exists, test cannot safely continue.".format(dest)) + osaka.main.put(obj, os.path.join(dest,os.path.basename(obj))) + output.append(os.path.join(dest,os.path.basename(obj))) + return output diff --git a/osaka/tests/util.py b/osaka/tests/util.py new file mode 100644 index 0000000..8bd5261 --- /dev/null +++ b/osaka/tests/util.py @@ -0,0 +1,27 @@ +import os +import json +import logging +import subprocess + +logging.basicConfig(level=logging.ERROR) + +#Discard output +try: + from subprocess import DEVNULL # py3k +except ImportError: + import os + DEVNULL = open(os.devnull, 'wb') + +def load_test_config(): + ''' + Load a test configuration + ''' + with open(os.path.join(os.path.dirname(__file__), "test.json")) as fp: + return json.load(fp) + +def scpWorkerObject(self,obj): + ''' + ''' + ret = subprocess.call(["scp","-r",obj,self.worker+":/data/work/"], stdout=DEVNULL, stderr=DEVNULL) + if ret != 0: + raise Exception("Failed to SCP input to WebDav worker") diff --git a/osaka/transfer.py b/osaka/transfer.py new file mode 100644 index 0000000..b304d8f --- /dev/null +++ b/osaka/transfer.py @@ -0,0 +1,166 @@ +''' +Created on Apr 27, 2016 + +@author: mstarch +''' +import os +import json +import socket +import datetime + +#Osaka imports +import osaka.base +import osaka.lock +import osaka.cooperator +import osaka.utils + +class Transferer(object): + ''' + A class used to atomically transfer files between Osaka storage endpoints + ''' + def transfer(self,source,dest,params={},measure=False,metricsOutput="./pge_metrics.json",lockMetadata={},retries=0,force=False,ncoop=False,noclobber=False): + ''' + Transfer an objects between source and dest + @param source: source osaka-style URI + @param dest: destination osaka-style URI + @param lockMetadata: (optional) extra metadata for adding to lock file + @param retries: (optional) number of times to retry a command + @param force: force a fetch of failed transfers + @param ncoop: fail rather than cooperate with an already running osaka + @param noclobber: raise exception if you will clobber an existing object + ''' + metrics = None + #Refine uris to be standard + source = source.rstrip("/") + dest = dest.rstrip("/") + osaka.utils.LOGGER.info("Opening connections for {0} and {1}".format(source,dest)) + #Get handlers for the source and destination + shandle = osaka.base.StorageBase.getStorageBackend(source) + dhandle = osaka.base.StorageBase.getStorageBackend(dest) + for retry in range(0,retries+1): + try: + shandle.connect(source,params) + dhandle.connect(dest,params) + #Check if destination is a waiting directory + if dhandle.exists(dest) and dhandle.isComposite(dest): + dest = os.path.join(dest,os.path.basename(source)) + slock = osaka.lock.Lock(source, shandle) + dlock = osaka.lock.Lock(dest, dhandle) + if source == dest: + error = "Source, {0}, and destination, {1}, are the same".format(source,dest) + osaka.utils.LOGGER.error(error) + raise osaka.utils.OsakaException(error) + elif dhandle.exists(dest) and noclobber: + error = "Destination, {0}, already exists and no-clobber is set".format(dest) + osaka.utils.LOGGER.error(error) + raise osaka.utils.NoClobberException(error) + if slock.isLocked() and not force: + error = "Source {0} has not completed 
previous transfer. Will not continue.".format(source)
+                    osaka.utils.LOGGER.error(error)
+                    raise osaka.utils.OsakaException(error)
+                elif slock.isLocked() and force:
+                    error = "Source {0} has not completed previous transfer. Will continue by force.".format(source)
+                    osaka.utils.LOGGER.warning(error)
+                osaka.utils.LOGGER.info("Transferring between {0} and {1}".format(source,dest))
+                #Atomically upload the file or files
+                with osaka.cooperator.Cooperator(source, dlock, lockMetadata) as coop:
+                    if coop.isPrimary():
+                        metrics = self.transfer_uri(source, shandle, dest, dhandle)
+                    elif ncoop:
+                        raise osaka.utils.CooperationRefusedException("Competing Osaka instance running, and cooperation was turned off")
+                    else:
+                        osaka.cooperator.Spinner(dlock, params.get("timeout", -1)).spin()
+                break
+            except Exception as e:
+                osaka.utils.LOGGER.warning("Exception occurred, retrying({0}): {1}".format(retry+1,e))
+            finally:
+                shandle.close()
+                dhandle.close()
+        #If we never reach the break, reraise the last exception that happened
+        else:
+            raise
+        if measure and not metrics is None:
+            self.writeMetrics(metrics,metricsOutput)
+    def transfer_uri(self, source, shandle, dest, dhandle):
+        '''
+        Transfer a URI, recursing into it if it is a composite
+        '''
+        metrics = {
+            "source":source,
+            "destination":dest,
+            "type":"osaka-transfer",
+            "time_start":datetime.datetime.utcnow()
+        }
+        def transfer_one(uri):
+            ''' Transfer a single item '''
+            relative = os.path.relpath(uri,source)
+            specificDest = dest if relative == "." else os.path.join(dest,relative)
+            osaka.utils.LOGGER.debug("Transferring individual object from {0} to {1}".format(uri,specificDest))
+            stream = shandle.get(uri)
+            count = dhandle.put(stream,specificDest)
+            stream.close()
+            return count
+        counts = osaka.utils.product_composite_iterator(source, shandle, transfer_one)
+        metrics["time_end"] = datetime.datetime.utcnow()
+        metrics["size"] = sum(counts)
+        return metrics
+    def remove(self,uri,params={},unlock=False,retries=0):
+        '''
+        Remove a URI and all of its children
+        @param uri: URI to remove
+        @param unlock: shall we unlock first? Otherwise error on locked file
+        '''
+        uri = uri.rstrip("/")
+        osaka.utils.LOGGER.info("Removing URI {0}".format(uri))
+        handle = osaka.base.StorageBase.getStorageBackend(uri)
+        lock = osaka.lock.Lock(uri, handle)
+        for retry in range(0,retries+1):
+            try:
+                handle.connect(uri,params)
+                if not unlock and lock.isLocked():
+                    error = "URI {0} has not completed previous transfer. Will not continue.".format(uri)
Will not continue.".format(uri) + osaka.utils.LOGGER.error(error) + raise osaka.utils.OsakaException(error) + elif lock.isLocked(): + lock.unlock() + def remove_one(item): + ''' Remove one item ''' + osaka.utils.LOGGER.debug("Removing specific item {0}".format(item)) + handle.rm(item) + osaka.utils.product_composite_iterator(uri, handle, remove_one, True) + break + except Exception as e: + osaka.utils.LOGGER.warning("Exception occurred, retrying({0}): {1}".format(retry+1,e)) + finally: + handle.close() + else: + raise + def writeMetrics(self,metrics,output): + ''' + Write out all the metrics + @param metrics - metrics collected + @param output - output file + ''' + osaka.utils.LOGGER.info("Attempting to merge metrics with: {0}".format(output)) + #Rectify metrics + metrics["duration"] = (metrics["time_end"] - metrics["time_start"]).total_seconds() + metrics["transfer_rate"] = metrics["size"]/metrics["duration"] + metrics["time_start"] = metrics["time_start"].isoformat()+"Z" + metrics["time_end"] = metrics["time_end"].isoformat()+"Z" + try: + #Read input data + data = {} + if os.path.exists(output): + osaka.utils.LOGGER.info("Loaded metadata from: {0}".format(output)) + with open(output,"r") as inputf: + data = json.load(inputf) + #Add metric + data.setdefault(metrics['type'], []).append(metrics) + #Write out the file + osaka.utils.LOGGER.info("Writing metric to: {0}".format(output)) + with open(output,"w") as outputf: + json.dump(data, outputf) + except Exception, e: + osaka.utils.LOGGER.warning("Error merging metrics with: {0} Error: {1}".format(output,str(e))) + raise e + diff --git a/osaka/utils.py b/osaka/utils.py new file mode 100644 index 0000000..59625e3 --- /dev/null +++ b/osaka/utils.py @@ -0,0 +1,90 @@ +import logging +import subprocess +import urlparse + +LOGGER = logging.getLogger("osaka") + +DU_CALC = { + "GB": 1024**3, + "MB": 1024**2, + "KB": 1024 +} +def get_uri_username_and_password(uri): + ''' + Parses the URI and returns the username and password + @param uri: URI for the end point + @return: tuple containing the username/password from the URI + ''' + temp = urlparse.urlparse(uri) + return (temp.username,temp.password) +def get_uri_scheme_and_hostname(uri): + ''' + Parses the URI and returns the scheme and hostname + @param uri: URI for the end point + @return: tuple containing the scheme/hostname from the URI + ''' + temp = urlparse.urlparse(uri) + host = temp.hostname + ("" if temp.port is None else ":"+str(temp.port)) + return (temp.scheme,host) +def get_uri_path(uri): + ''' + Gets the path from the uri + @param uri: uri from which to parse path + @return: path portion of URI + ''' + temp = urlparse.urlparse(uri) + return temp.path +def get_container_and_path(urlpath): + ''' + Gets the container and path from the given url + @param urlpath - url's path to determine container and path from + ''' + split = urlpath.lstrip("/").split("/",1) + return (split[0],"" if not len(split) > 1 else split[1]) + +#def walk(func, directory,destdir, *params): +# ''' +# Walk the directory and call the function for each file +# @param func - function to call back +# @param directory - directory to walk +# @param params - params to pass through +# ''' +# ret = True +# for dir,unused,files in os.walk(directory): +# for file in files: +# full = os.path.join(dir,file) +# dest = os.path.join(destdir,os.path.relpath(full,directory)) +# ret = ret and func(full,dest,*params) +# return ret + +def get_disk_usage(path): + """Return disk size, "du -sk", for a path.""" + return 
diff --git a/osaka/utils.py b/osaka/utils.py
new file mode 100644
index 0000000..59625e3
--- /dev/null
+++ b/osaka/utils.py
@@ -0,0 +1,90 @@
+import os
+import logging
+import subprocess
+import urlparse
+
+LOGGER = logging.getLogger("osaka")
+
+DU_CALC = {
+    "GB": 1024**3,
+    "MB": 1024**2,
+    "KB": 1024
+}
+def get_uri_username_and_password(uri):
+    '''
+    Parses the URI and returns the username and password
+    @param uri: URI for the end point
+    @return: tuple containing the username/password from the URI
+    '''
+    temp = urlparse.urlparse(uri)
+    return (temp.username,temp.password)
+def get_uri_scheme_and_hostname(uri):
+    '''
+    Parses the URI and returns the scheme and hostname
+    @param uri: URI for the end point
+    @return: tuple containing the scheme/hostname from the URI
+    '''
+    temp = urlparse.urlparse(uri)
+    host = temp.hostname + ("" if temp.port is None else ":"+str(temp.port))
+    return (temp.scheme,host)
+def get_uri_path(uri):
+    '''
+    Gets the path from the uri
+    @param uri: uri from which to parse path
+    @return: path portion of URI
+    '''
+    temp = urlparse.urlparse(uri)
+    return temp.path
+def get_container_and_path(urlpath):
+    '''
+    Gets the container and path from the given url
+    @param urlpath - url's path to determine container and path from
+    '''
+    split = urlpath.lstrip("/").split("/",1)
+    return (split[0],"" if not len(split) > 1 else split[1])
+
+def walk(func, directory, destdir, *params):
+    '''
+    Walk the directory and call the function for each file
+    @param func - function to call back
+    @param directory - directory to walk
+    @param destdir - destination directory
+    @param params - params to pass through
+    '''
+    ret = True
+    for dir,unused,files in os.walk(directory):
+        for file in files:
+            full = os.path.join(dir,file)
+            dest = os.path.join(destdir,os.path.relpath(full,directory))
+            ret = ret and func(full,dest,*params)
+    return ret
+
+def get_disk_usage(path):
+    """Return disk size, "du -sk", for a path."""
+    return int(subprocess.check_output(['du', '-sk', path]).split()[0]) * DU_CALC['KB']
+def human_size(size):
+    """Return a human-readable (value, unit) tuple for a size in bytes"""
+    for tmp in ["GB", "MB", "KB"]:
+        if size > DU_CALC[tmp]:
+            return (size/float(DU_CALC[tmp]), tmp)
+    return (size, "B")
+
+def product_composite_iterator(base, handle, callback, include_top=False):
+    '''
+    A function to walk through an osaka product, enabling handling of
+    composite children. Note: if not a composite product, the callback runs on the base uri alone.
+    @param base: base of the product
+    @param handle: handler for the backend
+    @param callback: callback taking each uri
+    @param include_top: include top of composite
+    '''
+    uris = [base]
+    if handle.isComposite(base):
+        uris = handle.listAllChildren(base)
+        if include_top:
+            uris.append(base)
+    return map(callback, uris)
+
+class OsakaException(Exception): pass
+class CooperationNotPossibleException(OsakaException): pass
+class CooperationRefusedException(OsakaException): pass
+class TimeoutException(OsakaException): pass
+class NoClobberException(OsakaException): pass
diff --git a/resources/objects/test-directory/file-1 b/resources/objects/test-directory/file-1
new file mode 100644
index 0000000..d3cdc18
--- /dev/null
+++ b/resources/objects/test-directory/file-1
@@ -0,0 +1 @@
+This is a file at top-level. It represents files.
diff --git a/resources/objects/test-directory/file-2 b/resources/objects/test-directory/file-2
new file mode 100644
index 0000000..6097b79
--- /dev/null
+++ b/resources/objects/test-directory/file-2
@@ -0,0 +1 @@
+This is another file at top-level.
diff --git a/resources/objects/test-directory/sub-1/sub-1-file-1 b/resources/objects/test-directory/sub-1/sub-1-file-1
new file mode 100644
index 0000000..3559735
--- /dev/null
+++ b/resources/objects/test-directory/sub-1/sub-1-file-1
@@ -0,0 +1 @@
+This is the first file in the sub-1 directory.
diff --git a/resources/objects/test-directory/sub-1/sub-1-file-2 b/resources/objects/test-directory/sub-1/sub-1-file-2
new file mode 100644
index 0000000..81b8bb7
--- /dev/null
+++ b/resources/objects/test-directory/sub-1/sub-1-file-2
@@ -0,0 +1 @@
+This is another file in the sub-1 directory.
diff --git a/resources/objects/test-directory/sub-2/sub-2-file-1 b/resources/objects/test-directory/sub-2/sub-2-file-1
new file mode 100644
index 0000000..9dc949b
--- /dev/null
+++ b/resources/objects/test-directory/sub-2/sub-2-file-1
@@ -0,0 +1 @@
+This a a file. It is particularly boring, but is located in sub-2.
diff --git a/resources/objects/test-file b/resources/objects/test-file
new file mode 100644
index 0000000..37e542d
--- /dev/null
+++ b/resources/objects/test-file
@@ -0,0 +1 @@
+This is a file designed for the single-file test, as Osaka allows single files in addition to directories.
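Back in osaka/utils.py above, get_container_and_path is how every object-store backend splits a URI path into a container and key, and human_size pairs with DU_CALC; a quick sketch:

    from __future__ import print_function
    from osaka.utils import get_container_and_path, human_size

    # "/bucket/some/key" -> ("bucket", "some/key"); a bare container yields an empty key.
    print(get_container_and_path("/bucket/some/key"))   # ("bucket", "some/key")
    print(get_container_and_path("/bucket"))            # ("bucket", "")
    print(human_size(5 * 1024**2))                      # (5.0, "MB")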
diff --git a/resources/objects/test-product/dumby-product-20160830161546.browse.png b/resources/objects/test-product/dumby-product-20160830161546.browse.png new file mode 100644 index 0000000..f84a40c Binary files /dev/null and b/resources/objects/test-product/dumby-product-20160830161546.browse.png differ diff --git a/resources/objects/test-product/dumby-product-20160830161546.browse_small.png b/resources/objects/test-product/dumby-product-20160830161546.browse_small.png new file mode 100644 index 0000000..db7e5df Binary files /dev/null and b/resources/objects/test-product/dumby-product-20160830161546.browse_small.png differ diff --git a/resources/objects/test-product/dumby-product-20160830161546.dat b/resources/objects/test-product/dumby-product-20160830161546.dat new file mode 100644 index 0000000..33b23c2 Binary files /dev/null and b/resources/objects/test-product/dumby-product-20160830161546.dat differ diff --git a/resources/objects/test-product/dumby-product-20160830161546.met.json b/resources/objects/test-product/dumby-product-20160830161546.met.json new file mode 100644 index 0000000..172e519 --- /dev/null +++ b/resources/objects/test-product/dumby-product-20160830161546.met.json @@ -0,0 +1,41 @@ +{ + "center": { + "type": "point", + "coordinates": [ + 0, + 0 + ] + }, + "prod_file": "dumby-product-20160830161546/dumby-product-20160830161546.dat", + "sleep": 5, + "location": { + "type": "polygon", + "coordinates": [ + [ + [ + -179.9, + -89.9 + ], + [ + -179.9, + 89.9 + ], + [ + 179.9, + 89.9 + ], + [ + 179.9, + -89.9 + ], + [ + -179.9, + -89.9 + ] + ] + ] + }, + "starttime": "2016-08-30T16:15:47.091583", + "prod_dir": "dumby-product-20160830161546", + "endtime": "2016-08-30T16:15:47.091659" +} \ No newline at end of file diff --git a/resources/objects/testbig-evil/file-1 b/resources/objects/testbig-evil/file-1 new file mode 100644 index 0000000..d3cdc18 --- /dev/null +++ b/resources/objects/testbig-evil/file-1 @@ -0,0 +1 @@ +This is a file at top-level. It represents files. diff --git a/resources/objects/testbig-evil/file-2 b/resources/objects/testbig-evil/file-2 new file mode 100644 index 0000000..6097b79 --- /dev/null +++ b/resources/objects/testbig-evil/file-2 @@ -0,0 +1 @@ +This is another file at top-level. diff --git a/resources/objects/testbig-evil/sub-1/sub-1-file-1 b/resources/objects/testbig-evil/sub-1/sub-1-file-1 new file mode 100644 index 0000000..3559735 --- /dev/null +++ b/resources/objects/testbig-evil/sub-1/sub-1-file-1 @@ -0,0 +1 @@ +This is the first file in the sub-1 directory. diff --git a/resources/objects/testbig-evil/sub-1/sub-1-file-2 b/resources/objects/testbig-evil/sub-1/sub-1-file-2 new file mode 100644 index 0000000..81b8bb7 --- /dev/null +++ b/resources/objects/testbig-evil/sub-1/sub-1-file-2 @@ -0,0 +1 @@ +This is another file in the sub-1 directory. diff --git a/resources/objects/testbig-evil/sub-2/sub-2-file-1 b/resources/objects/testbig-evil/sub-2/sub-2-file-1 new file mode 100644 index 0000000..9dc949b --- /dev/null +++ b/resources/objects/testbig-evil/sub-2/sub-2-file-1 @@ -0,0 +1 @@ +This a a file. It is particularly boring, but is located in sub-2. 
diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..bebc916 --- /dev/null +++ b/setup.py @@ -0,0 +1,22 @@ +from setuptools import setup, find_packages +import osaka + +setup( + name='osaka', + version=osaka.__version__, + long_description=osaka.__description__, + url=osaka.__url__, + packages=find_packages(), + include_package_data=True, + zip_safe=False, + install_requires=[ + 'requests>=2.7.0', 'easywebdav==1.2.0', 'fabric==1.10.1', + 'filechunkio==1.6.0','azure-storage==0.20.0', 'boto3>=1.2.6', + 'protobuf==3.1.0.post1', 'google-cloud-storage>=0.22.0' + ], + entry_points={ + 'console_scripts': [ + 'osaka = osaka.__main__:main' + ] + } +)