diff --git a/CHANGELOG.rst b/CHANGELOG.rst index db2c3552..27ecb3aa 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,3 +1,10 @@ +0.8.2 +----- + +Fix discrepancies between Python and C++ versions #102 +Utility added to ``anonlink/concurrency.py`` help with chunking. +Better Github status messages posted by jenkins. + 0.8.1 ----- diff --git a/Jenkinsfile.groovy b/Jenkinsfile.groovy index 416a2e1f..d7e49757 100644 --- a/Jenkinsfile.groovy +++ b/Jenkinsfile.groovy @@ -9,12 +9,12 @@ def isDevelop = env.BRANCH_NAME == 'develop' VENV_DIRECTORY = "env" -GIT_CONTEXT = "jenkins" +GITHUB_TEST_CONTEXT = "jenkins/test" +GITHUB_RELEASE_CONTEXT = "jenkins/release" def configs = [ - [label: 'GPU 1', pythons: ['python3.4', 'python3.5', 'python3.6'], compilers: ['clang', 'gcc']], - //[label: 'osx', pythons: ['python3.5'], compilers: ['clang', 'gcc']] - [label: 'McNode', pythons: ['python3.5'], compilers: ['clang']] + [os: 'linux', pythons: ['python3.4', 'python3.5', 'python3.6'], compilers: ['clang', 'gcc']], + [os: 'osx', pythons: ['python3.6', 'python3.7'], compilers: ['clang']] ] def PythonVirtualEnvironment prepareVirtualEnvironment(String pythonVersion, clkhashPackageName, compiler, venv_directory = VENV_DIRECTORY) { @@ -77,7 +77,7 @@ def build(python_version, compiler, label, release = false) { def builders = [:] for (config in configs) { - def label = config["label"] + def os = config["os"] def pythons = config["pythons"] def compilers = config["compilers"] @@ -86,7 +86,9 @@ for (config in configs) { def py_version = _py_version def compiler = _compiler - def combinedName = "${label}-${py_version}-${compiler}" + + def label = "$os&&$py_version&&$compiler" + def combinedName = "${os} ${compiler} ${py_version}" builders[combinedName] = { node(label) { @@ -102,14 +104,17 @@ for (config in configs) { GitCommit commit; node() { commit = GitUtils.checkoutFromSCM(this); - commit.setInProgressStatus(GIT_CONTEXT); + commit.setInProgressStatus(GITHUB_TEST_CONTEXT); } try { parallel builders + node() { + commit.setSuccessStatus(GITHUB_TEST_CONTEXT) + } } catch (Exception err) { node() { - commit.setFailStatus("Build failed", GIT_CONTEXT); + commit.setFailStatus("Build failed", GITHUB_TEST_CONTEXT); } throw err } @@ -117,11 +122,12 @@ try { node('GPU 1') { stage('Release') { try { + commit.setInProgressStatus(GITHUB_RELEASE_CONTEXT); build('python3.5', 'gcc', 'GPU 1', true) - commit.setSuccessStatus(GIT_CONTEXT) + commit.setSuccessStatus(GITHUB_RELEASE_CONTEXT) } catch (Exception e) { - commit.setFailStatus("Release failed", GIT_CONTEXT); + commit.setFailStatus("Release failed", GITHUB_RELEASE_CONTEXT); throw e; } } -} \ No newline at end of file +} diff --git a/_cffi_build/dice_one_against_many.cpp b/_cffi_build/dice_one_against_many.cpp index a0967916..4cadc4f4 100644 --- a/_cffi_build/dice_one_against_many.cpp +++ b/_cffi_build/dice_one_against_many.cpp @@ -262,7 +262,7 @@ class Node { struct score_cmp { bool operator()(const Node& a, const Node& b) const { - return a.score >= b.score; + return a.score > b.score || (a.score == b.score && a.index < b.index); } }; @@ -465,8 +465,18 @@ extern "C" node_queue top_k_scores(score_cmp(), std::move(vec)); uint32_t count_one = _popcount_array(comp1, keywords); - if (count_one == 0) - return 0; + if (count_one == 0) { + if (threshold > 0) { + return 0; + } + + for (uint32_t j = 0; j < k; ++j) { + scores[j] = 0.0; + indices[j] = j; + } + + return static_cast(k); + } uint32_t max_popcnt_delta = keybytes * CHAR_BIT; // = bits per key if(threshold > 0) { diff --git a/anonlink/__init__.py b/anonlink/__init__.py index ce35a08c..763233d9 100644 --- a/anonlink/__init__.py +++ b/anonlink/__init__.py @@ -1,6 +1,7 @@ import pkg_resources from anonlink import bloommatcher +from anonlink import concurrency from anonlink import entitymatch from anonlink import network_flow diff --git a/anonlink/concurrency.py b/anonlink/concurrency.py new file mode 100644 index 00000000..2317d002 --- /dev/null +++ b/anonlink/concurrency.py @@ -0,0 +1,86 @@ +import itertools as _itertools +import math as _math +import numbers as _numbers +import typing as _typing + +import mypy_extensions as _mypy_extensions + + +# Future: There may be better ways of chunking. Hamish suggests putting +# a better guarantee on the maximum size of a chunk. This may help with +# optimisation (e.g., set chunk size to be the size of a page, +# eliminating page faults). +# As the function currently makes no guarantees, any such changes would +# be backwards compatible. + + +ChunkInfo = _mypy_extensions.TypedDict( + 'ChunkInfo', + {'datasetIndices': _typing.List[int], + 'ranges': _typing.List[_typing.List[int]]}) + + +def _split_points(size: int, chunks: int) -> _typing.Iterator[int]: + chunk_size = size / chunks + for i in range(chunks): + yield round(i * chunk_size) + yield size + + +def _chunks_1d( + size: int, + chunks: int +) -> _typing.Iterable[_typing.List[int]]: + split_points = _split_points(size, chunks) + a = next(split_points) + for b in split_points: + yield [a, b] + a = b + + +def split_to_chunks( + chunk_size_aim: _numbers.Real, + *, + # Keyword-only for forwards compatibility: this argument may not be + # needed once we do blocking + dataset_sizes: _typing.Sequence[_numbers.Integral] +) -> _typing.Iterable[ChunkInfo]: + """Split datasets into chunks for parallel processing. + + Resulting chunks are dictionaries with two keys: "datasetIndices" + and "ranges". The value for "datasetIndices" is a length 2 list of + the two datasets that we are comparing in this chunk. The value for + "ranges" is a length 2 list of ranges within those datasets. A range + is a length 2 list [a, b] representing range(a, b). + + For example, {"datasetIndices": [2, 4], "ranges": [[3, 21], [18, 20]]} + means that this chunk compares (0-indexed) datasets 2 and 4. We are + looking at elements 3-20 (inclusive) of dataset 2 and elements 18 + and 19 of dataset 4. + + The chunks are always JSON serialisable. + + :param chunk_size_aim: Number of comparisons per chunk to aim for. + This is a hint only. No promises. + :param datset_sizes: The sizes of the datsets to compare, as a + sequence. + + :return: An iterable of chunks. + """ + + # int-like and float-like types such as np.int64 are welcome but are + # not JSON-serialisable. + chunk_size_aim_float = float(chunk_size_aim) + dataset_sizes_int = map(int, dataset_sizes) + for (i0, size0), (i1, size1) in _itertools.combinations( + enumerate(dataset_sizes_int), 2): + if not size0 and not size1: + continue + chunks0 = round(size0 / _math.sqrt(chunk_size_aim_float)) or 1 + chunk_size0 = size0 / chunks0 + # chunk_size0 is unlikely to be exactly sqrt(chunk_size_aim). + # Adjust goal chunk size for the second dataset. + chunks1 = round(size1 * chunk_size0 / chunk_size_aim_float) or 1 + for c0, c1 in _itertools.product( + _chunks_1d(size0, chunks0), _chunks_1d(size1, chunks1)): + yield {'datasetIndices': [i0, i1], 'ranges': [c0, c1]} diff --git a/anonlink/entitymatch.py b/anonlink/entitymatch.py index 0cb2f50a..05a707e2 100644 --- a/anonlink/entitymatch.py +++ b/anonlink/entitymatch.py @@ -29,7 +29,7 @@ def dicecoeff(x): coeffs = filter(lambda c: c[1] >= threshold, enumerate(map(dicecoeff, filters2))) - top_k = sorted(coeffs, key=itemgetter(1), reverse=True)[:k] + top_k = sorted(coeffs, key=lambda x: -x[1])[:k] result.extend([(i, coeff, j) for j, coeff in top_k]) return result diff --git a/requirements.txt b/requirements.txt index abcedd72..a9fa2d17 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,3 +4,4 @@ cffi>=1.7 pytest>=3.4 pytest-cov>=2.5 clkhash==0.10.1 +mypy-extensions==0.3.0 diff --git a/setup.py b/setup.py index 2e8a7f75..835fcf92 100644 --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ setup( name="anonlink", - version='0.8.1', + version='0.8.2', description='Anonymous linkage using cryptographic hashes and bloom filters', url='https://github.com/n1analytics/anonlink', license='Apache', diff --git a/tests/test_concurrency.py b/tests/test_concurrency.py new file mode 100644 index 00000000..d52e70f6 --- /dev/null +++ b/tests/test_concurrency.py @@ -0,0 +1,50 @@ +import itertools + +import pytest + +from anonlink import concurrency + +DATASET_SIZES = (0, 1, 100) +DATASET_NUMS = (0, 1, 2, 3) +DATASETS = tuple(itertools.chain.from_iterable( + itertools.product(DATASET_SIZES, repeat=n) for n in DATASET_NUMS)) +CHUNK_SIZE_AIMS = (1, 10, 100) + + + +@pytest.mark.parametrize('datasets', DATASETS) +@pytest.mark.parametrize('chunk_size_aim', CHUNK_SIZE_AIMS) +def test_chunk_size(datasets, chunk_size_aim): + # Guarantee: chunk_size_aim / 4 < chunk_size < chunk_size_aim * 4. + # It may be possible to prove a better bound. + chunks = concurrency.split_to_chunks(chunk_size_aim, + dataset_sizes=datasets) + for chunk in chunks: + size = 1 + i0, i1 = chunk['datasetIndices'] + for a, b in chunk['ranges']: + assert a <= b + size *= b - a + assert (chunk_size_aim / 4 < size + or 4 * chunk_size_aim > datasets[i0] * datasets[i1]) + assert size < chunk_size_aim * 4 + + + +@pytest.mark.parametrize('datasets', DATASETS) +@pytest.mark.parametrize('chunk_size_aim', CHUNK_SIZE_AIMS) +def test_comparison_coverage(datasets, chunk_size_aim): + all_comparisons = set() + for (i0, s0), (i1, s1) in itertools.combinations(enumerate(datasets), 2): + for j0, j1 in itertools.product(range(s0), range(s1)): + all_comparisons.add((i0, i1, j0, j1)) + chunks = concurrency.split_to_chunks(chunk_size_aim, + dataset_sizes=datasets) + for chunk in chunks: + i0, i1 = chunk['datasetIndices'] + r0, r1 = chunk['ranges'] + for j0, j1 in itertools.product(range(*r0), range(*r1)): + # This will raise KeyError if we have duplicates + all_comparisons.remove((i0, i1, j0, j1)) + # Make sure we've touched everything (so our set is empty) + assert not all_comparisons