diff --git a/pom.xml b/pom.xml
index fb7102d26..cbf48646c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -334,7 +334,7 @@
         <dependency>
             <groupId>net.snowflake</groupId>
             <artifactId>snowflake-ingest-sdk</artifactId>
-            <version>2.0.3</version>
+            <version>2.0.4</version>
         </dependency>
         <dependency>
             <groupId>net.snowflake</groupId>
diff --git a/pom_confluent.xml b/pom_confluent.xml
index f9bf42210..b769dfcec 100644
--- a/pom_confluent.xml
+++ b/pom_confluent.xml
@@ -386,7 +386,7 @@
         <dependency>
             <groupId>net.snowflake</groupId>
            <artifactId>snowflake-ingest-sdk</artifactId>
-            <version>2.0.3</version>
+            <version>2.0.4</version>
         </dependency>
         <dependency>
             <groupId>net.snowflake</groupId>
diff --git a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java
index 50c6d6d1c..48510964d 100644
--- a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java
+++ b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java
@@ -117,9 +117,9 @@ public class SnowflakeSinkConnectorConfig {
   public static final String INGESTION_METHOD_DEFAULT_SNOWPIPE =
       IngestionMethodConfig.SNOWPIPE.toString();
 
-  // This is the streaming bdec file version which can be defined in config
-  // NOTE: Please do not override this value unless recommended from snowflake
-  public static final String SNOWPIPE_STREAMING_FILE_VERSION = "snowflake.streaming.file.version";
+  // This is the streaming max client lag which can be defined in config
+  public static final String SNOWPIPE_STREAMING_MAX_CLIENT_LAG =
+      "snowflake.streaming.max.client.lag";
 
   // TESTING
   public static final String REBALANCING = "snowflake.test.rebalancing";
@@ -541,17 +541,16 @@ static ConfigDef newConfigDef() {
             ConfigDef.Width.NONE,
             INGESTION_METHOD_OPT)
         .define(
-            SNOWPIPE_STREAMING_FILE_VERSION,
-            Type.STRING,
-            "", // default is handled in Ingest SDK
-            null, // no validator
+            SNOWPIPE_STREAMING_MAX_CLIENT_LAG,
+            Type.LONG,
+            StreamingUtils.STREAMING_BUFFER_FLUSH_TIME_MINIMUM_SEC,
+            ConfigDef.Range.atLeast(StreamingUtils.STREAMING_BUFFER_FLUSH_TIME_MINIMUM_SEC),
             Importance.LOW,
-            "Acceptable values for Snowpipe Streaming BDEC Versions: 1 and 3. Check Ingest"
-                + " SDK for default behavior. Please do not set this unless Absolutely needed.",
+            "Decides how often the buffer in the Ingest SDK will be flushed, in seconds",
             CONNECTOR_CONFIG,
             6,
             ConfigDef.Width.NONE,
-            SNOWPIPE_STREAMING_FILE_VERSION)
+            SNOWPIPE_STREAMING_MAX_CLIENT_LAG)
         .define(
             ERRORS_TOLERANCE_CONFIG,
             Type.STRING,
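For reference, a minimal sketch of a sink configuration exercising the new property (the ingestion-method key is assumed to be the usual snowflake.ingestion.method defined elsewhere in this class; surrounding values are illustrative, not part of this patch):

    Map<String, String> connectorConfig = new HashMap<>();
    connectorConfig.put(
        "snowflake.ingestion.method", IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
    // Parsed as Type.LONG and interpreted as seconds; values below
    // StreamingUtils.STREAMING_BUFFER_FLUSH_TIME_MINIMUM_SEC fail the Range validator.
    connectorConfig.put("snowflake.streaming.max.client.lag", "30");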
", + "Decide how often the buffer in the Ingest SDK will be flushed", CONNECTOR_CONFIG, 6, ConfigDef.Width.NONE, - SNOWPIPE_STREAMING_FILE_VERSION) + SNOWPIPE_STREAMING_MAX_CLIENT_LAG) .define( ERRORS_TOLERANCE_CONFIG, Type.STRING, diff --git a/src/main/java/com/snowflake/kafka/connector/Utils.java b/src/main/java/com/snowflake/kafka/connector/Utils.java index 8b67b464d..b46e4d9c7 100644 --- a/src/main/java/com/snowflake/kafka/connector/Utils.java +++ b/src/main/java/com/snowflake/kafka/connector/Utils.java @@ -435,12 +435,12 @@ static String validateConfig(Map config) { "Schematization is only available with {}.", IngestionMethodConfig.SNOWPIPE_STREAMING.toString())); } - if (config.containsKey(SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_FILE_VERSION)) { + if (config.containsKey(SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_MAX_CLIENT_LAG)) { invalidConfigParams.put( - SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_FILE_VERSION, + SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_MAX_CLIENT_LAG, Utils.formatString( "{} is only available with ingestion type: {}.", - SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_FILE_VERSION, + SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_MAX_CLIENT_LAG, IngestionMethodConfig.SNOWPIPE_STREAMING.toString())); } if (config.containsKey( diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProperties.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProperties.java index 0e19e6005..55b9ef420 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProperties.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProperties.java @@ -17,14 +17,15 @@ package com.snowflake.kafka.connector.internal.streaming; -import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_FILE_VERSION; -import static net.snowflake.ingest.utils.ParameterProvider.BLOB_FORMAT_VERSION; +import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_MAX_CLIENT_LAG; +import static net.snowflake.ingest.utils.ParameterProvider.MAX_CLIENT_LAG; import com.snowflake.kafka.connector.Utils; import com.snowflake.kafka.connector.internal.KCLogger; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Properties; import java.util.stream.Collectors; @@ -77,15 +78,15 @@ public StreamingClientProperties(Map connectorConfig) { STREAMING_CLIENT_PREFIX_NAME + connectorConfig.getOrDefault(Utils.NAME, DEFAULT_CLIENT_NAME); - // Override only if bdec version is explicitly set in config, default to the version set - // inside Ingest SDK + // Override only if the max client lag is explicitly set in config this.parameterOverrides = new HashMap<>(); - Optional snowpipeStreamingBdecVersion = - Optional.ofNullable(connectorConfig.get(SNOWPIPE_STREAMING_FILE_VERSION)); - snowpipeStreamingBdecVersion.ifPresent( + Optional snowpipeStreamingMaxClientLag = + Optional.ofNullable(connectorConfig.get(SNOWPIPE_STREAMING_MAX_CLIENT_LAG)); + snowpipeStreamingMaxClientLag.ifPresent( overriddenValue -> { - LOGGER.info("Config is overridden for {} ", SNOWPIPE_STREAMING_FILE_VERSION); - parameterOverrides.put(BLOB_FORMAT_VERSION, overriddenValue); + LOGGER.info( + "Config is overridden for {}={}", SNOWPIPE_STREAMING_MAX_CLIENT_LAG, overriddenValue); + parameterOverrides.put(MAX_CLIENT_LAG, String.format("%s second", overriddenValue)); 
diff --git a/src/test/java/com/snowflake/kafka/connector/ConnectorConfigTest.java b/src/test/java/com/snowflake/kafka/connector/ConnectorConfigTest.java
index e834ec3a2..037445955 100644
--- a/src/test/java/com/snowflake/kafka/connector/ConnectorConfigTest.java
+++ b/src/test/java/com/snowflake/kafka/connector/ConnectorConfigTest.java
@@ -746,12 +746,12 @@ public void testInvalidValueConvertersForStreamingSnowpipe() {
   public void testInValidConfigFileTypeForSnowpipe() {
     try {
       Map<String, String> config = getConfig();
-      config.put(SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_FILE_VERSION, "3");
+      config.put(SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_MAX_CLIENT_LAG, "3");
       Utils.validateConfig(config);
     } catch (SnowflakeKafkaConnectorException exception) {
       assert exception
           .getMessage()
-          .contains(SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_FILE_VERSION);
+          .contains(SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_MAX_CLIENT_LAG);
     }
   }
 
@@ -763,14 +763,14 @@ public void testValidFileTypesForSnowpipeStreaming() {
         IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
     config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
 
-    config.put(SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_FILE_VERSION, "3");
+    config.put(SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_MAX_CLIENT_LAG, "3");
     Utils.validateConfig(config);
 
-    config.put(SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_FILE_VERSION, "1");
+    config.put(SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_MAX_CLIENT_LAG, "1");
     Utils.validateConfig(config);
 
     // lower case
-    config.put(SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_FILE_VERSION, "abcd");
+    config.put(SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_MAX_CLIENT_LAG, "abcd");
     Utils.validateConfig(config);
   }
 
diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java
index 1b762553c..98e478266 100644
--- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java
+++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java
@@ -1,5 +1,6 @@
 package com.snowflake.kafka.connector.internal.streaming;
 
+import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_MAX_CLIENT_LAG;
 import static com.snowflake.kafka.connector.internal.streaming.SnowflakeSinkServiceV2.partitionChannelKey;
 import static com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel.NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE;
 
@@ -43,6 +44,7 @@
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.junit.After;
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -1461,12 +1463,51 @@ public void testSchematizationSchemaEvolutionWithNonNullableColumn() throws Exce
   }
 
   @Test
-  public void testStreamingIngestion_invalid_file_version() throws Exception {
+  @Ignore // SNOW-986359: disable for now due to failure in merge gate
+  public void testStreamingIngestionValidClientLag() throws Exception {
+    Map<String, String> config = getConfig();
+    config.put(SNOWPIPE_STREAMING_MAX_CLIENT_LAG, "30");
+    SnowflakeSinkConnectorConfig.setDefaultValues(config);
+    conn.createTable(table);
+
+    // opens a channel for partition 0, table and topic
+    SnowflakeSinkService service =
+        SnowflakeSinkServiceFactory.builder(conn, IngestionMethodConfig.SNOWPIPE_STREAMING, config)
+            .setRecordNumber(100)
+            .setFlushTime(1)
+            .setErrorReporter(new InMemoryKafkaRecordErrorReporter())
+            .setSinkTaskContext(new InMemorySinkTaskContext(Collections.singleton(topicPartition)))
+            .addTask(table, new TopicPartition(topic, partition)) // Internally calls startTask
+            .build();
+
+    final long noOfRecords = 50;
+    List<SinkRecord> sinkRecords =
+        TestUtils.createJsonStringSinkRecords(0, noOfRecords, topic, partition);
+
+    service.insert(sinkRecords);
+    try {
+      // Wait 20 seconds here and no flush should happen since the max client lag is 30 seconds
+      TestUtils.assertWithRetry(
+          () -> service.getOffset(new TopicPartition(topic, partition)) == noOfRecords, 5, 4);
+      Assert.fail("The rows should not be flushed");
+    } catch (Exception e) {
+      // do nothing
+    }
+
+    // Wait for enough time, the rows should be flushed
+    TestUtils.assertWithRetry(
+        () -> service.getOffset(new TopicPartition(topic, partition)) == noOfRecords, 30, 30);
+
+    service.closeAll();
+  }
+
+  @Test
+  public void testStreamingIngestionInvalidClientLag() {
     Map<String, String> config = TestUtils.getConfForStreaming();
     SnowflakeSinkConnectorConfig.setDefaultValues(config);
     Map<String, String> overriddenConfig = new HashMap<>(config);
     overriddenConfig.put(
-        SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_FILE_VERSION, "TWOO_HUNDRED");
+        SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_MAX_CLIENT_LAG, "TWOO_HUNDRED");
 
     conn.createTable(table);
 
@@ -1526,6 +1567,15 @@ public void testStreamingIngest_multipleChannel_distinctClients() throws Excepti
     dogConfig.put(Utils.SF_OAUTH_CLIENT_ID, "2");
     dogConfig.put(Utils.NAME, dogTopic);
 
+    String fishTopic = "fishTopic_" + TestUtils.randomTableName();
+    Map<String, String> fishConfig = getConfig();
+    SnowflakeSinkConnectorConfig.setDefaultValues(fishConfig);
+    fishConfig.put(
+        SnowflakeSinkConnectorConfig.ENABLE_STREAMING_CLIENT_OPTIMIZATION_CONFIG, "true");
+    fishConfig.put(Utils.SF_OAUTH_CLIENT_ID, "2");
+    fishConfig.put(Utils.NAME, fishTopic);
+    fishConfig.put(SNOWPIPE_STREAMING_MAX_CLIENT_LAG, "1");
+
     // setup connection and create tables
     TopicPartition catTp = new TopicPartition(catTopic, 0);
     SnowflakeConnectionService catConn =
@@ -1533,9 +1583,14 @@ public void testStreamingIngest_multipleChannel_distinctClients() throws Excepti
     catConn.createTable(catTopic);
 
     TopicPartition dogTp = new TopicPartition(dogTopic, 1);
-    SnowflakeConnectionService dogconn =
+    SnowflakeConnectionService dogConn =
         SnowflakeConnectionServiceFactory.builder().setProperties(dogConfig).build();
-    dogconn.createTable(dogTopic);
+    dogConn.createTable(dogTopic);
+
+    TopicPartition fishTp = new TopicPartition(fishTopic, 1);
+    SnowflakeConnectionService fishConn =
+        SnowflakeConnectionServiceFactory.builder().setProperties(fishConfig).build();
+    fishConn.createTable(fishTopic);
 
     // create the sink services
     SnowflakeSinkService catService =
@@ -1549,48 +1604,70 @@ public void testStreamingIngest_multipleChannel_distinctClients() throws Excepti
     SnowflakeSinkService dogService =
         SnowflakeSinkServiceFactory.builder(
-                dogconn, IngestionMethodConfig.SNOWPIPE_STREAMING, dogConfig)
+                dogConn, IngestionMethodConfig.SNOWPIPE_STREAMING, dogConfig)
             .setRecordNumber(1)
             .setErrorReporter(new InMemoryKafkaRecordErrorReporter())
             .setSinkTaskContext(new InMemorySinkTaskContext(Collections.singleton(dogTp)))
             .addTask(dogTopic, dogTp) // Internally calls startTask
             .build();
 
+    SnowflakeSinkService fishService =
+        SnowflakeSinkServiceFactory.builder(
+                fishConn, IngestionMethodConfig.SNOWPIPE_STREAMING, fishConfig)
+            .setRecordNumber(1)
+            .setErrorReporter(new InMemoryKafkaRecordErrorReporter())
+            .setSinkTaskContext(new InMemorySinkTaskContext(Collections.singleton(fishTp)))
+            .addTask(fishTopic, fishTp) // Internally calls startTask
+            .build();
+
     // create records
     final int catRecordCount = 9;
     final int dogRecordCount = 3;
+    final int fishRecordCount = 1;
 
     List<SinkRecord> catRecords =
         TestUtils.createJsonStringSinkRecords(0, catRecordCount, catTp.topic(), catTp.partition());
     List<SinkRecord> dogRecords =
         TestUtils.createJsonStringSinkRecords(0, dogRecordCount, dogTp.topic(), dogTp.partition());
+    List<SinkRecord> fishRecords =
+        TestUtils.createJsonStringSinkRecords(
+            0, fishRecordCount, fishTp.topic(), fishTp.partition());
 
     // insert records
     catService.insert(catRecords);
     dogService.insert(dogRecords);
+    fishService.insert(fishRecords);
 
     // check data was ingested
     TestUtils.assertWithRetry(() -> catService.getOffset(catTp) == catRecordCount, 20, 20);
     TestUtils.assertWithRetry(() -> dogService.getOffset(dogTp) == dogRecordCount, 20, 20);
+    TestUtils.assertWithRetry(() -> fishService.getOffset(fishTp) == fishRecordCount, 20, 20);
 
-    // verify two clients were created
+    // verify three clients were created
     assert StreamingClientProvider.getStreamingClientProviderInstance()
         .getRegisteredClients()
         .containsKey(new StreamingClientProperties(catConfig));
     assert StreamingClientProvider.getStreamingClientProviderInstance()
         .getRegisteredClients()
         .containsKey(new StreamingClientProperties(dogConfig));
+    assert StreamingClientProvider.getStreamingClientProviderInstance()
+        .getRegisteredClients()
+        .containsKey(new StreamingClientProperties(fishConfig));
 
     // close services
     catService.closeAll();
     dogService.closeAll();
+    fishService.closeAll();
 
-    // verify both clients were closed
+    // verify three clients were closed
     assert !StreamingClientProvider.getStreamingClientProviderInstance()
         .getRegisteredClients()
         .containsKey(new StreamingClientProperties(catConfig));
     assert !StreamingClientProvider.getStreamingClientProviderInstance()
         .getRegisteredClients()
         .containsKey(new StreamingClientProperties(dogConfig));
+    assert !StreamingClientProvider.getStreamingClientProviderInstance()
+        .getRegisteredClients()
+        .containsKey(new StreamingClientProperties(fishConfig));
   }
 }
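A note on the timing in testStreamingIngestionValidClientLag, assuming assertWithRetry's two integer arguments are the per-attempt wait in seconds and the retry count (consistent with the in-test comments): the first assertion polls for 5 s x 4 = 20 s, deliberately under the 30 s lag, so the offsets must still be unflushed and the assertion is expected to fail; the second allows up to 30 s x 30 = 900 s, ample time for the lag to elapse and all 50 rows to land.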
diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientHandlerTest.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientHandlerTest.java
index 285ae1581..cf0b7addd 100644
--- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientHandlerTest.java
+++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientHandlerTest.java
@@ -17,7 +17,6 @@
 
 package com.snowflake.kafka.connector.internal.streaming;
 
-import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
 import com.snowflake.kafka.connector.Utils;
 import com.snowflake.kafka.connector.internal.TestUtils;
 import java.util.Map;
@@ -87,15 +86,6 @@ public void testCreateClientException() {
   }
 
-  @Test(expected = IllegalArgumentException.class)
-  public void testCreateClientInvalidBdecVersion() {
-    // add invalid bdec version
-    this.connectorConfig.put(SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_FILE_VERSION, "1");
-
-    // test create
-    this.streamingClientHandler.createClient(new StreamingClientProperties(this.connectorConfig));
-  }
-
   @Test
   public void testCloseClient() throws Exception {
     // setup valid client
diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientPropertiesTest.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientPropertiesTest.java
index 98c9cc175..df353724e 100644
--- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientPropertiesTest.java
+++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientPropertiesTest.java
@@ -17,11 +17,11 @@
 
 package com.snowflake.kafka.connector.internal.streaming;
 
-import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_FILE_VERSION;
+import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_MAX_CLIENT_LAG;
 import static com.snowflake.kafka.connector.internal.streaming.StreamingClientProperties.DEFAULT_CLIENT_NAME;
 import static com.snowflake.kafka.connector.internal.streaming.StreamingClientProperties.LOGGABLE_STREAMING_CONFIG_PROPERTIES;
 import static com.snowflake.kafka.connector.internal.streaming.StreamingClientProperties.STREAMING_CLIENT_PREFIX_NAME;
-import static net.snowflake.ingest.utils.ParameterProvider.BLOB_FORMAT_VERSION;
+import static net.snowflake.ingest.utils.ParameterProvider.MAX_CLIENT_LAG;
 
 import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
 import com.snowflake.kafka.connector.Utils;
@@ -35,7 +35,7 @@ public class StreamingClientPropertiesTest {
 
   @Test
   public void testGetValidProperties() {
-    String overrideValue = "overrideValue";
+    String overrideValue = "10";
 
     // setup config with all loggable properties and parameter override
     Map<String, String> connectorConfig = TestUtils.getConfForStreaming();
@@ -44,12 +44,12 @@ public void testGetValidProperties() {
     connectorConfig.put(Utils.SF_ROLE, "testRole");
     connectorConfig.put(Utils.SF_USER, "testUser");
     connectorConfig.put(Utils.SF_AUTHENTICATOR, Utils.SNOWFLAKE_JWT);
-    connectorConfig.put(SNOWPIPE_STREAMING_FILE_VERSION, overrideValue);
+    connectorConfig.put(SNOWPIPE_STREAMING_MAX_CLIENT_LAG, overrideValue);
 
     Properties expectedProps = StreamingUtils.convertConfigForStreamingClient(connectorConfig);
     String expectedClientName = STREAMING_CLIENT_PREFIX_NAME + connectorConfig.get(Utils.NAME);
     Map<String, Object> expectedParameterOverrides = new HashMap<>();
-    expectedParameterOverrides.put(BLOB_FORMAT_VERSION, overrideValue);
+    expectedParameterOverrides.put(MAX_CLIENT_LAG, String.format("%s second", overrideValue));
 
     // test get properties
     StreamingClientProperties resultProperties = new StreamingClientProperties(connectorConfig);
@@ -99,5 +99,14 @@ public void testStreamingClientPropertiesEquality() {
 
     assert prop1.equals(prop2);
     assert prop1.hashCode() == prop2.hashCode();
+
+    config1.put(SNOWPIPE_STREAMING_MAX_CLIENT_LAG, "1");
+    config2.put(SNOWPIPE_STREAMING_MAX_CLIENT_LAG, "10");
+
+    prop1 = new StreamingClientProperties(config1);
+    prop2 = new StreamingClientProperties(config2);
+
+    assert !prop1.equals(prop2);
+    assert prop1.hashCode() != prop2.hashCode();
   }
 }
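These equality tests guard the client-sharing behavior: with the one-client optimization enabled, clients are cached by StreamingClientProperties, so two connectors that differ only in max client lag must produce unequal keys. A simplified sketch of that caching, under the assumption that StreamingClientProvider keys its registry on the properties object and that createClient returns the SDK client (the real logic lives in StreamingClientProvider):

    // Illustrative cache keyed by StreamingClientProperties, not the real provider code.
    Map<StreamingClientProperties, SnowflakeStreamingIngestClient> registeredClients =
        new HashMap<>();

    SnowflakeStreamingIngestClient getOrCreateClient(
        Map<String, String> connectorConfig, StreamingClientHandler handler) {
      StreamingClientProperties key = new StreamingClientProperties(connectorConfig);
      // Configs differing only in snowflake.streaming.max.client.lag now produce
      // unequal keys, so each receives its own client instead of sharing one.
      return registeredClients.computeIfAbsent(key, handler::createClient);
    }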
diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java
index 75d9de64a..ef9a88ae7 100644
--- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java
+++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java
@@ -460,36 +460,6 @@ public void testAutoChannelReopen_SinglePartitionsInsertRowsSFException() throws
     service.closeAll();
   }
 
-  @Test(expected = IllegalArgumentException.class)
-  public void testSimpleInsertRowsFailureWithArrowBDECFormat() throws Exception {
-    // add config which overrides the bdec file format
-    Map<String, String> overriddenConfig = new HashMap<>(TestUtils.getConfForStreaming());
-    overriddenConfig.put(SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_FILE_VERSION, "1");
-
-    InMemorySinkTaskContext inMemorySinkTaskContext =
-        new InMemorySinkTaskContext(Collections.singleton(topicPartition));
-
-    // This will automatically create a channel for topicPartition.
-    SnowflakeSinkService service =
-        SnowflakeSinkServiceFactory.builder(
-                conn, IngestionMethodConfig.SNOWPIPE_STREAMING, overriddenConfig)
-            .setRecordNumber(1)
-            .setErrorReporter(new InMemoryKafkaRecordErrorReporter())
-            .setSinkTaskContext(inMemorySinkTaskContext)
-            .addTask(testTableName, topicPartition)
-            .build();
-
-    final long noOfRecords = 1;
-
-    // send regular data
-    List<SinkRecord> records =
-        TestUtils.createJsonStringSinkRecords(0, noOfRecords, topic, PARTITION);
-
-    // should throw because we don't take arrow version 1 anymore
-    service.insert(records);
-    service.closeAll();
-  }
-
   @Test
   public void testPartialBatchChannelInvalidationIngestion_schematization() throws Exception {
     Map<String, String> config = TestUtils.getConfForStreaming();