Skip to content

Commit

Permalink
MINOR: Streams integration tests should not call exit (apache#9067)
Browse files Browse the repository at this point in the history
- replace System.exit with Exit.exit in all relevant classes
- forbid use of System.exit in all relevant classes and add exceptions for others

Co-authored-by: John Roesler <[email protected]>
Co-authored-by: Matthias J. Sax <[email protected]>

Reviewers: Lucas Bradstreet <[email protected]>, Ismael Juma <[email protected]>
  • Loading branch information
vvcephei authored Aug 5, 2020
1 parent bc883cd commit 26a217c
Show file tree
Hide file tree
Showing 16 changed files with 74 additions and 40 deletions.
8 changes: 8 additions & 0 deletions checkstyle/checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,14 @@
<property name="ignoreComments" value="true"/>
</module>

<module name="Regexp">
<property name="id" value="dontUseSystemExit"/>
<property name="format" value="System\.exit"/>
<property name="illegalPattern" value="true"/>
<property name="ignoreComments" value="true"/>
<property name="message" value="'System.exit': Should not directly call System.exit, but Exit.exit instead."/>
</module>

<!-- code quality -->
<module name="MethodLength"/>
<module name="ParameterNumber">
Expand Down
19 changes: 15 additions & 4 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@
files="(ApiMessageType).java|MessageDataGenerator.java"/>
<suppress checks="MethodLength"
files="MessageDataGenerator.java"/>
<suppress id="dontUseSystemExit"
files="MessageGenerator.java"/>

<!-- Clients -->
<suppress id="dontUseSystemExit"
files="Exit.java"/>
<suppress checks="ClassFanOutComplexity"
files="(Fetcher|Sender|SenderTest|ConsumerCoordinator|KafkaConsumer|KafkaProducer|Utils|TransactionManager|TransactionManagerTest|KafkaAdminClient|NetworkClient|Admin).java"/>
<suppress checks="ClassFanOutComplexity"
Expand Down Expand Up @@ -78,7 +82,7 @@
<suppress checks="NPathComplexity"
files="MessageTest.java"/>

<!-- clients tests -->
<!-- Clients tests -->
<suppress checks="ClassDataAbstractionCoupling"
files="(Sender|Fetcher|KafkaConsumer|Metrics|RequestResponse|TransactionManager|KafkaAdminClient|Message|KafkaProducer)Test.java"/>

Expand Down Expand Up @@ -167,7 +171,7 @@
<suppress checks="FinalLocalVariable"
files="^(?!.*[\\/]org[\\/]apache[\\/]kafka[\\/]streams[\\/].*$)"/>

<!-- generated code -->
<!-- Generated code -->
<suppress checks="(NPathComplexity|ClassFanOutComplexity|CyclomaticComplexity|ClassDataAbstractionCoupling|FinalLocalVariable|LocalVariableName|MemberName|ParameterName|MethodLength|JavaNCSS)"
files="streams[\\/]src[\\/](generated|generated-test)[\\/].+.java$"/>

Expand Down Expand Up @@ -195,16 +199,19 @@
<suppress checks="(FinalLocalVariable|WhitespaceAround|LocalVariableName|ImportControl|AvoidStarImport)"
files="Murmur3Test.java"/>

<!-- Streams Test-Utils -->
<!-- Streams test-utils -->
<suppress checks="ClassFanOutComplexity"
files="TopologyTestDriver.java"/>
<suppress checks="ClassDataAbstractionCoupling"
files="TopologyTestDriver.java"/>

<!-- Streams examples -->
<suppress id="dontUseSystemExit"
files="PageViewTypedDemo.java|PipeDemo.java|TemperatureDemo.java|WordCountDemo.java|WordCountProcessorDemo.java|WordCountTransformerDemo.java"/>

<!-- Tools -->
<suppress checks="ClassDataAbstractionCoupling"
files="VerifiableConsumer.java"/>

<suppress checks="CyclomaticComplexity"
files="(StreamsResetter|ProducerPerformance|Agent).java"/>
<suppress checks="BooleanExpressionComplexity"
Expand All @@ -219,6 +226,10 @@
files="ProduceBenchSpec.java"/>
<suppress checks="ParameterNumber"
files="SustainedConnectionSpec.java"/>
<suppress id="dontUseSystemExit"
files="VerifiableConsumer.java"/>
<suppress id="dontUseSystemExit"
files="VerifiableProducer.java"/>

<!-- Log4J-Appender -->
<suppress checks="CyclomaticComplexity"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
Expand All @@ -37,6 +36,7 @@
import org.apache.kafka.streams.kstream.TimeWindows;

import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.examples.wordcount;

import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
Expand All @@ -31,6 +30,7 @@
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

import java.time.Duration;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.integration;

import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
Expand Down Expand Up @@ -80,6 +81,12 @@ SmokeTestDriver.VerificationResult result() {

@Test
public void shouldWorkWithRebalance() throws InterruptedException {
Exit.setExitProcedure((statusCode, message) -> {
throw new AssertionError("Test called exit(). code:" + statusCode + " message:" + message);
});
Exit.setHaltProcedure((statusCode, message) -> {
throw new AssertionError("Test called halt(). code:" + statusCode + " message:" + message);
});
int numClientsCreated = 0;
final ArrayList<SmokeTestClient> clients = new ArrayList<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
Expand All @@ -49,7 +50,7 @@ public class BrokerCompatibilityTest {
public static void main(final String[] args) throws IOException {
if (args.length < 2) {
System.err.println("BrokerCompatibilityTest are expecting two parameters: propFile, processingMode; but only see " + args.length + " parameter");
System.exit(1);
Exit.exit(1);
}

System.out.println("StreamsTest instance started");
Expand All @@ -62,7 +63,7 @@ public static void main(final String[] args) throws IOException {

if (kafka == null) {
System.err.println("No bootstrap kafka servers specified in " + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
System.exit(1);
Exit.exit(1);
}

streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-system-test-broker-compatibility");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class StreamsBrokerDownResilienceTest {
public static void main(final String[] args) throws IOException {
if (args.length < 2) {
System.err.println("StreamsBrokerDownResilienceTest are expecting two parameters: propFile, additionalConfigs; but only see " + args.length + " parameter");
System.exit(1);
Exit.exit(1);
}

System.out.println("StreamsTest instance started");
Expand All @@ -62,7 +62,7 @@ public static void main(final String[] args) throws IOException {

if (kafka == null) {
System.err.println("No bootstrap kafka servers specified in " + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
System.exit(1);
Exit.exit(1);
}

streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-resilience");
Expand All @@ -86,7 +86,7 @@ public static void main(final String[] args) throws IOException {
StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG),
StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG)));

System.exit(1);
Exit.exit(1);
}

final StreamsBuilder builder = new StreamsBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.tests;

import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;

Expand All @@ -31,7 +32,7 @@ public class StreamsEosTest {
public static void main(final String[] args) throws IOException {
if (args.length < 2) {
System.err.println("StreamsEosTest are expecting two parameters: propFile, command; but only see " + args.length + " parameter");
System.exit(1);
Exit.exit(1);
}

final String propFileName = args[0];
Expand All @@ -43,15 +44,15 @@ public static void main(final String[] args) throws IOException {

if (kafka == null) {
System.err.println("No bootstrap kafka servers specified in " + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
System.exit(1);
Exit.exit(1);
}

if ("process".equals(command) || "process-complex".equals(command)) {
if (!StreamsConfig.EXACTLY_ONCE.equals(processingGuarantee) &&
!StreamsConfig.EXACTLY_ONCE_BETA.equals(processingGuarantee)) {

System.err.println("processingGuarantee must be either " + StreamsConfig.EXACTLY_ONCE + " or " + StreamsConfig.EXACTLY_ONCE_BETA);
System.exit(1);
Exit.exit(1);
}
}

Expand All @@ -62,7 +63,7 @@ public static void main(final String[] args) throws IOException {
System.out.flush();

if (command == null || propFileName == null) {
System.exit(-1);
Exit.exit(-1);
}

switch (command) {
Expand All @@ -84,7 +85,7 @@ public static void main(final String[] args) throws IOException {
default:
System.out.println("unknown command: " + command);
System.out.flush();
System.exit(-1);
Exit.exit(-1);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.tests;

import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;

Expand All @@ -40,7 +41,7 @@ public class StreamsSmokeTest {
public static void main(final String[] args) throws IOException {
if (args.length < 2) {
System.err.println("StreamsSmokeTest are expecting two parameters: propFile, command; but only see " + args.length + " parameter");
System.exit(1);
Exit.exit(1);
}

final String propFileName = args[0];
Expand All @@ -53,7 +54,7 @@ public static void main(final String[] args) throws IOException {

if (kafka == null) {
System.err.println("No bootstrap kafka servers specified in " + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
System.exit(1);
Exit.exit(1);
}

if ("process".equals(command)) {
Expand All @@ -64,7 +65,7 @@ public static void main(final String[] args) throws IOException {
System.err.println("processingGuarantee must be either " + StreamsConfig.AT_LEAST_ONCE + ", " +
StreamsConfig.EXACTLY_ONCE + ", or " + StreamsConfig.EXACTLY_ONCE_BETA);

System.exit(1);
Exit.exit(1);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public static void main(final String[] args) throws IOException {
if (args.length < 2) {
System.err.println("StreamsStandByReplicaTest are expecting two parameters: " +
"propFile, additionalConfigs; but only see " + args.length + " parameter");
System.exit(1);
Exit.exit(1);
}

System.out.println("StreamsTest instance started");
Expand All @@ -61,7 +61,7 @@ public static void main(final String[] args) throws IOException {

if (kafka == null) {
System.err.println("No bootstrap kafka servers specified in " + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
System.exit(1);
Exit.exit(1);
}

streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-standby-tasks");
Expand All @@ -75,7 +75,7 @@ public static void main(final String[] args) throws IOException {
if (additionalConfigs == null) {
System.err.println("additional configs are not provided");
System.err.flush();
System.exit(1);
Exit.exit(1);
}

final Map<String, String> updated = SystemTestUtil.parseConfigs(additionalConfigs);
Expand All @@ -92,7 +92,7 @@ public static void main(final String[] args) throws IOException {
sinkTopic1,
sinkTopic2));
System.err.flush();
System.exit(1);
Exit.exit(1);
}

streamsProperties.putAll(updated);
Expand All @@ -104,7 +104,7 @@ public static void main(final String[] args) throws IOException {
StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG),
StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG)));

System.exit(1);
Exit.exit(1);
}

final StreamsBuilder builder = new StreamsBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.tests;

import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;

Expand All @@ -40,7 +41,7 @@ public class StreamsSmokeTest {
public static void main(final String[] args) throws IOException {
if (args.length < 2) {
System.err.println("StreamsSmokeTest are expecting two parameters: propFile, command; but only see " + args.length + " parameter");
System.exit(1);
Exit.exit(1);
}

final String propFileName = args[0];
Expand All @@ -53,7 +54,7 @@ public static void main(final String[] args) throws IOException {

if (kafka == null) {
System.err.println("No bootstrap kafka servers specified in " + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
System.exit(1);
Exit.exit(1);
}

if ("process".equals(command)) {
Expand All @@ -63,7 +64,7 @@ public static void main(final String[] args) throws IOException {
System.err.println("processingGuarantee must be either " + StreamsConfig.AT_LEAST_ONCE + " or " +
StreamsConfig.EXACTLY_ONCE);

System.exit(1);
Exit.exit(1);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.tests;

import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;

Expand All @@ -40,7 +41,7 @@ public class StreamsSmokeTest {
public static void main(final String[] args) throws IOException {
if (args.length < 2) {
System.err.println("StreamsSmokeTest are expecting two parameters: propFile, command; but only see " + args.length + " parameter");
System.exit(1);
Exit.exit(1);
}

final String propFileName = args[0];
Expand All @@ -53,7 +54,7 @@ public static void main(final String[] args) throws IOException {

if (kafka == null) {
System.err.println("No bootstrap kafka servers specified in " + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
System.exit(1);
Exit.exit(1);
}

if ("process".equals(command)) {
Expand All @@ -63,7 +64,7 @@ public static void main(final String[] args) throws IOException {
System.err.println("processingGuarantee must be either " + StreamsConfig.AT_LEAST_ONCE + " or " +
StreamsConfig.EXACTLY_ONCE);

System.exit(1);
Exit.exit(1);
}
}

Expand Down
Loading

0 comments on commit 26a217c

Please sign in to comment.