diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockComputation.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockComputation.java index 930b62b2d..74ce13130 100644 --- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockComputation.java +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockComputation.java @@ -51,4 +51,9 @@ public void compute(Vertex vertex, Iterable messages) throws IOException { public void postSuperstep() { workerLogic.postSuperstep(); } + + @Override + public boolean isVertexNoOp() { + return workerLogic.isVertexNoOp(); + } } diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerLogic.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerLogic.java index d4f6c3f2d..1f96afb8d 100644 --- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerLogic.java +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerLogic.java @@ -19,9 +19,8 @@ import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi; import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi; +import org.apache.giraph.block_app.framework.piece.AbstractPiece.InnerVertexReceiver; import org.apache.giraph.block_app.framework.piece.AbstractPiece.InnerVertexSender; -import org.apache.giraph.block_app.framework.piece.interfaces.VertexPostprocessor; -import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver; import org.apache.giraph.graph.Vertex; /** @@ -31,7 +30,7 @@ public class BlockWorkerLogic { private final BlockWorkerPieces pieces; - private transient VertexReceiver receiveFunctions; + private transient InnerVertexReceiver receiveFunctions; private transient InnerVertexSender sendFunctions; public BlockWorkerLogic(BlockWorkerPieces pieces) { @@ -60,11 +59,22 @@ public void compute(Vertex vertex, Iterable messages) { } public void postSuperstep() { - if (receiveFunctions instanceof VertexPostprocessor) { - ((VertexPostprocessor) receiveFunctions).postprocess(); + if (receiveFunctions != null) { + receiveFunctions.postprocess(); } if (sendFunctions != null) { sendFunctions.postprocess(); } } + + /** + * Return true iff compute() function is empty + * + * @return True iff compute function is empty, so we know we don't have to + * iterate through vertices + */ + public boolean isVertexNoOp() { + return (receiveFunctions == null || receiveFunctions.isVertexNoOp()) && + (sendFunctions == null || sendFunctions.isVertexNoOp()); + } } diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/PairedPieceAndStage.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/PairedPieceAndStage.java index 90fe83e32..532cbd564 100644 --- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/PairedPieceAndStage.java +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/PairedPieceAndStage.java @@ -25,8 +25,8 @@ import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi; import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi; import org.apache.giraph.block_app.framework.piece.AbstractPiece; +import org.apache.giraph.block_app.framework.piece.AbstractPiece.InnerVertexReceiver; import org.apache.giraph.block_app.framework.piece.AbstractPiece.InnerVertexSender; -import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver; import org.apache.hadoop.io.Writable; /** @@ -73,10 +73,10 @@ public void masterCompute(BlockMasterApi masterApi) { } } - public VertexReceiver getVertexReceiver( + public InnerVertexReceiver getVertexReceiver( BlockWorkerReceiveApi receiveApi) { if (piece != null) { - return piece.getVertexReceiver(receiveApi, executionStage); + return piece.getWrappedVertexReceiver(receiveApi, executionStage); } return null; } diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/AbstractPiece.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/AbstractPiece.java index 17a19705a..930202e39 100644 --- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/AbstractPiece.java +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/AbstractPiece.java @@ -146,13 +146,22 @@ public abstract void wrappedRegisterReducers( // getVertexSender(BlockWorkerSendApi workerApi, S executionStage) /** - * Add automatic handling of reducers to getVertexSender. + * Add automatic handling of reducers to getVertexSender and vertex no-op + * check. * * Only for Framework internal use. */ public abstract InnerVertexSender getWrappedVertexSender( final BlockWorkerSendApi workerApi, S executionStage); + /** + * Add vertex no-op check. + * + * Only for Framework internal use. + */ + public abstract InnerVertexReceiver getWrappedVertexReceiver( + final BlockWorkerReceiveApi workerApi, S executionStage); + /** * Override to have worker context send computation. * @@ -186,25 +195,6 @@ public void workerContextReceive( WV workerValue, List workerMessages) { } - /** - * Override to do vertex receive processing. - * - * Creates handler that defines what should be executed on worker - * for each vertex during receive phase. - * - * This logic executed last. - * This function is called once on each worker on each thread, in parallel, - * on their copy of Piece object to create functions handler. - * - * If returned object implements Postprocessor interface, then corresponding - * postprocess() function is going to be called once, after all vertices - * corresponding thread needed to process are done. - */ - public VertexReceiver getVertexReceiver( - BlockWorkerReceiveApi workerApi, S executionStage) { - return null; - } - /** * Returns MessageClasses definition for messages being sent by this Piece. */ @@ -242,6 +232,16 @@ public abstract class InnerVertexSender implements VertexSender, VertexPostprocessor { @Override public void postprocess() { } + + /** + * Return true iff vertexSend function is empty + * + * @return True iff vertexSend function is empty, + * so we know we don't have to iterate through vertices + */ + public boolean isVertexNoOp() { + return false; + } } /** Inner class to provide clean use without specifying types */ @@ -249,6 +249,16 @@ public abstract class InnerVertexReceiver implements VertexReceiver, VertexPostprocessor { @Override public void postprocess() { } + + /** + * Return true iff vertexReceive function is empty + * + * @return True iff vertexReceive function is empty, + * so we know we don't have to iterate through vertices + */ + public boolean isVertexNoOp() { + return false; + } } // Internal implementation diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/DefaultParentPiece.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/DefaultParentPiece.java index d96b4362c..bb9bb52fa 100644 --- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/DefaultParentPiece.java +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/DefaultParentPiece.java @@ -18,6 +18,7 @@ package org.apache.giraph.block_app.framework.piece; import org.apache.giraph.block_app.framework.api.BlockMasterApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi; import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi; import org.apache.giraph.block_app.framework.api.CreateReducersApi; import org.apache.giraph.block_app.framework.piece.global_comm.ReduceUtilsObject; @@ -25,6 +26,7 @@ import org.apache.giraph.block_app.framework.piece.global_comm.internal.CreateReducersApiWrapper; import org.apache.giraph.block_app.framework.piece.global_comm.internal.ReducersForPieceHandler; import org.apache.giraph.block_app.framework.piece.interfaces.VertexPostprocessor; +import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver; import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender; import org.apache.giraph.block_app.framework.piece.messages.ObjectMessageClasses; import org.apache.giraph.block_app.framework.piece.messages.SupplierFromConf; @@ -135,6 +137,25 @@ public VertexSender getVertexSender( return null; } + /** + * Override to do vertex receive processing. + * + * Creates handler that defines what should be executed on worker + * for each vertex during receive phase. + * + * This logic executed last. + * This function is called once on each worker on each thread, in parallel, + * on their copy of Piece object to create functions handler. + * + * If returned object implements Postprocessor interface, then corresponding + * postprocess() function is going to be called once, after all vertices + * corresponding thread needed to process are done. + */ + public VertexReceiver getVertexReceiver( + BlockWorkerReceiveApi workerApi, S executionStage) { + return null; + } + /** * Override to specify type of the message this Piece sends, if it does * send messages. @@ -284,6 +305,12 @@ public void vertexSend(Vertex vertex) { functions.vertexSend(vertex); } } + + @Override + public boolean isVertexNoOp() { + return functions == null; + } + @Override public void postprocess() { if (functions instanceof VertexPostprocessor) { @@ -294,6 +321,33 @@ public void postprocess() { }; } + @Override + public final InnerVertexReceiver getWrappedVertexReceiver( + final BlockWorkerReceiveApi workerApi, S executionStage) { + final VertexReceiver functions = + getVertexReceiver(workerApi, executionStage); + return new InnerVertexReceiver() { + @Override + public void vertexReceive(Vertex vertex, Iterable messages) { + if (functions != null) { + functions.vertexReceive(vertex, messages); + } + } + + @Override + public boolean isVertexNoOp() { + return functions == null; + } + + @Override + public void postprocess() { + if (functions instanceof VertexPostprocessor) { + ((VertexPostprocessor) functions).postprocess(); + } + } + }; + } + @Override public final void wrappedRegisterReducers( BlockMasterApi masterApi, S executionStage) { diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/DelegatePiece.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/DelegatePiece.java index ef278181a..8ad5f237d 100644 --- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/DelegatePiece.java +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/DelegatePiece.java @@ -96,7 +96,7 @@ protected DelegateWorkerSendFunctions delegateWorkerSendFunctions( } protected DelegateWorkerReceiveFunctions delegateWorkerReceiveFunctions( - ArrayList> workerReceiveFunctions, + ArrayList workerReceiveFunctions, BlockWorkerReceiveApi workerApi, S executionStage) { return new DelegateWorkerReceiveFunctions(workerReceiveFunctions); } @@ -115,13 +115,13 @@ public InnerVertexSender getWrappedVertexSender( } @Override - public InnerVertexReceiver getVertexReceiver( + public InnerVertexReceiver getWrappedVertexReceiver( BlockWorkerReceiveApi workerApi, S executionStage) { - ArrayList> workerReceiveFunctions = + ArrayList workerReceiveFunctions = new ArrayList<>(innerPieces.size()); for (AbstractPiece innerPiece : innerPieces) { workerReceiveFunctions.add( - innerPiece.getVertexReceiver(workerApi, executionStage)); + innerPiece.getWrappedVertexReceiver(workerApi, executionStage)); } return delegateWorkerReceiveFunctions( workerReceiveFunctions, workerApi, executionStage); @@ -145,6 +145,16 @@ public void vertexSend(Vertex vertex) { } } + @Override + public boolean isVertexNoOp() { + for (InnerVertexSender functions : workerSendFunctions) { + if (functions != null && !functions.isVertexNoOp()) { + return false; + } + } + return true; + } + @Override public void postprocess() { for (InnerVertexSender functions : workerSendFunctions) { @@ -157,11 +167,11 @@ public void postprocess() { /** Delegating WorkerReceivePiece */ protected class DelegateWorkerReceiveFunctions extends InnerVertexReceiver { - private final ArrayList> + private final ArrayList workerReceiveFunctions; public DelegateWorkerReceiveFunctions( - ArrayList> workerReceiveFunctions) { + ArrayList workerReceiveFunctions) { this.workerReceiveFunctions = workerReceiveFunctions; } @@ -175,6 +185,16 @@ public void vertexReceive(Vertex vertex, Iterable messages) { } } + @Override + public boolean isVertexNoOp() { + for (InnerVertexReceiver functions : workerReceiveFunctions) { + if (functions != null && !functions.isVertexNoOp()) { + return false; + } + } + return true; + } + @Override public void postprocess() { for (VertexReceiver functions : diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/FilteringPiece.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/FilteringPiece.java index 5c702c5ad..56d944ceb 100644 --- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/FilteringPiece.java +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/FilteringPiece.java @@ -22,7 +22,6 @@ import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi; import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi; import org.apache.giraph.block_app.framework.piece.AbstractPiece; -import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver; import org.apache.giraph.function.vertex.SupplierFromVertex; import org.apache.giraph.graph.Vertex; import org.apache.hadoop.io.Writable; @@ -122,12 +121,17 @@ public void vertexSend(Vertex vertex) { super.vertexSend(vertex); } } + + @Override + public boolean isVertexNoOp() { + return toCallSend == null; + } }; } @Override protected DelegateWorkerReceiveFunctions delegateWorkerReceiveFunctions( - ArrayList> workerReceiveFunctions, + ArrayList workerReceiveFunctions, BlockWorkerReceiveApi workerApi, S executionStage) { return new DelegateWorkerReceiveFunctions(workerReceiveFunctions) { @Override @@ -136,6 +140,11 @@ public void vertexReceive(Vertex vertex, Iterable messages) { super.vertexReceive(vertex, messages); } } + + @Override + public boolean isVertexNoOp() { + return toCallReceive == null; + } }; } diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/Pieces.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/Pieces.java index 587ae65f6..63b79b125 100644 --- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/Pieces.java +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/Pieces.java @@ -58,6 +58,16 @@ public class Pieces { private Pieces() { } + /** + * Piece which does nothing + */ + public static Piece noOpPiece() { + return new Piece() { + }; + } + /** * For each vertex execute given process function. * Computation is happening in send phase of the returned Piece. diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/AbstractComputation.java b/giraph-core/src/main/java/org/apache/giraph/graph/AbstractComputation.java index a4b61b2eb..a5be96b91 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/AbstractComputation.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/AbstractComputation.java @@ -294,4 +294,9 @@ public final int getWorkerForVertex(I vertexId) { return allWorkersInfo.getWorkerIndex( serviceWorker.getVertexPartitionOwner(vertexId).getWorkerInfo()); } + + @Override + public boolean isVertexNoOp() { + return false; + } } diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java b/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java index 1ac6f4301..e00037aa9 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java @@ -205,4 +205,13 @@ void removeEdgesRequest(I sourceVertexId, I targetVertexId) */ @SuppressWarnings("unchecked") W getWorkerContext(); + + /** + * Check if this Computation doesn't do anything inside of compute() function. + * It will be used to skip iterating through vertices, so if it returns true + * compute() won't be called at all. + * + * @return True iff compute function is empty. + */ + boolean isVertexNoOp(); } diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java index cdd987740..9aae675fe 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java @@ -278,6 +278,15 @@ private PartitionStats computePartition( Partition partition, OutOfCoreEngine oocEngine, boolean ignoreExistingVertices) throws IOException, InterruptedException { + if (computation.isVertexNoOp()) { + if (LOG.isInfoEnabled()) { + LOG.info("Skipping compute since it's no-op"); + } + return new PartitionStats(partition.getId(), partition.getVertexCount(), + 0, partition.getEdgeCount(), 0, 0); + } + + PartitionStats partitionStats = new PartitionStats(partition.getId(), 0, 0, 0, 0, 0); final LongRef verticesComputedProgress = new LongRef(0);