From df5ac9660bed266e648897699b8a7027c140b19d Mon Sep 17 00:00:00 2001 From: Rad Suchecki Date: Mon, 24 Feb 2020 10:06:29 +1030 Subject: [PATCH] changes partially facilitating multi k #2 also relevant to kmerloc --- src/kmermatch/KmerMatch.java | 18 ++++-- src/kmermatch/KmerSetPopulatorConsumer.java | 22 ++++--- src/kmermatch/KmerSetsMap.java | 65 +++++++++++++++++++++ 3 files changed, 91 insertions(+), 14 deletions(-) create mode 100644 src/kmermatch/KmerSetsMap.java diff --git a/src/kmermatch/KmerMatch.java b/src/kmermatch/KmerMatch.java index 87be53f..0c95bfa 100755 --- a/src/kmermatch/KmerMatch.java +++ b/src/kmermatch/KmerMatch.java @@ -13,8 +13,10 @@ import java.text.NumberFormat; import java.util.ArrayList; import java.util.Iterator; +import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -145,12 +147,13 @@ private void match(ArrayList inputFilenamesList, OptSet optSet) { Reporter.report("[INFO]", "Start populating k-mers set", TOOL_NAME); //SPAWN MAP - POPULATOR THREADS - ConcurrentSkipListSet kmers = new ConcurrentSkipListSet(); +// ConcurrentSkipListSet kmers = new ConcurrentSkipListSet(); + KmerSetsMap kmerSetsMap = new KmerSetsMap(TOOL_NAME); final ExecutorService populatorExecutorService = new ThreadPoolExecutor(MATCHER_THREADS, MATCHER_THREADS, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()); ArrayList> populatorFutures = new ArrayList<>(MATCHER_THREADS); ArrayList finalMessages = new ArrayList<>(MATCHER_THREADS * 5); for (int i = 0; i < MATCHER_THREADS; i++) { - populatorFutures.add(populatorExecutorService.submit(new KmerSetPopulatorConsumer(kmers, inputKmersQueue, k, storeASCII))); + populatorFutures.add(populatorExecutorService.submit(new KmerSetPopulatorConsumer(kmerSetsMap, inputKmersQueue, k, storeASCII))); } populatorExecutorService.shutdown(); inputKmersExecutorService.shutdown(); @@ -172,8 +175,11 @@ private void match(ArrayList inputFilenamesList, OptSet optSet) { } catch (TimeoutException ex) { Reporter.report("[ERROR]", "timeout exception!", getClass().getSimpleName()); } - - Reporter.report("[INFO]", "Finished populating k-mers set, n=" + NumberFormat.getNumberInstance().format(kmers.size()), TOOL_NAME); + ConcurrentHashMap> map = kmerSetsMap.getKmerSetsMap(); + for (Map.Entry> entry : map.entrySet()) { + Reporter.report("[INFO]", "Finished populating k-mers set, k= "+entry.getKey()+", n=" + NumberFormat.getNumberInstance().format(entry.getValue().size()), TOOL_NAME); + } +// Reporter.report("[INFO]", "Finished populating k-mers set, n=" + NumberFormat.getNumberInstance().format(kmers.size()), TOOL_NAME); // for(String inputFileName: inputFilenamesList) { // @@ -212,7 +218,7 @@ private void match(ArrayList inputFilenamesList, OptSet optSet) { final ExecutorService matcherExecutorService = new ThreadPoolExecutor(MATCHER_THREADS, MATCHER_THREADS, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()); ArrayList> matcherFutures = new ArrayList<>(MATCHER_THREADS); for (int i = 0; i < MATCHER_THREADS; i++) { - matcherFutures.add(matcherExecutorService.submit(new KmerMatcherConsumerProducer(inputQueue, outputQueue, kmers, IN_BUFFER_SIZE, TOOL_NAME, + matcherFutures.add(matcherExecutorService.submit(new KmerMatcherConsumerProducer(inputQueue, outputQueue, kmerSetsMap.getKmerSet(k), IN_BUFFER_SIZE, TOOL_NAME, finalMessages, optSet.getOpt("v").isUsed(), (int) optSet.getOpt("m").getValueOrDefault(), (double) optSet.getOpt("M").getValueOrDefault(), inputReaderProducer.getGuessedInputFormat(), k, storeASCII))); @@ -246,7 +252,7 @@ private void match(ArrayList inputFilenamesList, OptSet optSet) { if(optSet.getOpt("d").isUsed()) { - Iterator iterator = kmers.iterator(); + Iterator iterator = kmerSetsMap.getKmerSet(k).iterator(); while (iterator.hasNext()) { Kmer next = iterator.next(); if(storeASCII) { diff --git a/src/kmermatch/KmerSetPopulatorConsumer.java b/src/kmermatch/KmerSetPopulatorConsumer.java index 067f270..69a6b5c 100755 --- a/src/kmermatch/KmerSetPopulatorConsumer.java +++ b/src/kmermatch/KmerSetPopulatorConsumer.java @@ -17,7 +17,9 @@ import java.util.ArrayList; import java.util.List; +import java.util.StringTokenizer; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListSet; import java.util.regex.Pattern; import shared.SequenceOps; @@ -28,19 +30,21 @@ */ public class KmerSetPopulatorConsumer implements Runnable { - private final ConcurrentSkipListSet kmers; +// private final ConcurrentHashMap> kmerSetsMap; + private final KmerSetsMap kmerSetsMap; +// private final ConcurrentSkipListSet kmers; private final BlockingQueue> inputQueue; private final Integer k; private final boolean storeASCII; /** * - * @param kmers + * @param kmerSetsMap * @param inputQueue * @param k - set to null if no need to kmerize input */ - public KmerSetPopulatorConsumer(ConcurrentSkipListSet kmers, BlockingQueue> inputQueue, Integer k, boolean storeASCII) { - this.kmers = kmers; + public KmerSetPopulatorConsumer(KmerSetsMap kmerSetsMap, BlockingQueue> inputQueue, Integer k, boolean storeASCII) { + this.kmerSetsMap = kmerSetsMap; this.inputQueue = inputQueue; this.k = k; this.storeASCII = storeASCII; @@ -53,10 +57,13 @@ public void run() { Pattern nonNuclPattern = Pattern.compile(".*[^acgtACGT]+.*"); Pattern tab = Pattern.compile("\t"); List list; - if (k ==null) { //KMERS INPUT, NO NEED TO KMERIZE + if (k == null) { //KMERS INPUT, NO NEED TO KMERIZE + StringTokenizer tokenizer; while (!(list = inputQueue.take()).isEmpty()) { for (String line : list) { - kmers.add(new Kmer(SequenceOps.getCanonical(tab.split(line)[0]), storeASCII)); + tokenizer = new StringTokenizer(line); + String tok = tokenizer.nextToken().toUpperCase(); + kmerSetsMap.getKmerSet(tok.length()).add(new Kmer(SequenceOps.getCanonical(tab.split(line)[0]), storeASCII)); } } } else { //KMERIZE @@ -67,7 +74,7 @@ public void run() { for (int i = 0; i < maxKmer; i++) { String canonical = SequenceOps.getCanonical(kmer.subSequence(i, i + k).toString()); if(!nonNuclPattern.matcher(canonical).matches()) { - kmers.add(new Kmer(canonical, storeASCII)); + kmerSetsMap.getKmerSet(canonical.length()).add(new Kmer(canonical, storeASCII)); } } } @@ -78,5 +85,4 @@ public void run() { e.printStackTrace(); } } - } diff --git a/src/kmermatch/KmerSetsMap.java b/src/kmermatch/KmerSetsMap.java new file mode 100644 index 0000000..103983e --- /dev/null +++ b/src/kmermatch/KmerSetsMap.java @@ -0,0 +1,65 @@ +/* + * Copyright 2020 rad. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kmermatch; + +import java.util.ArrayList; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListSet; +import kmermatch.Kmer; +import shared.Reporter; + +/** + * + * @author rad + */ +public class KmerSetsMap { + + private final ConcurrentHashMap> kmersSetsMap; + private final ArrayList kSizes; + private final String TOOL_NAME; + + public KmerSetsMap(String TOOL_NAME) { + this.kmersSetsMap = new ConcurrentHashMap<>(); + this.kSizes = new ArrayList<>(); + this.TOOL_NAME = TOOL_NAME; + } + + public ConcurrentHashMap> getKmerSetsMap() { + return kmersSetsMap; + } + + public ConcurrentSkipListSet getKmerSet(int k) { + ConcurrentSkipListSet set = kmersSetsMap.get(k); + if (set == null) { + set = new ConcurrentSkipListSet<>(); + ConcurrentSkipListSet previous = kmersSetsMap.putIfAbsent(k, set); + if (previous == null) { + addKValue(k); + } else { //another thread just beat us to putting this one in + return previous; + } + } + return set; + } + + private synchronized void addKValue(int k) { + if (!kSizes.contains(k)) { //might have been added by input reader, + Reporter.report("[INFO]", "Initiating set for previously unseen k-mers, k=" + k, TOOL_NAME); + kSizes.add(k); + } + } + +}