diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index 7fd640c49cc9f1..63dc8a0ea9bfe5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -1094,6 +1094,12 @@ private void setOperatorConfig( ? 0 // single input operator : inEdge.getTypeNumber() - 1; // in case of 2 or more inputs + Preconditions.checkState( + inputIndex < inputSerializers.length, + "Input type serializer of vertex '%s' was null or undefined for inputIndex %s", + vertex, + inputIndex); + if (chainedSource != null) { // chained source is the input if (inputConfigs[inputIndex] != null) {