forked from GoogleCloudDataproc/initialization-actions
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_sqoop.py
86 lines (70 loc) · 2.98 KB
/
test_sqoop.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import os
from absl.testing import absltest
from absl.testing import parameterized
from integration_tests.dataproc_test_case import DataprocTestCase
class SqoopTestCase(DataprocTestCase):
    """Integration tests for the ``sqoop/sqoop.sh`` initialization action.

    Each test creates a cluster with ``INIT_ACTIONS`` applied and then runs
    shell commands on the master instance(s) to check that Sqoop is usable.
    """

    # NOTE(review): COMPONENT is not read in this class; presumably consumed
    # by DataprocTestCase -- confirm against the base class.
    COMPONENT = "sqoop"
    INIT_ACTIONS = ["sqoop/sqoop.sh"]
    # Test fixtures expected to live in the same directory as this file.
    TEST_DB_PATH = "test_sql_db_dump.gz"
    TEST_SQOOP_SQL = "sqoop_sql.sh"
    # Number of lines the HDFS import of the `employees` table must contain.
    EXPECTED_EMPLOYEE_RECORDS = 300024

    def _skip_if_rocky(self):
        """Skip the current test when the image OS is reported as 'rocky'."""
        if self.getImageOs() == 'rocky':
            self.skipTest("Not supported in Rocky Linux-based images")

    def _upload_run_and_remove(self, filename, instance, command_template):
        """Upload a fixture that sits next to this file, run it, remove it.

        Args:
            filename: Name of the fixture file located beside this module.
            instance: Name of the instance to upload to and run on.
            command_template: Shell command with one ``{}`` placeholder that
                is filled with ``filename``.
        """
        local_path = os.path.join(
            os.path.dirname(os.path.abspath(__file__)), filename)
        self.upload_test_file(local_path, instance)
        self.assert_instance_command(
            instance, command_template.format(filename))
        # Cleanup happens only after the command assertion has passed, which
        # matches the original statement order.
        self.remove_test_script(filename, instance)

    def verify_instance(self, name):
        """Assert that ``sqoop version`` succeeds on instance ``name``."""
        self.assert_instance_command(name, "sqoop version")

    def verify_importing_to_hdfs(self, name):
        """Import the MySQL ``employees`` table into HDFS and check its size.

        Writes the MySQL root password to a local file, runs ``sqoop import``
        with that password file, then counts the lines of the imported part
        files and compares the count with ``EXPECTED_EMPLOYEE_RECORDS``.

        Args:
            name: Name of the instance on which the commands are run.
        """
        # `echo -n` keeps a trailing newline out of the password file.
        self.assert_instance_command(
            name, "echo -n 'root-password' >/tmp/mysql.password")
        self.assert_instance_command(
            name, "sqoop import --connect jdbc:mysql://localhost:3306/employees"
            " --username root --password-file file:///tmp/mysql.password"
            " --table employees --target-dir hdfs:///employees/ --m 1")
        # The second element of the returned triple is presumably the
        # command's stdout -- confirm against DataprocTestCase.
        _, imported_records, _ = self.assert_instance_command(
            name, "hadoop fs -cat hdfs:///employees/part-m-* | wc -l")
        record_count = int(imported_records)
        # assertEqual (rather than assertTrue on a comparison) lets unittest
        # report both values alongside the custom message.
        self.assertEqual(
            record_count, self.EXPECTED_EMPLOYEE_RECORDS,
            "Unexpected number of imported DB records: wanted {}, got {}"
            .format(self.EXPECTED_EMPLOYEE_RECORDS, record_count))

    def verify_mysql_auth(self, instance):
        """Upload ``TEST_SQOOP_SQL`` to ``instance`` and run it with bash."""
        self._upload_run_and_remove(self.TEST_SQOOP_SQL, instance, "bash {}")

    def verify_import(self, instance):
        """Upload the gzipped dump ``TEST_DB_PATH`` and pipe it into mysql."""
        self._upload_run_and_remove(
            self.TEST_DB_PATH, instance,
            "zcat {} | mysql -u root -proot-password")

    def import_mysql_db(self, instance):
        """Run the MySQL auth script, then load the test database dump."""
        self.verify_mysql_auth(instance)
        self.verify_import(instance)

    @parameterized.parameters(
        ("SINGLE", ["m"]),
        ("STANDARD", ["m"]),
        ("HA", ["m-0", "m-1", "m-2"]),
        ("KERBEROS", ["m"]),
    )
    def test_sqoop(self, configuration, machine_suffixes):
        """Create a cluster and check ``sqoop version`` on every listed node.

        Args:
            configuration: Cluster configuration name passed to createCluster.
            machine_suffixes: Instance-name suffixes appended to the cluster
                name to address each instance under test.
        """
        self._skip_if_rocky()
        self.createCluster(configuration, self.INIT_ACTIONS)
        for machine_suffix in machine_suffixes:
            self.verify_instance("{}-{}".format(self.getClusterName(),
                                                machine_suffix))

    @parameterized.parameters(("SINGLE", ["m"]),)
    def test_sqoop_import_from_local_mysql_to_hdfs(self, configuration,
                                                   machine_suffixes):
        """Load the test DB into local MySQL and import it to HDFS via Sqoop.

        Args:
            configuration: Cluster configuration name passed to createCluster.
            machine_suffixes: Instance-name suffixes appended to the cluster
                name to address each instance under test.
        """
        self._skip_if_rocky()
        self.createCluster(configuration, self.INIT_ACTIONS)
        for machine_suffix in machine_suffixes:
            instance = "{}-{}".format(self.getClusterName(), machine_suffix)
            self.import_mysql_db(instance)
            self.verify_importing_to_hdfs(instance)
if __name__ == "__main__":
    # Executed as a script: hand control to absl's test runner.
    absltest.main()