Skip to content

Commit

Permalink
final changes
Browse files Browse the repository at this point in the history
  • Loading branch information
adrianespejo committed Jun 23, 2019
1 parent 64df2a6 commit 86a205e
Show file tree
Hide file tree
Showing 17 changed files with 156 additions and 163 deletions.
145 changes: 0 additions & 145 deletions IP_relationships_day.py

This file was deleted.

9 changes: 0 additions & 9 deletions Dummy.py → dummy/Dummy.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,6 @@ def gen_random_string(size=10, chars=string.ascii_uppercase + string.ascii_lower
return ''.join(random.choice(chars) for _ in range(size))


if __name__ == "__main__":
try:
rows = int(sys.argv[1])
except IndexError:
rows = 40000

gen_random_data(rows)


@task(returns=float)
def my_task(A, result, a, b):
start_task = time.time()
Expand Down
File renamed without changes.
File renamed without changes.
3 changes: 1 addition & 2 deletions earth.py → earth/earth.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from classes import GridPoints, GlobalStats
from hecuba import config
from pycompss.api.task import task
from pycompss.api.api import compss_barrier, compss_wait_on
from pycompss.api.api import compss_barrier
from pycompss.api.parameter import *
import os
import sys
Expand Down Expand Up @@ -31,7 +31,6 @@ def get_keys(partition):

@task(d=CONCURRENT)
def interpolate(partition, d, NP, dist):
import time
keyDict = get_keys(partition)
localStats = GlobalStats()
num_t = 4
Expand Down
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from random import randint

from hecuba import config
from data_model import *
from ip_relationships.data_model import *

PUBLIC_SPACE_THRESHOLD = 10

Expand Down Expand Up @@ -160,14 +160,14 @@ def main():
ips_comp_time = end - time3

print("IP relationships in overlapping dates, with %s strategy and %s rows:\n" % (
config.partition_strategy.lower(), nrows + 200000))
config.partition_strategy.lower(), nrows + 200000))
print(" chunk_aggr time: %s seconds.\n" % round(chunk_aggr_time, 4))
print(" get_blacklist time: %s seconds.\n" % round(blacklist_comp_time, 4))
print(" compute_IPs time: %s seconds.\n" % round(ips_comp_time, 4))

with open("/home/bsc31/bsc31310/PartitionerTests/results.txt", "a") as results:
results.write("IP relationships in overlapping dates, with %s strategy and %s rows:\n" % (
config.partition_strategy.lower(), nrows + 200000))
config.partition_strategy.lower(), nrows + 200000))
results.write(" chunk_aggr time: %s seconds.\n" % round(chunk_aggr_time, 4))
results.write(" get_blacklist time: %s seconds.\n" % round(blacklist_comp_time, 4))
results.write(" compute_IPs time: %s seconds.\n\n" % round(ips_comp_time, 4))
Expand Down
Empty file added ip_relationships/__init__.py
Empty file.
File renamed without changes.
1 change: 0 additions & 1 deletion data_model.py → ip_relationships/data_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,3 @@ class Relationships(StorageDict):
'''
@TypeSpec dict<<user:int>, relationships:set<int>>
'''

149 changes: 149 additions & 0 deletions partitioners/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
import uuid


def init(config_file_path=None):
    """COMPSs storage-API hook, called once when the application starts.

    Args:
        config_file_path: optional path to a storage configuration file
            (unused by this backend).
    """
    # This partitioner backend needs no master-side initialization.
    return None


def finish():
    """COMPSs storage-API hook, called once when the application ends.

    This backend has no master-side resources to release.
    """
    return None


def initWorker(config_file_path=None):
    """COMPSs storage-API hook, called once on each worker at startup.

    Args:
        config_file_path: optional path to a storage configuration file
            (unused by this backend).
    """
    # Workers require no per-process setup for this backend.
    return None


def finishWorker():
    """COMPSs storage-API hook, called once on each worker at shutdown.

    This backend has no per-worker resources to release.
    """
    return None


def start_task(params):
    """
    Record the task's start time in the ``hecuba.partitioning`` table.

    Only the FIRST element of *params* is inspected (the loop breaks
    unconditionally after one iteration). Its storage id is resolved from
    a ``"key"`` entry in its ``__dict__`` (a UUID string) or, with higher
    precedence, from a ``_storage_id`` attribute. When an id is found,
    ``start_time`` is stamped for that id; the UPDATE is best-effort and
    any database error is silently ignored (e.g. the partitioning table
    may not exist).

    Args:
        params: a list of objects (Blocks, StorageObjs, strings, ints, ...)
    """
    from hecuba import config
    import time
    for param in params:
        uid = None
        try:
            uid = uuid.UUID(param.__dict__["key"])
        except KeyError:
            pass
        try:
            # _storage_id, when present, overrides the "key"-derived id.
            uid = param._storage_id
        except AttributeError:
            pass

        if uid is not None:
            try:
                prepare = config.session.prepare("""UPDATE hecuba.partitioning
                                                        SET start_time = ?
                                                        WHERE storage_id = ?""")
                config.session.execute(prepare, [time.time(), uid])
            except Exception:
                # Best-effort bookkeeping: was a bare `except:`, narrowed so
                # SystemExit/KeyboardInterrupt are no longer swallowed.
                pass
        break


def end_task(params):
    """
    Record the task's end time in the ``hecuba.partitioning`` table.

    Mirror of :func:`start_task`: only the FIRST element of *params* is
    inspected (the loop breaks unconditionally after one iteration). Its
    storage id is resolved from a ``"key"`` entry in its ``__dict__`` (a
    UUID string) or, with higher precedence, from a ``_storage_id``
    attribute. When an id is found, ``end_time`` is stamped for that id;
    the UPDATE is best-effort and any database error is silently ignored.

    Args:
        params: a list of objects (Blocks, StorageObjs, strings, ints, ...)
    """
    from hecuba import config
    import time
    for param in params:
        uid = None
        try:
            uid = uuid.UUID(param.__dict__["key"])
        except KeyError:
            pass
        try:
            # _storage_id, when present, overrides the "key"-derived id.
            uid = param._storage_id
        except AttributeError:
            pass

        if uid is not None:
            try:
                prepare = config.session.prepare("""UPDATE hecuba.partitioning
                                                        SET end_time = ?
                                                        WHERE storage_id = ?""")
                config.session.execute(prepare, [time.time(), uid])
            except Exception:
                # Best-effort bookkeeping: was a bare `except:`, narrowed so
                # SystemExit/KeyboardInterrupt are no longer swallowed.
                pass
        break


class TaskContext(object):
    """Context manager that brackets a task with partitioning bookkeeping.

    On entry it stamps the start time for the task's values via
    ``start_task``; on exit it stamps the end time via ``end_task``.
    Exceptions raised inside the ``with`` body are not suppressed.
    """

    def __init__(self, logger, values, **kwargs):
        self.logger = logger
        self.values = values

    def __enter__(self):
        # Prolog: record start time before the task body runs.
        start_task(self.values)
        self.logger.info("Prolog finished")

    def __exit__(self, type, value, traceback):
        # Epilog: record end time; returning None propagates exceptions.
        end_task(self.values)
        self.logger.info("Epilog finished")


def getByID(objid):
    """
    Rebuild an IStorage object from its identifier.

    The id can either be:
        block: UUID (eg. f291f008-a520-11e6-b42e-5b582e04fd70)
        storageobj: UUID_(version) (eg. f291f008-a520-11e6-b42e-5b582e04fd70_1)
    Args:
        objid (str): object identifier
    Returns:
        (Block| Storageobj)
    Raises:
        Exception: re-raised after logging if the metadata lookup fails.
    """
    from hecuba import log
    from hecuba.IStorage import IStorage

    # Bind `query` before the try block: previously it was assigned inside,
    # so a failed `config` import made the log.error below raise NameError.
    query = "SELECT * FROM hecuba.istorage WHERE storage_id = %s"
    try:
        from hecuba import config
        results = config.session.execute(query, [uuid.UUID(objid)])[0]
    except Exception as e:
        log.error("Query %s failed", query)
        raise e

    log.debug("IStorage API:getByID(%s) of class %s", objid, results.class_name)
    return IStorage.build_remotely(results._asdict())
File renamed without changes.
2 changes: 1 addition & 1 deletion partitioner2.py → partitioners/partitioner2.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ def _setup_dynamic_structures(self):
print("Could not create table hecuba.partitioning.")
raise ex

self._prepared_store_id =\
self._prepared_store_id = \
config.session.prepare("""INSERT INTO hecuba.partitioning
(table_name, storage_id, number_of_partitions)
VALUES (?, ?, ?)""")
Expand Down
2 changes: 1 addition & 1 deletion partitioner3.py → partitioners/partitioner3.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ def _setup_dynamic_structures(self):
print("Could not create table hecuba.partitioning.")
raise ex

self._prepared_store_id =\
self._prepared_store_id = \
config.session.prepare("""INSERT INTO hecuba.partitioning
(partitioning_uuid, storage_id, number_of_partitions)
VALUES (?, ?, ?)""")
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion partitioner_tests.py → partitioners/partitioner_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ def test_dynamic_best_idle_nodes(self):
query = config.session.prepare("""UPDATE hecuba.partitioning
SET end_time = ?
WHERE storage_id = ?""")
config.session.execute(query, [time.time()+150, id_partition0])
config.session.execute(query, [time.time() + 150, id_partition0])
else:
query = config.session.prepare("""UPDATE hecuba.partitioning
SET start_time = ?, end_time = ?
Expand Down

0 comments on commit 86a205e

Please sign in to comment.