Skip to content

Commit

Permalink
[SNOW-954150] Use map of clients with different configurations instea…
Browse files Browse the repository at this point in the history
…d of one client for multiple connectors configurations (#744)
  • Loading branch information
sfc-gh-rcheng committed Dec 7, 2023
1 parent d4cdd5a commit c546782
Show file tree
Hide file tree
Showing 11 changed files with 939 additions and 246 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ public void closeAll() {
partitionsToChannel.clear();

StreamingClientProvider.getStreamingClientProviderInstance()
.closeClient(this.streamingIngestClient);
.closeClient(this.connectorConfig, this.streamingIngestClient);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,8 @@

package com.snowflake.kafka.connector.internal.streaming;

import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_FILE_VERSION;
import static net.snowflake.ingest.utils.ParameterProvider.BLOB_FORMAT_VERSION;

import com.snowflake.kafka.connector.Utils;
import com.snowflake.kafka.connector.internal.KCLogger;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory;
Expand All @@ -35,9 +28,6 @@
/** This class handles all calls to manage the streaming ingestion client */
public class StreamingClientHandler {
private static final KCLogger LOGGER = new KCLogger(StreamingClientHandler.class.getName());
private static final String STREAMING_CLIENT_PREFIX_NAME = "KC_CLIENT_";
private static final String TEST_CLIENT_NAME = "TEST_CLIENT";

private AtomicInteger createdClientId = new AtomicInteger(0);

/**
Expand All @@ -51,40 +41,27 @@ public static boolean isClientValid(SnowflakeStreamingIngestClient client) {
}

/**
* Creates a streaming client from the given config
* Creates a streaming client from the given properties
*
* @param connectorConfig The config to create the client
* @param streamingClientProperties The properties to create the client
* @return A newly created client
*/
public SnowflakeStreamingIngestClient createClient(Map<String, String> connectorConfig) {
public SnowflakeStreamingIngestClient createClient(
StreamingClientProperties streamingClientProperties) {
LOGGER.info("Initializing Streaming Client...");

// get streaming properties from config
Properties streamingClientProps = new Properties();
streamingClientProps.putAll(
StreamingUtils.convertConfigForStreamingClient(new HashMap<>(connectorConfig)));

try {
// Override only if bdec version is explicitly set in config, default to the version set
// inside Ingest SDK
Map<String, Object> parameterOverrides = new HashMap<>();
Optional<String> snowpipeStreamingBdecVersion =
Optional.ofNullable(connectorConfig.get(SNOWPIPE_STREAMING_FILE_VERSION));
snowpipeStreamingBdecVersion.ifPresent(
overriddenValue -> {
LOGGER.info("Config is overridden for {} ", SNOWPIPE_STREAMING_FILE_VERSION);
parameterOverrides.put(BLOB_FORMAT_VERSION, overriddenValue);
});

String clientName = this.getNewClientName(connectorConfig);

SnowflakeStreamingIngestClient createdClient =
SnowflakeStreamingIngestClientFactory.builder(clientName)
.setProperties(streamingClientProps)
.setParameterOverrides(parameterOverrides)
SnowflakeStreamingIngestClientFactory.builder(
streamingClientProperties.clientName + "_" + createdClientId.getAndIncrement())
.setProperties(streamingClientProperties.clientProperties)
.setParameterOverrides(streamingClientProperties.parameterOverrides)
.build();

LOGGER.info("Successfully initialized Streaming Client:{}", clientName);
LOGGER.info(
"Successfully initialized Streaming Client:{} with properties {}",
streamingClientProperties.clientName,
streamingClientProperties.getLoggableClientProperties());

return createdClient;
} catch (SFException ex) {
Expand Down Expand Up @@ -115,11 +92,4 @@ public void closeClient(SnowflakeStreamingIngestClient client) {
LOGGER.error(Utils.getExceptionMessage("Failure closing Streaming client", e));
}
}

private String getNewClientName(Map<String, String> connectorConfig) {
return STREAMING_CLIENT_PREFIX_NAME
+ connectorConfig.getOrDefault(Utils.NAME, TEST_CLIENT_NAME)
+ "_"
+ createdClientId.getAndIncrement();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Copyright (c) 2023 Snowflake Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.snowflake.kafka.connector.internal.streaming;

import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_FILE_VERSION;
import static net.snowflake.ingest.utils.ParameterProvider.BLOB_FORMAT_VERSION;

import com.snowflake.kafka.connector.Utils;
import com.snowflake.kafka.connector.internal.KCLogger;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import net.snowflake.ingest.utils.Constants;

/**
* Object to convert and store properties for {@link
* net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient}. This object is used to compare
* equality between clients in {@link StreamingClientProvider}.
*/
public class StreamingClientProperties {
public static final String STREAMING_CLIENT_PREFIX_NAME = "KC_CLIENT_";
public static final String DEFAULT_CLIENT_NAME = "DEFAULT_CLIENT";

private static final KCLogger LOGGER = new KCLogger(StreamingClientProperties.class.getName());

// contains converted config properties that are loggable (not PII data)
public static final List<String> LOGGABLE_STREAMING_CONFIG_PROPERTIES =
Stream.of(
Constants.ACCOUNT_URL,
Constants.ROLE,
Constants.USER,
StreamingUtils.STREAMING_CONSTANT_AUTHORIZATION_TYPE)
.collect(Collectors.toList());

public final Properties clientProperties;
public final String clientName;
public final Map<String, Object> parameterOverrides;

/**
* Creates non-null properties, client name and parameter overrides for the {@link
* net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient} from the given connectorConfig
* Properties are created by {@link StreamingUtils#convertConfigForStreamingClient(Map)} and are a
* subset of the given connector configuration
*
* @param connectorConfig Given connector configuration. Null configs are treated as an empty map
*/
public StreamingClientProperties(Map<String, String> connectorConfig) {
// treat null config as empty config
if (connectorConfig == null) {
LOGGER.warn(
"Creating empty streaming client properties because given connector config was empty");
connectorConfig = new HashMap<>();
}

this.clientProperties = StreamingUtils.convertConfigForStreamingClient(connectorConfig);

this.clientName =
STREAMING_CLIENT_PREFIX_NAME
+ connectorConfig.getOrDefault(Utils.NAME, DEFAULT_CLIENT_NAME);

// Override only if bdec version is explicitly set in config, default to the version set
// inside Ingest SDK
this.parameterOverrides = new HashMap<>();
Optional<String> snowpipeStreamingBdecVersion =
Optional.ofNullable(connectorConfig.get(SNOWPIPE_STREAMING_FILE_VERSION));
snowpipeStreamingBdecVersion.ifPresent(
overriddenValue -> {
LOGGER.info("Config is overridden for {} ", SNOWPIPE_STREAMING_FILE_VERSION);
parameterOverrides.put(BLOB_FORMAT_VERSION, overriddenValue);
});
}

/**
* Gets the loggable properties, see {@link
* StreamingClientProperties#LOGGABLE_STREAMING_CONFIG_PROPERTIES}
*
* @return A formatted string with the loggable properties
*/
public String getLoggableClientProperties() {
return this.clientProperties == null | this.clientProperties.isEmpty()
? ""
: this.clientProperties.entrySet().stream()
.filter(
propKvp ->
LOGGABLE_STREAMING_CONFIG_PROPERTIES.stream()
.anyMatch(propKvp.getKey().toString()::equalsIgnoreCase))
.collect(Collectors.toList())
.toString();
}

/**
* Determines equality between StreamingClientProperties by only looking at the parsed
* clientProperties. This is used in {@link StreamingClientProvider} to determine equality in
* registered clients
*
* @param other other object to determine equality
* @return if the given object's clientProperties exists and is equal
*/
@Override
public boolean equals(Object other) {
return other.getClass().equals(StreamingClientProperties.class)
& ((StreamingClientProperties) other).clientProperties.equals(this.clientProperties);
}

/**
* Creates the hashcode for this object from the clientProperties. This is used in {@link
* StreamingClientProvider} to determine equality in registered clients
*
* @return the clientProperties' hashcode
*/
@Override
public int hashCode() {
return this.clientProperties.hashCode();
}
}
Loading

0 comments on commit c546782

Please sign in to comment.