
HAWQ-1078. Implement hawqsync-falcon DR utility. #940

Open · wants to merge 12 commits into master
tools/bin/hawqsync-extract: 296 additions & 0 deletions
@@ -0,0 +1,296 @@
#!/usr/bin/env python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
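
# Overall flow, as sketched by the functions below:
#   1. query the HAWQ catalog via psql for every table in the requested schemas,
#   2. run "hawq extract" on each table to dump its metadata to a YAML file,
#   3. bundle the YAML files into a bzip2 tarball and compute its md5 checksum,
#   4. copy the tarball into HDFS (by default under /hawq_default).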

import os
import sys
from optparse import Option, OptionParser
from subprocess import Popen, PIPE
from hashlib import md5
from json import loads
from time import strftime, sleep, time

def parseargs():
# Disable the built-in -h/--help option so -h can be used for the HAWQ URI below
parser = OptionParser(usage="HAWQ extract options.", add_help_option=False)
parser.add_option('-v', '--verbose', action='store_true', default=False)
parser.add_option("-a", "--prompt", action="store_false",
dest="prompt", default=True,
help="Execute without prompt.")
parser.add_option("-l", "--logdir", dest="logDir",
help="Sets the directory for log files")
parser.add_option('-t', '--testMode', action='store_true', default=False,
dest='testMode', help="Execute in test mode")
parser.add_option('-d', '--destinationOnHdfs',
default="/hawq_default", dest='hdfsDir',
help="HDFS directory to copy the resulting tarball to")
parser.add_option('-s', '--schemas', dest='schemas', default="",
help="A comma separated list of the schemas containing "+\
"the tables to extract metadata for")
parser.add_option('-h', dest='hawqUri', default="localhost:5432/gpadmin",
help="The HAWQ master URI. e.g. localhost:5432/gpadmin")

(options, args) = parser.parse_args()

return (options, args)


def getSchemaObjectList(hawqUri, schemaList=[], isTesting=False):
"""Utility function to generate a list of table objects for a list of schemas
Args:
hawqUri (str): the HAWQ master URI to use for connecting
schemaList (List): a list of schemas to find tables within
isTesting (bool): NOOP mode that bypasses the actual psql call, for testing

Returns:
retVal (int): Zero for success, nonzero otherwise
objectList (List) or message (str): either a list of "schema.table" names or stderr output
"""

retVal = -1

hawqHost = hawqUri.split(":")[0]
hawqPort, hawqDatabase = hawqUri.split(":")[-1].split("/")
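# Catalog query: list "schema.table" for ordinary tables (relkind 'r') in the
# requested schemas; the relstorage codes correspond to HAWQ storage formats
# (heap, append-only, column-oriented, parquet). The schema list is joined into
# an anchored regex alternation, e.g. '^(model|development)$'.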

objectListSql = """
SELECT n.nspname || '.' || c.relname
FROM pg_catalog.pg_class c
LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
WHERE c.relkind IN ('r','s','')
AND c.relstorage IN ('h', 'a', 'c', 'p','')
AND n.nspname !~ '^pg_toast'
AND n.nspname ~ '^({schemas})$'
ORDER BY n.nspname, c.relname;
\q
""".format(schemas="|".join(schemaList))

psqlCommand = "psql -h {h} -p {p} -At {d}".format(h=hawqHost,
p=hawqPort,
d=hawqDatabase)
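
# The -A (unaligned) and -t (tuples only) flags above make psql print one bare
# "schema.table" per line, with no headers or row-count footer, so the output
# can simply be split on newlines below.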


print psqlCommand

objectList = None
stderr = None
if not isTesting:
psqlProcess = Popen(psqlCommand.split(), stdin=PIPE,
stdout=PIPE, stderr=PIPE)

(objectList, stderr) = psqlProcess.communicate(objectListSql)

retVal = psqlProcess.returncode

if retVal != 0:
return retVal, stderr

# Sample output to follow
else:
objectList = """\
model.rsqauredCube
model.mungingScratchspace
model.postMunging
model.fancyAnalytics
development.testtable1
development.randomtable2
development.someOtherSweetTable
""";

retVal = 0

# sample yields: 342f414e7519f8c6a9eacce94777ba08
return retVal, objectList.split("\n")

def saveMetadataForSchemaObjects(baseDirectory="/tmp", objectList=[], isTesting=False):
"""Utility function to export table metadata in a schema to a set of files
(one per table)
Args:
baseDirectory (str): the directory under which the hawqExtract output folder is created
objectList (List): a list of objects to invoke hawq extract with
isTesting (bool): NOOP mode that bypasses the actual hawq extract calls, for testing

Returns:
retVal (int): Zero for success, non-zero otherwise
"""

retVal = -1

mkdirCommand = "mkdir -p {d}/hawqExtract".format(d=baseDirectory)

hawqExtractCommand = "hawq extract {{o}} -o {d}/hawqExtract/{{o}}.yml".format(d=baseDirectory)
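# Note: the doubled braces {{o}} survive this first .format() call, leaving a
# {o} placeholder that is filled in with each object name inside the loop below.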

stdout = None
stderr = None

if isTesting:
return 0

mkdirProcess = Popen(mkdirCommand.split(), stdout=PIPE, stderr=PIPE)

(stdout, stderr) = mkdirProcess.communicate()

retVal = mkdirProcess.returncode

if retVal != 0:
print stderr
return retVal

for obj in objectList:

if len(obj) == 0:
continue

thisObjectCommand = hawqExtractCommand.format(o=obj)

print thisObjectCommand

hawqExtractProcess = Popen(thisObjectCommand.split(),
stdout=PIPE, stderr=PIPE)

(stdout, stderr) = hawqExtractProcess.communicate()

rv = hawqExtractProcess.returncode

retVal |= rv

if rv != 0:
print rv, stderr

return retVal

def createTarball(baseDirectory="/tmp", targetTarball="/tmp/hawqExtract-{t}.tar.bz2", isTesting=False):
"""Utility function to create a tarball of the extracted metadata
Args:
baseDirectory (str): the base directory to create a tarball of
targetTarball (str): the target path of the tarball; {t} is replaced by a timestamp
isTesting (bool): NOOP mode that bypasses the actual tar/md5sum calls, for testing

Returns:
retVal (int): Zero for success, negative one otherwise
message (str): the tarball path on success, or an error message otherwise
checksum (str): the MD5 checksum of the created tarball if successful,
negative one otherwise
"""

checksum = None

# Example invocation
# tar cjf /tmp/test.tar.bz2 -C /tmp hawqExtract

theTime = strftime("%Y-%m-%d-%H%M")

tarCommand = "tar -cjf {t} -C {c} hawqExtract".format(t=targetTarball.format(t=theTime),
c=baseDirectory)
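# tar -C switches into baseDirectory before archiving, so the tarball contains
# the relative "hawqExtract/" directory rather than absolute paths.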

stdout = None
stderr = None
if isTesting:
return 0, "TESTING BRANCH", -1

try:
tarProcess = Popen(tarCommand.split(), stdout=PIPE, stderr=PIPE)

(stdout, stderr) = tarProcess.communicate()

except OSError as e:
return -1, str(e), -1


if tarProcess.returncode != 0:
print "Tarball creation failed : " + stderr
return -1, stderr, -1

md5Command = "md5sum {f}".format(f=targetTarball.format(t=theTime))
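
# md5sum prints "<digest>  <filename>"; the first whitespace-separated token
# below is the checksum itself.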

try:
md5Process = Popen(md5Command.split(), stdout=PIPE, stderr=PIPE)

(stdout2, stderr2) = md5Process.communicate()

if md5Process.returncode != 0:
return -1, "md5 checksum creation failed : " + stderr2, -1

checksum = stdout2.split()[0].strip()

return 0, targetTarball.format(t=theTime), checksum

except OSError as e:
return -1, str(e), -1

def copyToHdfs(source, dest, isTesting=False):
"""Utility function to copy a source file
to the destination HDFS directory/file
Args:
source (str): the source file on the local FS
dest (str): the target HDFS directory and filename
isTesting (bool): NOOP mode that bypasses the actual hdfs call, for testing

Returns:
retVal (int): Zero for success, the hdfs return code otherwise
message (str): stderr output from the hdfs command, if any
"""

retVal = -1

hdfsCommand = "/usr/bin/hdfs dfs -copyFromLocal {s} {d}".format(s=source,
d=dest)

stdout = None
stderr = None
if not isTesting:
# Force HDFS commands to run as gpadmin user
env = os.environ.copy()
env['HADOOP_USER_NAME'] = 'gpadmin'
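# With simple (non-Kerberos) Hadoop authentication, HADOOP_USER_NAME overrides
# the effective user for the hdfs command, so the copy is performed as gpadmin.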
hdfsProcess = Popen(hdfsCommand.split(), env=env, stdout=PIPE, stderr=PIPE)

(stdout, stderr) = hdfsProcess.communicate()

return hdfsProcess.returncode, stderr

else:
return 0, "TESTING"

if __name__ == '__main__':
options, args = parseargs()

#if options.verbose:
# enable_verbose_logging()

# TODO - switch prints to this once using gppylibs
#logger, log_filename = setup_hawq_tool_logging('hawq_sync',getLocalHostname(),getUserName(), options.logDir)

if options.prompt:
# TODO - switch to this once using gppylibs
#if not userinput.ask_yesno(None, "\nContinue with HAWQ standby master activate", 'N'):

# TODO - switch to gppylib-based logging
print "---------------------------------------------------------------"
print ""
print "This tool will extract metadata for every table in the schema list."
print ""
print "You must be gpadmin and have /usr/local/hawq/bin/psql in your PATH"
print ""
print " Would you like to continue?"
print "---------------------------------------------------------------"
answer = raw_input("y or n: ")
if "y" not in answer and "Y" not in answer:
print "Exiting."
sys.exit(1)

retVal, objectList = getSchemaObjectList(options.hawqUri,
schemaList=options.schemas.split(","),
isTesting=options.testMode)

if retVal != 0:
print "Failed to list schema objects : " + str(objectList)
sys.exit(1)

retVal = saveMetadataForSchemaObjects(objectList=objectList,
isTesting=options.testMode)

retVal, filenameOrStderr, checksum = createTarball(isTesting=options.testMode)

retVal, stderr = copyToHdfs(filenameOrStderr, options.hdfsDir,
isTesting=options.testMode)

sys.exit(retVal)