Skip to content

Commit

Permalink
Merge pull request #167 from saalfeldlab/newsolver_alignment_pipeline_v2
Browse files Browse the repository at this point in the history
newsolver alignment pipeline v2
  • Loading branch information
minnerbe authored Jan 8, 2024
2 parents 2fd3e5f + f145212 commit 3159a15
Show file tree
Hide file tree
Showing 14 changed files with 1,038 additions and 614 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@
import org.janelia.alignment.spec.stack.StackWithZValues;
import org.janelia.render.client.ClientRunner;
import org.janelia.render.client.RenderDataClient;
import org.janelia.render.client.parameter.AlignmentPipelineParameters;
import org.janelia.render.client.spark.pipeline.AlignmentPipelineParameters;
import org.janelia.render.client.parameter.CommandLineParameters;
import org.janelia.render.client.parameter.MipmapParameters;
import org.janelia.render.client.parameter.MultiProjectParameters;
import org.janelia.render.client.parameter.RenderWebServiceParameters;
import org.janelia.render.client.spark.pipeline.AlignmentPipelineStep;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -30,7 +31,7 @@
* @author Eric Trautman
*/
public class MipmapClient
implements Serializable {
implements Serializable, AlignmentPipelineStep {

public static class Parameters extends CommandLineParameters {

Expand All @@ -39,67 +40,77 @@ public static class Parameters extends CommandLineParameters {

@ParametersDelegate
MipmapParameters mipmap = new MipmapParameters();

/**
 * Derives the parameters for this client from the parameters of an alignment pipeline.
 *
 * The mipmap parameters object returned by the pipeline parameters is used directly
 * (it is not copied), so the stack-id default applied below is set on that same object.
 *
 * @param  alignmentPipelineParameters  pipeline parameters that supply the multi-project
 *                                      and mipmap settings.
 *
 * @return client specific parameters populated from specified alignment pipeline parameters.
 */
public static Parameters fromPipeline(final AlignmentPipelineParameters alignmentPipelineParameters) {
    final MultiProjectParameters pipelineMultiProject = alignmentPipelineParameters.getMultiProject();
    final RenderWebServiceParameters webServiceParameters =
            new RenderWebServiceParameters(pipelineMultiProject.baseDataUrl,
                                           pipelineMultiProject.owner,
                                           pipelineMultiProject.project);

    // fall back to the multi-project stack selection when the mipmap settings do not define one
    final MipmapParameters pipelineMipmap = alignmentPipelineParameters.getMipmap();
    pipelineMipmap.setStackIdWithZIfUndefined(pipelineMultiProject.stackIdWithZ);

    final Parameters result = new Parameters();
    result.renderWeb = webServiceParameters;
    result.mipmap = pipelineMipmap;
    return result;
}
}

/** Run the client with command line parameters. */
public static void main(final String[] args) {
final ClientRunner clientRunner = new ClientRunner(args) {
@Override
public void runClient(final String[] args) throws Exception {

final Parameters parameters = new Parameters();
parameters.parse(args);

final MipmapClient client = new MipmapClient(parameters);
client.run();
final MipmapClient client = new MipmapClient();
client.createContextAndRun(parameters);
}
};
clientRunner.run();
}

private final Parameters parameters;

public MipmapClient(final Parameters parameters) {
LOG.info("init: parameters={}", parameters);
this.parameters = parameters;
/** Empty constructor required for alignment pipeline steps. */
public MipmapClient() {
}

public void run() throws IOException {
final SparkConf conf = new SparkConf().setAppName("MipmapClient");
/** Create a spark context and run the client with the specified parameters. */
public void createContextAndRun(final Parameters mipmapParameters) throws IOException {
final SparkConf conf = new SparkConf().setAppName(getClass().getSimpleName());
try (final JavaSparkContext sparkContext = new JavaSparkContext(conf)) {
final String sparkAppId = sparkContext.getConf().getAppId();
LOG.info("run: appId is {}", sparkAppId);
runWithContext(sparkContext);
LOG.info("createContextAndRun: appId is {}", sparkContext.getConf().getAppId());
generateMipmaps(sparkContext, mipmapParameters);
}
}

public void runWithContext(final JavaSparkContext sparkContext)
/**
 * Validates the specified pipeline parameters are sufficient for this step
 * by checking the "mipmap" element through
 * {@code AlignmentPipelineParameters.validateRequiredElementExists}.
 *
 * @param  pipelineParameters  alignment pipeline parameters to check.
 *
 * @throws IllegalArgumentException
 *   presumably when the mipmap element is missing (raised by the validation helper - confirm there).
 */
@Override
public void validatePipelineParameters(final AlignmentPipelineParameters pipelineParameters)
        throws IllegalArgumentException {
    final String requiredElementName = "mipmap";
    AlignmentPipelineParameters.validateRequiredElementExists(requiredElementName,
                                                              pipelineParameters.getMipmap());
}

/**
 * Run the client as part of an alignment pipeline.
 *
 * Client parameters are assembled from the pipeline's multi-project and mipmap settings
 * and are then passed to generateMipmaps together with the supplied spark context.
 * The pipeline's mipmap parameters object is used directly (not copied).
 *
 * @param  sparkContext        spark context to use for mipmap generation.
 * @param  pipelineParameters  alignment pipeline parameters to derive the client parameters from.
 */
public void runPipelineStep(final JavaSparkContext sparkContext,
                            final AlignmentPipelineParameters pipelineParameters)
        throws IllegalArgumentException, IOException {

    final MultiProjectParameters pipelineMultiProject = pipelineParameters.getMultiProject();

    final Parameters mipmapClientParameters = new Parameters();
    mipmapClientParameters.renderWeb = new RenderWebServiceParameters(pipelineMultiProject.baseDataUrl,
                                                                      pipelineMultiProject.owner,
                                                                      pipelineMultiProject.project);

    // fall back to the multi-project stack selection when the mipmap settings do not define one
    final MipmapParameters pipelineMipmap = pipelineParameters.getMipmap();
    pipelineMipmap.setStackIdWithZIfUndefined(pipelineMultiProject.stackIdWithZ);
    mipmapClientParameters.mipmap = pipelineMipmap;

    generateMipmaps(sparkContext, mipmapClientParameters);
}

/** Run the client with the specified spark context and parameters. */
private void generateMipmaps(final JavaSparkContext sparkContext,
final Parameters clientParameters)
throws IOException {

LOG.info("runWithContext: entry");
LOG.info("generateMipmaps: entry, clientParameters={}", clientParameters);

final RenderDataClient sourceDataClient = parameters.renderWeb.getDataClient();
final RenderDataClient sourceDataClient = clientParameters.renderWeb.getDataClient();

final List<StackWithZValues> batchedList =
parameters.mipmap.stackIdWithZ.buildListOfStackWithBatchedZ(sourceDataClient);
clientParameters.mipmap.stackIdWithZ.buildListOfStackWithBatchedZ(sourceDataClient);
final JavaRDD<StackWithZValues> rddStackIdWithZValues = sparkContext.parallelize(batchedList);

final Function<StackWithZValues, Integer> mipmapFunction = stackIdWithZ -> {
LogUtilities.setupExecutorLog4j(stackIdWithZ.toString());
final org.janelia.render.client.MipmapClient mc =
new org.janelia.render.client.MipmapClient(parameters.renderWeb,
parameters.mipmap);
new org.janelia.render.client.MipmapClient(clientParameters.renderWeb,
clientParameters.mipmap);
return mc.processMipmapsForZ(stackIdWithZ.getStackId(),
stackIdWithZ.getFirstZ());
};
Expand All @@ -112,12 +123,12 @@ public void runWithContext(final JavaSparkContext sparkContext)
total += tileCount;
}

LOG.info("run: collected stats");
LOG.info("run: generated mipmaps for {} tiles", total);
LOG.info("generateMipmaps: collected stats");
LOG.info("generateMipmaps: generated mipmaps for {} tiles", total);

final org.janelia.render.client.MipmapClient mc =
new org.janelia.render.client.MipmapClient(parameters.renderWeb,
parameters.mipmap);
new org.janelia.render.client.MipmapClient(clientParameters.renderWeb,
clientParameters.mipmap);
final List<StackId> distinctStackIds = batchedList.stream()
.map(StackWithZValues::getStackId)
.distinct()
Expand All @@ -126,7 +137,7 @@ public void runWithContext(final JavaSparkContext sparkContext)
mc.updateMipmapPathBuilderForStack(stackId);
}

LOG.info("runWithContext: exit");
LOG.info("generateMipmaps: exit");
}

private static final Logger LOG = LoggerFactory.getLogger(MipmapClient.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
Expand All @@ -16,11 +17,12 @@
import org.janelia.alignment.spec.stack.StackWithZValues;
import org.janelia.render.client.ClientRunner;
import org.janelia.render.client.RenderDataClient;
import org.janelia.render.client.parameter.AlignmentPipelineParameters;
import org.janelia.render.client.parameter.CommandLineParameters;
import org.janelia.render.client.parameter.MultiProjectParameters;
import org.janelia.render.client.parameter.TileClusterParameters;
import org.janelia.render.client.spark.LogUtilities;
import org.janelia.render.client.spark.pipeline.AlignmentPipelineParameters;
import org.janelia.render.client.spark.pipeline.AlignmentPipelineStep;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -30,7 +32,7 @@
* @author Eric Trautman
*/
public class ClusterCountClient
implements Serializable {
implements Serializable, AlignmentPipelineStep {

public static class Parameters extends CommandLineParameters {
@ParametersDelegate
Expand All @@ -46,63 +48,80 @@ public org.janelia.render.client.ClusterCountClient buildJavaClient() {
javaClientParameters.tileCluster = tileCluster;
return new org.janelia.render.client.ClusterCountClient(javaClientParameters);
}

/**
 * Derives the parameters for this client from the parameters of an alignment pipeline.
 * The multi-project and tile cluster parameter objects of the pipeline are referenced directly.
 *
 * @param  alignmentPipelineParameters  pipeline parameters that supply the multi-project
 *                                      and tile cluster settings.
 *
 * @return client specific parameters populated from specified alignment pipeline parameters.
 */
public static Parameters fromPipeline(final AlignmentPipelineParameters alignmentPipelineParameters) {
    final Parameters result = new Parameters();

    final MultiProjectParameters pipelineMultiProject = alignmentPipelineParameters.getMultiProject();
    result.multiProject = pipelineMultiProject;
    result.tileCluster = alignmentPipelineParameters.getTileCluster();

    return result;
}

}

/** Run the client with command line parameters. */
public static void main(final String[] args) {

final ClientRunner clientRunner = new ClientRunner(args) {
@Override
public void runClient(final String[] args) throws Exception {

final Parameters parameters = new Parameters();
parameters.parse(args);

final ClusterCountClient client = new ClusterCountClient(parameters);
client.run();

final ClusterCountClient client = new ClusterCountClient();
client.createContextAndRun(parameters);
}
};
clientRunner.run();
}

/**
 * Empty constructor required for alignment pipeline steps.
 * No state is initialized here; parameters are passed to the run methods instead.
 */
public ClusterCountClient() {
}

private final Parameters parameters;
/**
 * Create a spark context and run the client with the specified parameters.
 *
 * The spark application name is the simple name of this class.
 * The context is closed when the run completes or fails (try-with-resources).
 * The list returned by findConnectedClusters is not used here.
 *
 * @param  clientParameters  parameters for the cluster count run.
 *
 * @throws IOException
 *   if findConnectedClusters fails.
 */
public void createContextAndRun(final Parameters clientParameters) throws IOException {
    final SparkConf conf = new SparkConf().setAppName(getClass().getSimpleName());
    try (final JavaSparkContext sparkContext = new JavaSparkContext(conf)) {
        // log prefix names this method, consistent with the other log messages in these clients
        LOG.info("createContextAndRun: appId is {}", sparkContext.getConf().getAppId());
        findConnectedClusters(sparkContext, clientParameters);
    }
}

public ClusterCountClient(final Parameters parameters) throws IllegalArgumentException {
LOG.info("init: parameters={}", parameters);
this.parameters = parameters;
/**
 * Validates the specified pipeline parameters are sufficient for this step
 * by checking the "tileCluster" element through
 * {@code AlignmentPipelineParameters.validateRequiredElementExists}.
 *
 * @param  pipelineParameters  alignment pipeline parameters to check.
 *
 * @throws IllegalArgumentException
 *   presumably when the tileCluster element is missing (raised by the validation helper - confirm there).
 */
@Override
public void validatePipelineParameters(final AlignmentPipelineParameters pipelineParameters)
        throws IllegalArgumentException {
    final String requiredElementName = "tileCluster";
    AlignmentPipelineParameters.validateRequiredElementExists(requiredElementName,
                                                              pipelineParameters.getTileCluster());
}

public void run() throws IOException {
/** Run the client as part of an alignment pipeline. */
public void runPipelineStep(final JavaSparkContext sparkContext,
final AlignmentPipelineParameters pipelineParameters)
throws IllegalArgumentException, IOException {

final SparkConf conf = new SparkConf().setAppName("ClusterCountClient");
final Parameters clientParameters = new Parameters();
clientParameters.multiProject = pipelineParameters.getMultiProject();
clientParameters.tileCluster = pipelineParameters.getTileCluster();

try (final JavaSparkContext sparkContext = new JavaSparkContext(conf)) {
final String sparkAppId = sparkContext.getConf().getAppId();
LOG.info("run: appId is {}", sparkAppId);
findConnectedClusters(sparkContext);
final List<ConnectedTileClusterSummaryForStack> summaryList =
findConnectedClusters(sparkContext, clientParameters);

final List<String> problemStackSummaryStrings = new ArrayList<>();
for (final ConnectedTileClusterSummaryForStack stackSummary : summaryList) {
if (stackSummary.hasMultipleClusters() ||
stackSummary.hasUnconnectedTiles() ||
stackSummary.hasTooManyConsecutiveUnconnectedEdges()) {
problemStackSummaryStrings.add(stackSummary.toString());
}
}

if (! problemStackSummaryStrings.isEmpty()) {
throw new IOException("The following " + problemStackSummaryStrings.size() +
" stacks have match connection issues:\n" +
String.join("\n", problemStackSummaryStrings));
}
}

public List<ConnectedTileClusterSummaryForStack> findConnectedClusters(final JavaSparkContext sparkContext)
private List<ConnectedTileClusterSummaryForStack> findConnectedClusters(final JavaSparkContext sparkContext,
final Parameters clientParameters)
throws IOException {

LOG.info("findConnectedClusters: entry");

final RenderDataClient renderDataClient = parameters.multiProject.getDataClient();
final String baseDataUrl = renderDataClient.getBaseDataUrl();
LOG.info("findConnectedClusters: entry, clientParameters={}", clientParameters);

final List<StackWithZValues> stackWithZValuesList =
parameters.multiProject.stackIdWithZ.buildListOfStackWithAllZ(renderDataClient);
final MultiProjectParameters multiProjectParameters = clientParameters.multiProject;
final String baseDataUrl = multiProjectParameters.getBaseDataUrl();
final List<StackWithZValues> stackWithZValuesList = multiProjectParameters.buildListOfStackWithAllZ();

final JavaRDD<StackWithZValues> rddStackWithZValues = sparkContext.parallelize(stackWithZValuesList);

Expand All @@ -111,16 +130,16 @@ public List<ConnectedTileClusterSummaryForStack> findConnectedClusters(final Jav
LogUtilities.setupExecutorLog4j(stackWithZ.toString());

final StackId stackId = stackWithZ.getStackId();
final MatchCollectionId matchCollectionId = parameters.multiProject.getMatchCollectionIdForStack(stackId);
final MatchCollectionId matchCollectionId = multiProjectParameters.getMatchCollectionIdForStack(stackId);
final RenderDataClient localDataClient = new RenderDataClient(baseDataUrl,
stackId.getOwner(),
stackId.getProject());
final org.janelia.render.client.ClusterCountClient jClient = parameters.buildJavaClient();
final org.janelia.render.client.ClusterCountClient jClient = clientParameters.buildJavaClient();

return jClient.findConnectedClustersForStack(stackWithZ,
matchCollectionId,
localDataClient,
parameters.tileCluster);
clientParameters.tileCluster);
};

final JavaRDD<ConnectedTileClusterSummaryForStack> rddSummaries = rddStackWithZValues.map(summarizeFunction);
Expand Down
Loading

0 comments on commit 3159a15

Please sign in to comment.