diff --git a/all-tests.sh b/all-tests.sh new file mode 100755 index 0000000..db0dee2 --- /dev/null +++ b/all-tests.sh @@ -0,0 +1,6 @@ +#!/bin/bash + +set -xue + +pycodestyle +python tests diff --git a/requirements.txt b/requirements.txt index 53fc4f8..7673c53 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,6 +5,4 @@ ruffus ruamel_yaml pytest pysam -paramiko -gevent -apsw +tqdm diff --git a/src/daisy/storage.py b/src/daisy/storage.py index d990cc0..0839d56 100644 --- a/src/daisy/storage.py +++ b/src/daisy/storage.py @@ -13,18 +13,21 @@ """ +import multiprocessing +from multiprocessing.util import Finalize import os import sys import json +import re +import glob import datetime -import collections +import pathlib import pandas import pandas.io.sql -import re -import glob +import tqdm import sqlalchemy -from sqlalchemy.engine import reflection +from sqlalchemy import inspect from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.ext.automap import automap_base from sqlalchemy import Column, Integer, String, DateTime, ForeignKey, text, \ @@ -36,9 +39,11 @@ import cgatcore.pipeline as P import cgatcore.iotools as IOTools +import cgatcore.experiment as E from daisy.toolkit import touch, read_data, hash -import cgatcore.experiment as E +from daisy.table_cache import TableCache + ###################################################### # Database schema @@ -55,10 +60,6 @@ JSONType = String -RESERVED_WORDS = { - "all": "total"} - - class BenchmarkRun(Base): __tablename__ = 'run' @@ -153,173 +154,11 @@ def create_database(engine): # create_all only creates tables that are not present # so it is safe to call repeatedly. Base.metadata.create_all(engine) - - Session = sessionmaker(bind=engine) - session = Session() + session = sessionmaker(bind=engine)() session.commit() -@E.cached_function -def get_columns(tablename, engine): - """return list of column names in table""" - metadata = sqlalchemy.MetaData(engine) - tb = sqlalchemy.Table(tablename, metadata, autoload=True, - autoload_with=engine) - return [x.name for x in tb.columns] - - -def sql_sanitize_columns(columns): - - # special chars - columns = [re.sub("[\[\]().,:]", "_", str(x)) for x in columns] - - columns = [re.sub("%", "percent", x) - for x in columns] - - columns = [x.lower() for x in columns] - - columns = [RESERVED_WORDS.get(x, x) for x in columns] - return columns - - -def add_columns_to_table(columns, table, tablename, engine): - - pandas_engine = pandas.io.sql.SQLDatabase(engine) - pandas_table = pandas.io.sql.SQLTable( - tablename, - pandas_engine, - frame=table) - - new_columns = set(columns) - logger = P.get_logger() - - for column in pandas_table.table.columns: - if column.name in new_columns: - statement = "ALTER TABLE {} ADD COLUMN {} {}".format( - tablename, - column.name, - column.type) - - logger.debug("SQL: {}".format(statement)) - engine.execute(statement) - - -def save_table(table, engine, tablename, schema=None, - is_sqlite3=False, - dtypes=None, - indices=["instance_id"]): - logger = P.get_logger() - table.columns = sql_sanitize_columns(table.columns) - - # pandas/sqlite3 prefers the raw connection, otherwise error: - # AttributeError: 'Engine' object has no attribute 'rollback' - if is_sqlite3: - _engine = engine.raw_connection() - # In pandas >= 0.23 and using sqlite as a backend, the - # pandas.DataFrame.to_sql command fails with "OperationalError: - # (sqlite3.OperationalError) too many SQL variables".
The reason is a - # fixed limit in sqlite, SQLITE_MAX_VARIABLE_NUMBER, which is by - # default set to 999. - sql_chunk_size = 999 // (len(table.columns) + 1) - else: - _engine = engine - sql_chunk_size = None - - # lower case all table names. Otherwise issues with psql - # mixed case access - tablename = tablename.lower() - create_index = False - - if schema is not None: - # for postgres tables within a schema, engine.has_table does - # not work. A solution would be to set the search_path within - # a connection. The code below fails if there is a column - # mismatch. - # TODO: investigate how to use a connection within pandas. - try: - # table is new, create index - table.to_sql(tablename, - engine, - schema=schema, - if_exists="fail", - index=False, - dtype=dtypes, - chunksize=sql_chunk_size) - create_index = True - except ValueError: - table.to_sql(tablename, - engine, - schema=schema, - if_exists="append", - index=False, - dtype=dtypes, - chunksize=sql_chunk_size) - - elif engine.has_table(tablename, schema): - - existing_columns = set(get_columns(tablename, engine)) - proposed_columns = set(table.columns) - - obsolete_columns = existing_columns.difference(proposed_columns) - if obsolete_columns: - logger.warn("the following columns are obsolete in {}: {}. " - "empty data will be inserted" - .format(tablename, ", ".join(obsolete_columns))) - # create empty columns - for column in obsolete_columns: - table[column] = None - - new_columns = proposed_columns.difference(existing_columns) - if new_columns: - logger.warn("new columns found for {}: the following columns " - "will be added: {} ".format( - tablename, - ", ".join(new_columns))) - - add_columns_to_table(new_columns, - table, - tablename, - engine) - # clear cache of memoization function - get_columns.delete(tablename, engine) - - # append - table.to_sql(tablename, - engine, - schema=schema, - if_exists="append", - index=False, - dtype=dtypes, - chunksize=sql_chunk_size) - else: - # table is new, create index - table.to_sql(tablename, - engine, - schema=schema, - if_exists="fail", - index=False, - dtype=dtypes, - chunksize=sql_chunk_size) - create_index = True - - if create_index: - # sqlite requires an index name - if schema: - tablename = "{}.{}".format(schema, tablename) - - for field in indices: - try: - engine.execute( - text("CREATE INDEX {} ON {} ({})".format( - re.sub("[-.]", "_", tablename) + "_" + field, - tablename, - field))) - except sqlalchemy.exc.ProgrammingError as ex: - logger.warn("could not create index: {}".format(str(ex))) - - -def save_benchmark_timings(path, tablename, engine, instance, schema, - is_sqlite3): +def save_benchmark_timings(path, tablename, table_cache, instance_id: int): fn = os.path.join(path, "benchmark.bench") if not os.path.exists(fn): @@ -327,126 +166,12 @@ def save_benchmark_timings(path, tablename, engine, instance, schema, "file {} does not exist, no tool timings uploaded".format( fn)) else: - try: - tool_bench_data = pandas.read_csv(fn, sep="\t") - except pandas.errors.EmptyDataError: - P.get_logger().warn(f"file {fn} is empty, no tool timings uploaded") - return - tool_bench_data["instance_id"] = instance.id - save_table(tool_bench_data, engine, - tablename, - schema=None, - is_sqlite3=is_sqlite3) - - -class TableCache(): - - # 1 Gb - max_total_bytes = 1e9 - # 50 Mb - max_table_bytes = 5e7 - - def __init__(self, engine, schema, is_sqlite3): - self.engine = engine - self.schema = schema - self.cache = {} - - self.is_sqlite3 = is_sqlite3 - self.total_size = 0 - self.sizes = 
collections.defaultdict(int) - self.uploaded_sizes = collections.defaultdict(int) - self.dtypes = {} - self.logger = P.get_logger() - self.indices = {} - self.have_created_indices = False - - def flush_table(self, tablename): - - table = self.cache[tablename] - - self.logger.debug("uploading table {}: {} bytes".format( - tablename, - self.sizes[tablename])) - save_table(table, - self.engine, - tablename, - self.schema, - is_sqlite3=self.is_sqlite3, - dtypes=self.dtypes.get(tablename, None)) - del table - del self.cache[tablename] - self.uploaded_sizes[tablename] += self.sizes[tablename] - self.sizes[tablename] = 0 - - def flush_all(self): - for tablename in list(self.cache.keys()): - self.flush_table(tablename) - - self.total_size = 0 - self.cache = {} - self.sizes = collections.defaultdict(int) - - def add_indices(self, indices): - self.indices.update(indices) - - def add_table(self, table, tablename, dtypes=None): - - memory_usage = sum(table.memory_usage(deep=True)) - self.total_size += memory_usage - self.sizes[tablename] += memory_usage - self.dtypes[tablename] = dtypes - - if tablename not in self.cache: - self.cache[tablename] = table - else: - if set(self.cache[tablename].columns) != set(table.columns): - raise ValueError( - "column mismatch for {}: " - "table={}, cache={}".format( - tablename, - self.cache[tablename].columns, - table.columns)) - try: - self.cache[tablename] = pandas.concat( - [self.cache[tablename], - table]) - except AssertionError: - raise - - if self.total_size > self.max_total_bytes: - self.logger.debug( - "force full cache flush, memory usage = {}".format( - self.total_size)) - self.flush_all() - - if self.sizes[tablename] > self.max_table_bytes: - self.logger.debug( - "force cache flush for table {}, memory usage = {}".format( - tablename, - self.sizes[tablename])) - - self.flush_table(tablename) - - def close(self): - self.flush_all() - - if not self.have_created_indices: - for index_name, info in self.indices.items(): - table_name, fields = info - self.logger.debug("creating index {} on {} and fields {}".format( - index_name, table_name, fields)) - try: - self.engine.execute("CREATE INDEX {} ON {} ({})".format( - index_name, table_name, fields)) - except sqlalchemy.exc.OperationalError as ex: - self.logger.warn("could not create index: {}".format(str(ex))) - self.have_created_indices = True - - def __del__(self): - self.close() - - -def transform_table_before_upload(tablename, table, instance, meta_data, table_cache): + try: + table = pandas.read_csv(fn, sep="\t") + except pandas.errors.EmptyDataError: + P.get_logger().warn(f"file {fn} is empty, no timings uploaded") + return + table["instance_id"] = instance_id + table_cache.add_table(table, tablename) + + +def transform_table_before_upload(tablename, table, instance_id: int, meta_data, table_cache): dtypes = None @@ -471,7 +196,7 @@ def transform_table_before_upload(tablename, table, instance, meta_data, table_c # upload into a separate table suffixed by instance id if "metric_upload_separate" in meta_data: if tablename in meta_data["metric_upload_separate"]: - tablename = "{}_{}".format(tablename, instance.id) + tablename = "{}_{}".format(tablename, instance_id) # normalize table by factorizing a column and storing its ids # in a separate table @@ -489,7 +214,7 @@ def transform_table_before_upload(tablename, table, instance, meta_data, table_c factor_table = pandas.DataFrame( {column: names, "id": list(range(len(names)))}) - factor_table["instance_id"] = instance.id + factor_table["instance_id"] = instance_id table_cache.add_table(factor_table, tablename + "_factors") # store table as a matrix @@ -525,6 +250,355 @@ def
transform_table_before_upload(tablename, table, instance, meta_data, table_c return tablename, table, dtypes +def divine_paths(infile): + """find tool/metric paths for infile""" + + # walk up the path to find "benchmark.info" as it might be + # located on a higher level if the tool outputs multiple files. + parts = list(pathlib.Path(os.path.dirname(infile)).parts) + info_paths = [] + rootdir = os.getcwd() + while parts: + p = os.path.join(*parts) + if p == rootdir: + break + if os.path.exists(os.path.join(p, "benchmark.info")): + info_paths.append(p) + parts.pop() + info_paths = info_paths[::-1] + + # the level of nesting determines the layout: + # 1 level: aggregation: tool == metric + # 2 levels: tool + metric + # 3 levels: tool + split + metric + if len(info_paths) not in (1, 2, 3): + raise ValueError( + "for {}, expected one, two or three paths with info, " + "got {}".format(infile, len(info_paths))) + + meta_data = {} + + if len(info_paths) == 1: + tool_dir = metric_dir = info_paths[0] + split_dir = None + elif len(info_paths) == 2: + tool_dir, metric_dir = info_paths + split_dir = None + # If there are multiple output files in aggregation, use + # intermediate paths as split_subset factors. + td = len(tool_dir.split(os.sep)) + tm = len(metric_dir.split(os.sep)) + delta = tm - td + if delta > 1: + meta_data["split_subset"] = re.sub( + r"\.dir", "", os.sep.join(metric_dir.split(os.sep)[td:-1])) + elif len(info_paths) == 3: + tool_dir, split_dir, metric_dir = info_paths + + if tool_dir: + md = read_data(os.path.join(tool_dir, "benchmark.info"), prefix="tool_") + if "tool_action" in md and md["tool_action"] != "tool": + raise ValueError("action for tool info {} is not 'tool', but '{}'".format( + os.path.join(tool_dir, "benchmark.info"), + md["tool_action"])) + meta_data.update(md) + + if metric_dir: + md = read_data(os.path.join(metric_dir, "benchmark.info"), + prefix="metric_") + if "metric_action" in md: + # ignore splits, they will be added through metrics + if md["metric_action"] == "split": + return None, None, None + if md["metric_action"] != "metric": + return tool_dir, None, None + meta_data.update(md) + + if split_dir: + md = read_data(os.path.join(split_dir, "benchmark.info"), + prefix="split_") + if "split_action" in md and md["split_action"] != "split": + raise ValueError("action for split info {} is not 'split', but '{}'".format( + os.path.join(split_dir, "benchmark.info"), + md["split_action"])) + + meta_data.update(md) + subset = os.path.basename(os.path.dirname(info_paths[-1])) + if subset.endswith(".dir"): + subset = subset[:-len(".dir")] + meta_data["split_subset"] = subset + + return tool_dir, metric_dir, meta_data + + +def save_metric_data(meta_data, table_cache, schema, instance_id: int, session): + + logger = P.get_logger() + metric_table_filter = None + if "metric_no_upload" in meta_data: + if meta_data["metric_no_upload"] == "*": + logger.warn("upload turned off for metric {}".format( + meta_data["metric_name"])) + return + else: + metric_table_filter = re.compile(meta_data["metric_no_upload"]) + + # multiple tablenames for multiple metric output + # + # Tables are added into schemas to avoid cluttering + # the public namespace.
+ # (if only blobs, no metric output file) + if "metric_output_files" in meta_data: + assert len(meta_data["metric_output_files"]) == \ + len(meta_data["metric_tablenames"]) + + for output_file, tablename in zip( + meta_data["metric_output_files"], + meta_data["metric_tablenames"]): + + if metric_table_filter and metric_table_filter.search(tablename): + logger.warn("upload for table {} turned off".format( + tablename)) + continue + + if not os.path.exists(output_file): + logger.warning("output file {} does not exist - ignored".format( + output_file)) + continue + + if IOTools.is_empty(output_file): + logger.warn("output file {} is empty - ignored".format( + output_file)) + continue + + try: + table = pandas.read_csv(output_file, + sep="\t", + comment="#", + skip_blank_lines=True) + except pandas.errors.ParserError as e: + logger.warn("malformed table {} can not be read: {}".format( + output_file, str(e))) + continue + except ValueError as e: + logger.warn("table {} can not be read: {}".format( + output_file, str(e))) + continue + + if table.empty: + logger.warn("table {} is empty - ignored".format(output_file)) + continue + + tablename, table, dtypes = transform_table_before_upload(tablename, + table, + instance_id, + meta_data, + table_cache) + + if schema is None: + tn = tablename + else: + tn = "{}.{}".format(schema, tablename) + + # add foreign key + table["instance_id"] = instance_id + logger.debug(f"saving data {table.shape} from {output_file} to table {tn} under {instance_id}") + table_cache.add_table(table, tablename, dtypes) + + if "metric_blob_globs" in meta_data: + metric_dir = meta_data["metric_outdir"] + files = [glob.glob(os.path.join(metric_dir, x)) + for x in meta_data["metric_blob_globs"]] + files = IOTools.flatten(files) + logger.debug( + "uploading binary data in {} files from {} to " + "table binary_data".format(len(files), metric_dir)) + for fn in files: + with IOTools.open_file(fn, "rb", encoding=None) as inf: + data_row = BenchmarkBinaryData( + instance_id=instance_id, + filename=os.path.basename(fn), + path=fn, + data=inf.read()) + session.add(data_row) + session.commit() + + if meta_data.get("metric_tableindices", None): + table_cache.add_indices(meta_data["metric_tableindices"]) + + +def instantiate_metrics(metrics, session, run_id): + + tool_dirs = set() + for tool_dir, metric_dir, meta_data, mtime in metrics: + + if meta_data is None: + continue + + # tool_input_files can be either a dictionary (for a tool) + # or a plain list (for aggregation).
+ try: + tool_input_files = [x["path"] for x in meta_data["tool_input_files"]] + except TypeError: + tool_input_files = meta_data["tool_input_files"] + + try: + instance = BenchmarkInstance( + run_id=run_id, + replication_id=meta_data.get("tool_replication_id", 1), + completed=datetime.datetime.fromtimestamp(mtime), + input=",".join(tool_input_files), + input_alias=meta_data["tool_input_alias"], + tool_name=meta_data["tool_name"], + tool_version=meta_data["tool_version"], + tool_options=meta_data["tool_options"], + tool_hash=meta_data["tool_option_hash"], + tool_alias=meta_data.get("tool_alias", ""), + metric_name=meta_data["metric_name"], + metric_version=meta_data["metric_version"], + metric_options=meta_data["metric_options"], + metric_hash=meta_data["metric_option_hash"], + metric_alias=meta_data.get("metric_alias", ""), + split_name=meta_data.get("split_name", ""), + split_version=meta_data.get("split_version", ""), + split_options=meta_data.get("split_options", ""), + split_hash=meta_data.get("split_option_hash", ""), + split_alias=meta_data.get("split_alias", ""), + split_subset=meta_data.get("split_subset", "all"), + meta_data=json.dumps(meta_data)) + except KeyError as e: + raise KeyError("missing required attribute {} in meta_data: {}".format( + str(e), str(meta_data))) + + session.add(instance) + session.commit() + + upload_tool_metrics = True + if tool_dir: + upload_tool_metrics = tool_dir not in tool_dirs + if upload_tool_metrics: + tool_dirs.add(tool_dir) + + assert instance.id is not None + yield(tool_dir, metric_dir, meta_data, instance.id, upload_tool_metrics) + + +def generate_metric(infile): + + tool_dir, metric_dir, meta_data = divine_paths(infile) + + return tool_dir, metric_dir, meta_data, os.path.getmtime(infile) + + +# global variables for multiprocessing +resource = None + + +class Resource(object): + def __init__(self, args): + self.args = args + + def __enter__(self): + table_cache = TableCache(*self.args) + E.debug(f"{os.getpid()}: created resource={id(self)}: cache={id(table_cache)}") + self.table_cache = table_cache + return self + + def __exit__(self, *args, **kwargs): + E.debug(f"{os.getpid()}: resource={id(self)}: final table cache flush for cache={id(self.table_cache)} started") + E.debug(f"{os.getpid()}: resource={id(self)}: cache={id(self.table_cache)}: " + f"{self.table_cache.get_cache_stats()}") + self.table_cache.flush_all() + E.debug(f"{os.getpid()}: resource={id(self)}: final table cache flush " + f"for cache={id(self.table_cache)} completed") + + +def open_resource(args): + return Resource(args) + + +def upload_metric(args): + tool_dir, metric_dir, meta_data, instance_id, upload_tool_metrics = args + + if upload_tool_metrics: + save_benchmark_timings(tool_dir, + "tool_timings", + upload_metric.table_cache, + instance_id) + + save_benchmark_timings(metric_dir, + "metric_timings", + upload_metric.table_cache, + instance_id) + + E.debug(f"{os.getpid()}: adding metrics for instance_id {instance_id} to cache={id(upload_metric.table_cache)}") + save_metric_data(meta_data, + upload_metric.table_cache, + upload_metric.schema, + instance_id, + upload_metric.session) + + +def setup_worker(f, *args): + + global resource + + engine, schema, is_sqlite = args + engine.dispose() + f.schema = schema + Session = sessionmaker(bind=engine) + f.session = Session() + + resource_cm = open_resource(args) + E.debug(f"{os.getpid()}: setting up worker for resource={id(resource)}") + old_resource = resource + resource = resource_cm.__enter__() + E.debug(f"{os.getpid()}: new 
worker for resource={id(resource)} (old_resource={id(old_resource)})") + + # Register a finalizer to flush table cache + Finalize(resource, resource.__exit__, exitpriority=16) + + E.debug(f"{os.getpid()}: adding cache={id(resource.table_cache)} from resource={id(resource)} " + f"to worker={id(f)}, session={id(f.session)}") + f.table_cache = resource.table_cache + + +def upload_metrics_tables(infiles: list, run_id: int, schema, session, engine, + max_workers: int = 10): + + is_sqlite3 = engine.dialect.name == "sqlite" + + logger = P.get_logger() + + logger.info(f"{os.getpid()}: collecting upload items for {len(infiles)} input files") + metric_f = generate_metric + pool = multiprocessing.Pool(max_workers) + metrics = pool.map(metric_f, infiles) + pool.close() + pool.join() + + logger.info(f"{os.getpid()}: instantiating {len(metrics)} metrics") + data = list(tqdm.tqdm(instantiate_metrics(metrics, session, run_id), + total=len(metrics))) + + logger.info(f"{os.getpid()}: uploading {len(data)} items") + upload_f = upload_metric + initargs = (upload_f, engine, schema, is_sqlite3) + if max_workers == 1: + setup_worker(*initargs) + list(map(upload_f, data)) + global resource + resource.table_cache.flush_all() + else: + logger.info(f"{os.getpid()}: loading data with {max_workers} cores") + pool = multiprocessing.Pool(max_workers, initializer=setup_worker, initargs=initargs) + pool.map(upload_f, data) + pool.close() + pool.join() + + def upload_result(infiles, outfile, *extras): """upload results into database. @@ -537,6 +611,10 @@ def upload_result(infiles, outfile, *extras): config = {"database": {"url": "sqlite:///./csvdb"}} + To use multiple cores, try:: + + config = {"database": {"url": "sqlite:///./csvdb", "cores": 10}} + Arguments --------- infiles: list @@ -562,6 +640,8 @@ url = config["database"]["url"] is_sqlite3 = url.startswith("sqlite") + max_workers = config["database"].get("cores", 1) + if is_sqlite3: connect_args = {'check_same_thread': False} else: @@ -605,6 +685,9 @@ else: created = datetime.datetime.now() + Session = sessionmaker(bind=engine) + session = Session() + benchmark_run = BenchmarkRun( author=os.environ.get("USER", "unknown"), # needs refactoring, should be: uploaded_at, created_at, run_at @@ -619,264 +702,30 @@ config_hash=hash(json.dumps(config)), status="incomplete") - Session = sessionmaker(bind=engine) - session = Session() session.add(benchmark_run) session.commit() for tag in config["tags"]: - benchmark_tag = BenchmarkTag( - run_id=benchmark_run.id, - tag=tag) + benchmark_tag = BenchmarkTag(run_id=benchmark_run.id, tag=tag) session.add(benchmark_tag) - session.commit() - tool_dirs = set() - - table_cache = TableCache(engine, schema, is_sqlite3) - - for infile in infiles: - - path, name = os.path.split(infile) - - # walk up the path to find "benchmark.info" as it might be - # located on a higher level if the tool output multiple files.
- parts = path.split(os.sep) - - info_paths = [] - rootdir = os.getcwd() - while len(parts): - p = os.path.join(*parts) - if p == rootdir: - break - if os.path.exists(os.path.join(p, "benchmark.info")): - info_paths.append(p) - parts.pop() - info_paths = info_paths[::-1] - - # the level of nesting determines the layout: - # 1 level: aggregation: tool == metric - # 2 levels: tool + metric - # 3 levels: tool + split + metric - if len(info_paths) not in (1, 2, 3): - raise ValueError( - "for {}, expected two or three paths with info, " - "got {}".format(infile, len(info_paths))) - - meta_data = {} - - if len(info_paths) == 1: - tool_dir = metric_dir = info_paths[0] - split_dir = None - elif len(info_paths) == 2: - tool_dir, metric_dir = info_paths - split_dir = None - # If there are multiple output files in aggregation, use - # intermediate paths as split_subset factors. - td = len(tool_dir.split(os.sep)) - tm = len(metric_dir.split(os.sep)) - d = tm - td - if d > 1: - meta_data["split_subset"] = re.sub( - ".dir", "", - os.sep.join( - metric_dir.split(os.sep)[td:-1])) - elif len(info_paths) == 3: - tool_dir, split_dir, metric_dir = info_paths - - if tool_dir: - d = read_data(os.path.join(tool_dir, "benchmark.info"), - prefix="tool_") - if "tool_action" in d: - assert d["tool_action"] == "tool" - meta_data.update(d) - - if metric_dir: - d = read_data(os.path.join(metric_dir, "benchmark.info"), - prefix="metric_") - if "metric_action" in d: - # ignore splits, they will be added through metrics - if d["metric_action"] == "split": - continue - assert d["metric_action"] == "metric", \ - "action for metric info {} is not 'metric', but '{}'" \ - .format(os.path.join(metric_dir, "benchmark.info"), - d["metric_action"]) - - meta_data.update(d) - - if split_dir: - d = read_data(os.path.join(split_dir, "benchmark.info"), - prefix="split_") - if "split_action" in d: - assert d["split_action"] == "split" - meta_data.update(d) - subset = os.path.basename( - os.path.dirname(info_paths[-1])) - if subset.endswith(".dir"): - subset = subset[:-len(".dir")] - meta_data["split_subset"] = subset - - # tool_input_files can either be a dictionary if a tool - # or a simple list if aggregation. 
- try: - tool_input_files = [x["path"] for x in meta_data["tool_input_files"]] - except TypeError: - tool_input_files = meta_data["tool_input_files"] - - try: - instance = BenchmarkInstance( - run_id=benchmark_run.id, - replication_id=meta_data.get("tool_replication_id", 1), - completed=datetime.datetime.fromtimestamp( - os.path.getmtime(infile)), - input=",".join(tool_input_files), - input_alias=meta_data["tool_input_alias"], - tool_name=meta_data["tool_name"], - tool_version=meta_data["tool_version"], - tool_options=meta_data["tool_options"], - tool_hash=meta_data["tool_option_hash"], - tool_alias=meta_data.get("tool_alias", ""), - metric_name=meta_data["metric_name"], - metric_version=meta_data["metric_version"], - metric_options=meta_data["metric_options"], - metric_hash=meta_data["metric_option_hash"], - metric_alias=meta_data.get("metric_alias", ""), - split_name=meta_data.get("split_name", ""), - split_version=meta_data.get("split_version", ""), - split_options=meta_data.get("split_options", ""), - split_hash=meta_data.get("split_option_hash", ""), - split_alias=meta_data.get("split_alias", ""), - split_subset=meta_data.get("split_subset", "all"), - meta_data=json.dumps(meta_data)) - except KeyError as e: - raise KeyError("missing required attribute {} in {}".format( - str(e), str(meta_data))) - - session.add(instance) - session.commit() - - # avoid multiple upload of tool data - if tool_dir and tool_dir not in tool_dirs: - tool_dirs.add(tool_dir) - save_benchmark_timings(tool_dir, - "tool_timings", - engine, instance, schema, - is_sqlite3) - - save_benchmark_timings(metric_dir, - "metric_timings", - engine, instance, schema, - is_sqlite3) - - metric_table_filter = None - if "metric_no_upload" in meta_data: - if meta_data["metric_no_upload"] == "*": - logger.warn("upload turned off for metric {}".format( - meta_data["metric_name"])) - continue - else: - metric_table_filter = re.compile(meta_data["metric_no_upload"]) - - # multiple tablenames for multiple metric output - # - # Tables are added into schemas to avoid cluttering - # the public namespace. 
- # (if only blobs, no metric output file) - if "metric_output_files" in meta_data: - assert len(meta_data["metric_output_files"]) == \ - len(meta_data["metric_tablenames"]) - - for output_file, tablename in zip( - meta_data["metric_output_files"], - meta_data["metric_tablenames"]): - - if metric_table_filter and metric_table_filter.search(tablename): - logger.warn("upload for table {} turned off".format( - tablename)) - continue - - if not os.path.exists(output_file): - logger.warn("output file {} does not exist - ignored".format( - output_file)) - continue - - if IOTools.is_empty(output_file): - logger.warn("output file {} is empty - ignored".format( - output_file)) - continue - - try: - table = pandas.read_csv(output_file, - sep="\t", - comment="#", - skip_blank_lines=True) - except ValueError as e: - logger.warn("table {} can not be read: {}".format( - output_file, str(e))) - continue - except pandas.parser.CParserError as e: - logger.warn("malformatted table {} can not be read: {}".format( - output_file, str(e))) - continue - - if len(table) == 0: - logger.warn("table {} is empty - ignored".format(output_file)) - continue + session.commit() - tablename, table, dtypes = transform_table_before_upload(tablename, - table, - instance, - meta_data, - table_cache) - - if schema is None: - tn = tablename - else: - tn = "{}.{}".format(schema, tablename) - - logger.debug("saving data from {} to table {}".format(output_file, tn)) - # add foreign key - table["instance_id"] = instance.id - table_cache.add_table(table, tablename, dtypes) - - if "metric_blob_globs" in meta_data: - metric_dir = meta_data["metric_outdir"] - files = [glob.glob(os.path.join(metric_dir, x)) - for x in meta_data["metric_blob_globs"]] - files = IOTools.flatten(files) - logger.debug( - "uploading binary data in {} files from {} to " - "table binary_data".format(len(files), metric_dir)) - table = [] - for fn in files: - with IOTools.open_file(fn, "rb", encoding=None) as inf: - data_row = BenchmarkBinaryData( - instance_id=instance.id, - filename=os.path.basename(fn), - path=fn, - data=inf.read()) - session.add(data_row) - session.commit() - - if meta_data.get("metric_tableindices", None): - table_cache.add_indices(meta_data["metric_tableindices"]) - - table_cache.close() - touch(outfile) + upload_metrics_tables(infiles, + benchmark_run.id, schema, session, engine, + max_workers=max_workers) # upload table sizes - df_sizes = pandas.DataFrame.from_records( - list(table_cache.uploaded_sizes.items()), - columns=["tablename", "bytes_uploaded"]) - df_sizes["bytes_resident"] = df_sizes.bytes_uploaded - df_sizes["run_id"] = benchmark_run.id - df_sizes["schema"] = schema - save_table(df_sizes, - engine, - "metric_storage", - schema=None, - is_sqlite3=is_sqlite3) + # df_sizes = pandas.DataFrame.from_records(list(table_cache.uploaded_sizes.items()), + # columns=["tablename", "bytes_uploaded"]) + # df_sizes["bytes_resident"] = df_sizes.bytes_uploaded + # df_sizes["run_id"] = benchmark_run.id + # df_sizes["schema"] = schema + # save_table(df_sizes, + # engine, + # "metric_storage", + # schema=None, + # is_sqlite3=is_sqlite3) benchmark_run.status = "complete" session.commit() @@ -885,6 +734,7 @@ def upload_result(infiles, outfile, *extras): del engine logger.info("uploaded results under run_id {}".format(benchmark_run.id)) + touch(outfile) def export_result(infile, outfile, *extras): @@ -914,6 +764,8 @@ def get_instance_ids_for_run_id(run_id, engine): def purge_run_id(run_id, url, dry_run=False, schemas=None): """remove a run from a 
database. """ + + logger = P.get_logger() engine = sqlalchemy.create_engine(url) connection = engine.connect() @@ -924,7 +776,7 @@ def purge_run_id(run_id, url, dry_run=False, schemas=None): base.prepare() if schemas is None: - insp = reflection.Inspector.from_engine(engine) + insp = inspect(engine) schemas = insp.get_schema_names() # note: default sqlite schema is "main" if 'public' in schemas: @@ -932,9 +784,9 @@ def purge_run_id(run_id, url, dry_run=False, schemas=None): if 'information_schema' in schemas: schemas.remove('information_schema') - E.debug("getting instance_id list of run_id={}".format(run_id)) + logger.debug("getting instance_id list of run_id={}".format(run_id)) instance_ids = set(get_instance_ids_for_run_id(run_id, engine)) - E.debug("found {} instances for run_id={}".format(len(instance_ids), run_id)) + logger.debug("found {} instances for run_id={}".format(len(instance_ids), run_id)) non_metric_tables = ['run', 'instance', 'binary_data', @@ -957,10 +809,9 @@ def purge_run_id(run_id, url, dry_run=False, schemas=None): autoload=True) if "instance_id" not in table.c: continue - E.info("deleting data in {}".format(table_name)) + logger.debug("deleting data in {}".format(table_name)) delete = table.delete().where( table.c.instance_id.in_(instance_ids)) - # E.debug(delete) if not dry_run: connection.execute(delete) @@ -969,15 +820,13 @@ def purge_run_id(run_id, url, dry_run=False, schemas=None): table = sqlalchemy.Table(table_name, metadata, autoload=True) if "run_id" not in table.c: continue - E.info("deleting data in {} for run_id {}".format(table_name, run_id)) + logger.info("deleting data in {} for run_id {}".format(table_name, run_id)) delete = table.delete().where(table.c.run_id == run_id) - # E.debug(delete) if not dry_run: connection.execute(delete) table = sqlalchemy.Table('run', metadata, autoload=True) delete = table.delete().where(table.c.id == run_id) - E.info("deleting data in 'run' for id {}".format(run_id)) - # E.debug(delete) + logger.info("deleting data in 'run' for id {}".format(run_id)) if not dry_run: connection.execute(delete) diff --git a/src/daisy/table_cache.py b/src/daisy/table_cache.py new file mode 100644 index 0000000..3469011 --- /dev/null +++ b/src/daisy/table_cache.py @@ -0,0 +1,317 @@ +import os +import re +import collections +import sqlite3 +import time +import pandas +import sqlalchemy +from sqlalchemy import inspect +from sqlalchemy import text +import cgatcore.pipeline as P +import cgatcore.experiment as E + + +RESERVED_WORDS = { + "all": "total"} + + +@E.cached_function +def get_columns(tablename, engine): + """return list of column names in table""" + metadata = sqlalchemy.MetaData(engine) + tb = sqlalchemy.Table(tablename, metadata, autoload=True, + autoload_with=engine) + return [x.name for x in tb.columns] + + +def sql_sanitize_columns(columns): + + # special chars + columns = [re.sub(r"[\[\]().,:]", "_", str(x)) for x in columns] + + columns = [re.sub("%", "percent", x) + for x in columns] + + columns = [x.lower() for x in columns] + + columns = [RESERVED_WORDS.get(x, x) for x in columns] + return columns + + +class TableExistsException(Exception): + pass + + +def retry_table_to_sql(table, *args, **kwargs): + """retry SQL statements retrying when database is locked""" + while True: + try: + table.to_sql(*args, **kwargs) + except (sqlalchemy.exc.OperationalError, sqlite3.OperationalError) as ex: + if "database is locked" in str(ex): + E.debug("database is locked") + time.sleep(1) + continue + elif "already exists" in str(ex): + raise 
TableExistsException(str(ex)) + raise + except ValueError as ex: + # pandas throws ValueError + if "already exists" in str(ex): + raise TableExistsException(str(ex)) + raise + except Exception as ex: + raise + break + + +def retry_sql_execute(engine, statement): + while True: + try: + engine.execute(statement) + except (sqlalchemy.exc.OperationalError, sqlite3.OperationalError) as ex: + if "database is locked" in str(ex): + E.debug("database is locked") + time.sleep(1) + continue + else: + raise + except Exception as ex: + raise + break + + +def add_columns_to_table(columns, table, tablename, engine): + + pandas_engine = pandas.io.sql.SQLDatabase(engine) + pandas_table = pandas.io.sql.SQLTable(tablename, + pandas_engine, + frame=table) + + new_columns = set(columns) + logger = P.get_logger() + + for column in pandas_table.table.columns: + if column.name in new_columns: + statement = "ALTER TABLE {} ADD COLUMN {} {}".format( + tablename, + column.name, + column.type) + + logger.debug("SQL: {}".format(statement)) + try: + retry_sql_execute(engine, statement) + except (sqlalchemy.exc.OperationalError, sqlite3.OperationalError) as ex: + if "duplicate column name" not in str(ex): + raise + + +def reconcile_columns(tablename, engine, table): + + logger = P.get_logger() + existing_columns = set(get_columns(tablename, engine)) + proposed_columns = set(table.columns) + + obsolete_columns = existing_columns.difference(proposed_columns) + if obsolete_columns: + logger.warn("the following columns are obsolete in {}: {}. " + "empty data will be inserted" + .format(tablename, ", ".join(obsolete_columns))) + # create empty columns + for column in obsolete_columns: + table[column] = None + + new_columns = proposed_columns.difference(existing_columns) + if new_columns: + logger.warn("new columns found for {}: the following columns " + "will be added: {} ".format( + tablename, + ", ".join(new_columns))) + + add_columns_to_table(new_columns, + table, + tablename, + engine) + # clear cache of memoization function + get_columns.delete(tablename, engine) + + +def save_table(table: pandas.DataFrame, + engine, + tablename, + schema: str = None, + is_sqlite3: bool = False, + dtypes=None, + indices=["instance_id"]): + logger = P.get_logger() + table.columns = sql_sanitize_columns(table.columns) + + # pandas/sqlite3 prefers the raw connection, otherwise error: + # AttributeError: 'Engine' object has no attribute 'rollback' + if is_sqlite3: + _engine = engine.raw_connection() + # In pandas >= 0.23 and using sqlite as a backend, the + # pandas.DataFrame.to_sql command fails with "OperationalError: + # (sqlite3.OperationalError) too many SQL variables". The reason is a + # fixed limit in sqlite, SQLITE_MAX_VARIABLE_NUMBER, which is by + # default set to 999. + sql_chunk_size = 999 // (len(table.columns) + 1) + else: + _engine = engine + sql_chunk_size = None + + # lower case all table names.
Otherwise issues with psql + # mixed case access + tablename = tablename.lower() + create_index = False + + try: + retry_table_to_sql(table, + tablename, + _engine, + schema=schema, + if_exists="fail", + index=False, + dtype=dtypes, + chunksize=sql_chunk_size) + E.debug(f"table {tablename} was new") + create_index = True + except TableExistsException: + E.debug(f"table {tablename} already exists - appending") + + if create_index: + # sqlite requires an index name + if schema: + tablename = "{}.{}".format(schema, tablename) + + for field in indices: + E.debug(f"creating index on {field} for {tablename}") + try: + _engine.execute( + text("CREATE INDEX {} ON {} ({})".format( + re.sub("[-.]", "_", tablename) + "_" + field, + tablename, + field))) + except TypeError as ex: + logger.warn("could not create index: {}".format(str(ex))) + except sqlalchemy.exc.ProgrammingError as ex: + logger.warn("could not create index: {}".format(str(ex))) + else: + reconcile_columns(tablename, engine, table) + retry_table_to_sql(table, + tablename, + _engine, + schema=schema, + if_exists="append", + index=False, + dtype=dtypes, + chunksize=sql_chunk_size) + + +class TableCache(): + + # 1 Gb + max_total_bytes = 1e9 + # 50 Mb + max_table_bytes = 5e7 + + def __init__(self, engine, schema, is_sqlite3): + self.engine = engine + self.schema = schema + self.cache = {} + + self.is_sqlite3 = is_sqlite3 + self.total_size = 0 + self.sizes = collections.defaultdict(int) + self.uploaded_sizes = collections.defaultdict(int) + self.dtypes = {} + self.logger = P.get_logger() + self.indices = {} + self.have_created_indices = False + + def flush_table(self, tablename): + + table = self.cache[tablename] + + self.logger.debug(f"{os.getpid()}: uploading table {tablename} from cache={id(self)} " + f"to engine={id(self.engine)}: {self.sizes[tablename]} bytes") + save_table(table, + self.engine, + tablename, + self.schema, + is_sqlite3=self.is_sqlite3, + dtypes=self.dtypes.get(tablename, None)) + del table + del self.cache[tablename] + self.uploaded_sizes[tablename] += self.sizes[tablename] + self.sizes[tablename] = 0 + + def flush_all(self): + for tablename in list(self.cache.keys()): + self.flush_table(tablename) + + self.total_size = 0 + self.cache = {} + self.sizes = collections.defaultdict(int) + + def get_cache_stats(self): + stats = {} + for tablename in list(self.cache.keys()): + stats[tablename] = { + "rows": self.cache[tablename].shape[0], + "columns": self.cache[tablename].shape[1] + } + return stats + + def add_indices(self, indices): + self.indices.update(indices) + + def add_table(self, table, tablename, dtypes=None): + + memory_usage = sum(table.memory_usage(deep=True)) + self.total_size += memory_usage + self.sizes[tablename] += memory_usage + self.dtypes[tablename] = dtypes + + if tablename not in self.cache: + self.cache[tablename] = table + else: + new_columns = set(table.columns) - set(self.cache[tablename].columns) + if new_columns: + self.logger.warning(f"additional columns for {tablename}: {new_columns}") + try: + self.cache[tablename] = pandas.concat([self.cache[tablename], table]) + except AssertionError: + raise + + if self.total_size > self.max_total_bytes: + self.logger.debug( + "force full cache flush, memory usage = {}".format( + self.total_size)) + self.flush_all() + + if self.sizes[tablename] > self.max_table_bytes: + self.logger.debug( + "force cache flush for table {}, memory usage = {}".format( + tablename, + self.sizes[tablename])) + + self.flush_table(tablename) + + def close(self): + 
self.flush_all() + + if not self.have_created_indices: + for index_name, info in self.indices.items(): + table_name, fields = info + self.logger.debug("creating index {} on {} and fields {}".format( + index_name, table_name, fields)) + try: + self.engine.execute("CREATE INDEX {} ON {} ({})".format( + index_name, table_name, fields)) + except sqlalchemy.exc.OperationalError as ex: + self.logger.warn("could not create index: {}".format(str(ex))) + self.have_created_indices = True + + def __del__(self): + E.debug(f"closing table cache {id(self)}") + self.close() diff --git a/src/daisy/tools/benchmark_upload.py b/src/daisy/tools/benchmark_upload.py index b835520..421cd51 100644 --- a/src/daisy/tools/benchmark_upload.py +++ b/src/daisy/tools/benchmark_upload.py @@ -16,37 +16,68 @@ import os import cgatcore.experiment as E import cgatcore.pipeline as P - +import cgatcore.iotools as IOTools from daisy.storage import upload_result def main(argv): - options, args = P.parse_commandline(argv) + def _add_input(parser): + parser.add_option("--data-dir", default=".") + parser.add_option("--force", default=False, action="store_true") + parser.add_option("--min-depth", default=0, type="int") + parser.add_option("--follow-links", default=False, action="store_true") + parser.add_option("--limit-metrics", default=0, type="int") + parser.add_option("--output-filename-metrics") + parser.add_option("--input-filename-metrics") + + P.initialize(argv, callback=_add_input) + options = E.get_args() if options.config_file: PARAMS = P.get_parameters(options.config_file) else: - sys.exit(P.main(options, args)) + sys.exit(P.main(options)) if os.path.exists("results.commit"): - raise ValueError( - "a results.commit file already exists. Please remove " - "before uploading.") - - E.info("collecting files to upload") - infiles = [] - for root, dirs, files in os.walk("."): - # ignore first level (tools) - if root.count(os.sep) == 1: - continue - if "benchmark.info" in files: - infiles.append(os.path.join(root, "benchmark.info")) + if not options.force: + raise ValueError( + "a results.commit file already exists. 
Please remove " + "before uploading.") + + data_dir = os.path.abspath(options.data_dir) + if options.input_filename_metrics: + with IOTools.open_file(options.input_filename_metrics) as inf: + infiles = [x.strip() for x in inf.readlines() if x.strip()] + if options.limit_metrics: + infiles = infiles[:options.limit_metrics] + else: + E.info(f"collecting files to upload starting in {data_dir}") + infiles = [] + for root, dirs, files in os.walk(data_dir, followlinks=options.follow_links): + E.debug(f"working on {root}: dirs={len(dirs)}, files={len(files)}") + # ignore first level (tools) (needs better check) + depth = root[len(data_dir):].count(os.sep) + if "benchmark.info" in files: + if depth <= options.min_depth: + E.info(f"skipping - depth not high enough: {depth}") + else: + infiles.append(os.path.join(root, "benchmark.info")) + + if options.limit_metrics and len(infiles) > options.limit_metrics: + E.info(f"stopping collection as {len(infiles)} reached") + break E.info("found a potential {} benchmark.info files to upload".format(len(infiles))) + if options.output_filename_metrics: + with IOTools.open_file(options.output_filename_metrics, "w") as outf: + outf.write("\n".join(infiles) + "\n") # find all files of interest + oldwd = os.getcwd() + os.chdir(data_dir) upload_result(infiles, "results.commit", PARAMS) + os.chdir(oldwd) E.stop() diff --git a/src/tests/test_benchmark_simple.py b/src/tests/test_benchmark_simple.py index 8c8544d..3f19347 100644 --- a/src/tests/test_benchmark_simple.py +++ b/src/tests/test_benchmark_simple.py @@ -99,9 +99,9 @@ def test_workflow_show(self): "--config-file={} " "--work-dir={} " "--local " - "show all".format( + "show all".format( os.path.join(self.work_dir, "output.log"), - os.path.join(os.path.dirname(__file__),self.filename), + os.path.join(os.path.dirname(__file__), self.filename), self.work_dir)) p = E.run(statement, return_popen=True) diff --git a/src/tests/test_storage.py b/src/tests/test_storage.py new file mode 100644 index 0000000..e82df49 --- /dev/null +++ b/src/tests/test_storage.py @@ -0,0 +1,96 @@ +import os +import json +import pandas +import pytest +import sqlalchemy +from daisy.storage import upload_result + + +@pytest.fixture +def benchmark_layout(tmp_path): + + ntools = 2 + nmetrics = 4000 + + tools = [f"tool{x}" for x in range(ntools)] + metrics = [f"metric{x}" for x in range(nmetrics)] + + def generate_bench(d): + with open(os.path.join(d, "benchmark.bench"), "w") as outf: + outf.write("statement\nmanual\n") + + def generate_info(d, name): + with open(os.path.join(d, "benchmark.info"), "w") as outf: + outf.write(json.dumps({ + "input_files": [], + "version": "test_version", + "options": "test_options", + "option_hash": "test_option_hash", + "name": name, + "input_alias": "test_alias", + "output_files": [os.path.join(d, f"{name}.tsv")], + "tablenames": [f"{name}"], + })) + + outfiles = [] + + for tool in tools: + tool_dir = os.path.join(tmp_path, f"{tool}.dir") + os.makedirs(tool_dir) + generate_bench(tool_dir) + generate_info(tool_dir, name=tool) + for metric in metrics: + metric_dir = os.path.join(tool_dir, f"{metric}.dir") + os.makedirs(metric_dir) + generate_bench(metric_dir) + generate_info(metric_dir, name=metric) + metric_filename = os.path.join(metric_dir, f"{metric}.tsv") + with open(metric_filename, "w") as outf: + outf.write("metric_name\tmetric_value\n") + outf.write(f"{metric}\t1\n") + outfiles.append(metric_filename) + return outfiles, tools, metrics + + +@pytest.mark.parametrize("max_workers", [1, 5]) +def 
test_upload(benchmark_layout, tmp_path, max_workers): + + outfiles, tools, metrics = benchmark_layout + db_path = f"sqlite:///{tmp_path}/csvdb" + upload_result(outfiles, + os.path.join(tmp_path, "upload.log"), + {"title": "test", + "description": "test", + "tags": [], + "database": {"url": db_path, "cores": max_workers}}) + + db_engine = sqlalchemy.create_engine(db_path) + + insp = sqlalchemy.inspect(db_engine) + assert insp.get_table_names() == sorted([ + 'binary_data', 'instance', + 'metric_timings', 'run', + 'tags', 'tool_timings'] + metrics) + + with db_engine.connect() as conn: + run_df = pandas.read_sql("SELECT * FROM run", conn) + assert len(run_df) == 1 + + with db_engine.connect() as conn: + ins_df = pandas.read_sql("SELECT * FROM instance", conn) + assert list(ins_df.run_id.unique()) == [1] + assert list(ins_df.id) == list(range(1, len(ins_df) + 1)) + + for metric in metrics: + with db_engine.connect() as conn: + df = pandas.read_sql(f"SELECT * FROM {metric}", conn) + assert len(df) == len(tools) + + with db_engine.connect() as conn: + tt_df = pandas.read_sql("SELECT * FROM tool_timings", conn) + assert len(tt_df) == len(tools) + + with db_engine.connect() as conn: + mt_df = pandas.read_sql("SELECT * FROM metric_timings", conn) + assert len(mt_df) == len(metrics) * len(tools)
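# --- Note (not part of the patch) -----------------------------------------
# setup_worker() above gives each pool worker its own TableCache and uses
# multiprocessing.util.Finalize so the cache is flushed when the worker exits
# after pool.close()/pool.join(). Below is a minimal standalone sketch of
# that pattern; DummyCache is a hypothetical stand-in for TableCache and is
# not part of this repository.
import multiprocessing
from multiprocessing.util import Finalize

_cache = None  # per-process global, mirrors `resource` in storage.py


class DummyCache:
    """collects items in memory and flushes them when the worker exits."""

    def __init__(self):
        self.items = []

    def add(self, item):
        self.items.append(item)

    def flush_all(self):
        name = multiprocessing.current_process().name
        print(f"{name}: flushing {len(self.items)} items")


def init_worker():
    # runs once in every worker process (Pool initializer)
    global _cache
    _cache = DummyCache()
    # exitpriority must be non-None for the callback to run at shutdown
    Finalize(_cache, _cache.flush_all, exitpriority=16)


def work(x):
    _cache.add(x)
    return x * x


if __name__ == "__main__":
    pool = multiprocessing.Pool(2, initializer=init_worker)
    print(pool.map(work, range(10)))
    pool.close()  # close + join lets workers exit cleanly,
    pool.join()   # which triggers the Finalize callbacks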
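# --- Note (not part of the patch) -----------------------------------------
# With this change, the number of upload workers is read from the database
# section of the configuration ("cores", default 1), as the upload_result()
# docstring and test_storage.py show. A hypothetical driver sketch follows;
# the benchmark.info path is an assumption for illustration only.
from daisy.storage import upload_result

config = {
    "title": "demo",
    "description": "demo upload",
    "tags": [],
    # "cores" > 1 makes upload_metrics_tables() fan out over a
    # multiprocessing.Pool; 1 keeps the upload in-process.
    "database": {"url": "sqlite:///./csvdb", "cores": 10},
}

upload_result(["tool0.dir/metric0.dir/benchmark.info"],
              "results.commit",
              config)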