Skip to content

Commit

Permalink
changes partially facilitating multi k #2
Browse files Browse the repository at this point in the history
also relevant to kmerloc
  • Loading branch information
rsuchecki committed Feb 23, 2020
1 parent c393b85 commit df5ac96
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 14 deletions.
18 changes: 12 additions & 6 deletions src/kmermatch/KmerMatch.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -145,12 +147,13 @@ private void match(ArrayList<String> inputFilenamesList, OptSet optSet) {
Reporter.report("[INFO]", "Start populating k-mers set", TOOL_NAME);

//SPAWN MAP - POPULATOR THREADS
ConcurrentSkipListSet<Kmer> kmers = new ConcurrentSkipListSet();
// ConcurrentSkipListSet<Kmer> kmers = new ConcurrentSkipListSet();
KmerSetsMap kmerSetsMap = new KmerSetsMap(TOOL_NAME);
final ExecutorService populatorExecutorService = new ThreadPoolExecutor(MATCHER_THREADS, MATCHER_THREADS, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
ArrayList<Future<?>> populatorFutures = new ArrayList<>(MATCHER_THREADS);
ArrayList<Message> finalMessages = new ArrayList<>(MATCHER_THREADS * 5);
for (int i = 0; i < MATCHER_THREADS; i++) {
populatorFutures.add(populatorExecutorService.submit(new KmerSetPopulatorConsumer(kmers, inputKmersQueue, k, storeASCII)));
populatorFutures.add(populatorExecutorService.submit(new KmerSetPopulatorConsumer(kmerSetsMap, inputKmersQueue, k, storeASCII)));
}
populatorExecutorService.shutdown();
inputKmersExecutorService.shutdown();
Expand All @@ -172,8 +175,11 @@ private void match(ArrayList<String> inputFilenamesList, OptSet optSet) {
} catch (TimeoutException ex) {
Reporter.report("[ERROR]", "timeout exception!", getClass().getSimpleName());
}

Reporter.report("[INFO]", "Finished populating k-mers set, n=" + NumberFormat.getNumberInstance().format(kmers.size()), TOOL_NAME);
ConcurrentHashMap<Integer, ConcurrentSkipListSet<Kmer>> map = kmerSetsMap.getKmerSetsMap();
for (Map.Entry<Integer, ConcurrentSkipListSet<Kmer>> entry : map.entrySet()) {
Reporter.report("[INFO]", "Finished populating k-mers set, k= "+entry.getKey()+", n=" + NumberFormat.getNumberInstance().format(entry.getValue().size()), TOOL_NAME);
}
// Reporter.report("[INFO]", "Finished populating k-mers set, n=" + NumberFormat.getNumberInstance().format(kmers.size()), TOOL_NAME);

// for(String inputFileName: inputFilenamesList) {
//
Expand Down Expand Up @@ -212,7 +218,7 @@ private void match(ArrayList<String> inputFilenamesList, OptSet optSet) {
final ExecutorService matcherExecutorService = new ThreadPoolExecutor(MATCHER_THREADS, MATCHER_THREADS, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
ArrayList<Future<?>> matcherFutures = new ArrayList<>(MATCHER_THREADS);
for (int i = 0; i < MATCHER_THREADS; i++) {
matcherFutures.add(matcherExecutorService.submit(new KmerMatcherConsumerProducer(inputQueue, outputQueue, kmers, IN_BUFFER_SIZE, TOOL_NAME,
matcherFutures.add(matcherExecutorService.submit(new KmerMatcherConsumerProducer(inputQueue, outputQueue, kmerSetsMap.getKmerSet(k), IN_BUFFER_SIZE, TOOL_NAME,
finalMessages, optSet.getOpt("v").isUsed(), (int) optSet.getOpt("m").getValueOrDefault(),
(double) optSet.getOpt("M").getValueOrDefault(),
inputReaderProducer.getGuessedInputFormat(), k, storeASCII)));
Expand Down Expand Up @@ -246,7 +252,7 @@ private void match(ArrayList<String> inputFilenamesList, OptSet optSet) {


if(optSet.getOpt("d").isUsed()) {
Iterator<Kmer> iterator = kmers.iterator();
Iterator<Kmer> iterator = kmerSetsMap.getKmerSet(k).iterator();
while (iterator.hasNext()) {
Kmer next = iterator.next();
if(storeASCII) {
Expand Down
22 changes: 14 additions & 8 deletions src/kmermatch/KmerSetPopulatorConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.regex.Pattern;
import shared.SequenceOps;
Expand All @@ -28,19 +30,21 @@
*/
public class KmerSetPopulatorConsumer implements Runnable {

private final ConcurrentSkipListSet<Kmer> kmers;
// private final ConcurrentHashMap<Integer, ConcurrentSkipListSet<Kmer>> kmerSetsMap;
private final KmerSetsMap kmerSetsMap;
// private final ConcurrentSkipListSet<Kmer> kmers;
private final BlockingQueue<ArrayList<String>> inputQueue;
private final Integer k;
private final boolean storeASCII;

/**
*
* @param kmers
* @param kmerSetsMap
* @param inputQueue
* @param k - set to null if no need to kmerize input
*/
public KmerSetPopulatorConsumer(ConcurrentSkipListSet<Kmer> kmers, BlockingQueue<ArrayList<String>> inputQueue, Integer k, boolean storeASCII) {
this.kmers = kmers;
public KmerSetPopulatorConsumer(KmerSetsMap kmerSetsMap, BlockingQueue<ArrayList<String>> inputQueue, Integer k, boolean storeASCII) {
this.kmerSetsMap = kmerSetsMap;
this.inputQueue = inputQueue;
this.k = k;
this.storeASCII = storeASCII;
Expand All @@ -53,10 +57,13 @@ public void run() {
Pattern nonNuclPattern = Pattern.compile(".*[^acgtACGT]+.*");
Pattern tab = Pattern.compile("\t");
List<String> list;
if (k ==null) { //KMERS INPUT, NO NEED TO KMERIZE
if (k == null) { //KMERS INPUT, NO NEED TO KMERIZE
StringTokenizer tokenizer;
while (!(list = inputQueue.take()).isEmpty()) {
for (String line : list) {
kmers.add(new Kmer(SequenceOps.getCanonical(tab.split(line)[0]), storeASCII));
tokenizer = new StringTokenizer(line);
String tok = tokenizer.nextToken().toUpperCase();
kmerSetsMap.getKmerSet(tok.length()).add(new Kmer(SequenceOps.getCanonical(tab.split(line)[0]), storeASCII));
}
}
} else { //KMERIZE
Expand All @@ -67,7 +74,7 @@ public void run() {
for (int i = 0; i < maxKmer; i++) {
String canonical = SequenceOps.getCanonical(kmer.subSequence(i, i + k).toString());
if(!nonNuclPattern.matcher(canonical).matches()) {
kmers.add(new Kmer(canonical, storeASCII));
kmerSetsMap.getKmerSet(canonical.length()).add(new Kmer(canonical, storeASCII));
}
}
}
Expand All @@ -78,5 +85,4 @@ public void run() {
e.printStackTrace();
}
}

}
65 changes: 65 additions & 0 deletions src/kmermatch/KmerSetsMap.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright 2020 rad.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kmermatch;

import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import kmermatch.Kmer;
import shared.Reporter;

/**
*
* @author rad
*/
public class KmerSetsMap {

private final ConcurrentHashMap<Integer, ConcurrentSkipListSet<Kmer>> kmersSetsMap;
private final ArrayList<Integer> kSizes;
private final String TOOL_NAME;

public KmerSetsMap(String TOOL_NAME) {
this.kmersSetsMap = new ConcurrentHashMap<>();
this.kSizes = new ArrayList<>();
this.TOOL_NAME = TOOL_NAME;
}

public ConcurrentHashMap<Integer, ConcurrentSkipListSet<Kmer>> getKmerSetsMap() {
return kmersSetsMap;
}

public ConcurrentSkipListSet<Kmer> getKmerSet(int k) {
ConcurrentSkipListSet<Kmer> set = kmersSetsMap.get(k);
if (set == null) {
set = new ConcurrentSkipListSet<>();
ConcurrentSkipListSet previous = kmersSetsMap.putIfAbsent(k, set);
if (previous == null) {
addKValue(k);
} else { //another thread just beat us to putting this one in
return previous;
}
}
return set;
}

private synchronized void addKValue(int k) {
if (!kSizes.contains(k)) { //might have been added by input reader,
Reporter.report("[INFO]", "Initiating set for previously unseen k-mers, k=" + k, TOOL_NAME);
kSizes.add(k);
}
}

}

0 comments on commit df5ac96

Please sign in to comment.