Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SNOW-954150] Use map of clients with different configurations instead of one client for multiple connectors configurations #744

Merged
merged 53 commits into from
Dec 1, 2023
Merged
Show file tree
Hide file tree
Changes from 49 commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
1ca1d32
builds, fixing tests
sfc-gh-rcheng Nov 10, 2023
15da6b5
passes ut
sfc-gh-rcheng Nov 10, 2023
f5fe7d8
autoformatting
sfc-gh-rcheng Nov 10, 2023
7426bc9
added tests for cc
sfc-gh-rcheng Nov 10, 2023
a316c0e
passes it
sfc-gh-rcheng Nov 10, 2023
07659a2
autoformatting
sfc-gh-rcheng Nov 10, 2023
d5e7e20
Merge branch 'master' into rcheng-clientfix
sfc-gh-rcheng Nov 13, 2023
4c7a3a5
pr reviews
sfc-gh-rcheng Nov 14, 2023
6cce84d
autoformatting
sfc-gh-rcheng Nov 14, 2023
a14f388
stash
sfc-gh-rcheng Nov 16, 2023
e18292a
autoformatting
sfc-gh-rcheng Nov 16, 2023
e762419
pr comments
sfc-gh-rcheng Nov 16, 2023
c2f9a6a
autoformatting
sfc-gh-rcheng Nov 16, 2023
9fbd4e5
stash
sfc-gh-rcheng Nov 17, 2023
02a8c58
cache loader imp and pass test
sfc-gh-rcheng Nov 17, 2023
3b4e729
autoformatting
sfc-gh-rcheng Nov 17, 2023
89109c1
fix close
sfc-gh-rcheng Nov 17, 2023
d87cdf3
autoformatting
sfc-gh-rcheng Nov 17, 2023
4f85c16
nits
sfc-gh-rcheng Nov 17, 2023
15953d5
autoformatting
sfc-gh-rcheng Nov 17, 2023
0ce288a
push for compare
sfc-gh-rcheng Nov 20, 2023
7628b75
map not properties
sfc-gh-rcheng Nov 20, 2023
7014658
autoformatting
sfc-gh-rcheng Nov 20, 2023
123d4c4
add comments
sfc-gh-rcheng Nov 20, 2023
49c72d4
autoformatting
sfc-gh-rcheng Nov 20, 2023
df0aaa7
personal nits
sfc-gh-rcheng Nov 20, 2023
275129c
autoformatting
sfc-gh-rcheng Nov 20, 2023
fffaead
check cc
sfc-gh-rcheng Nov 21, 2023
7d43de4
autoformatting
sfc-gh-rcheng Nov 21, 2023
87fd612
replace builder
sfc-gh-rcheng Nov 21, 2023
97e9aa5
autoformatting
sfc-gh-rcheng Nov 21, 2023
369c55d
concurrency with invalidation
sfc-gh-rcheng Nov 21, 2023
5ef97aa
autoformatting
sfc-gh-rcheng Nov 21, 2023
f2df6fb
comment nit
sfc-gh-rcheng Nov 21, 2023
2db159c
limit 10000 clients
sfc-gh-rcheng Nov 22, 2023
a076c51
use properties
sfc-gh-rcheng Nov 22, 2023
1ffd3eb
autoformatting
sfc-gh-rcheng Nov 22, 2023
8f0c73c
add more tests
sfc-gh-rcheng Nov 22, 2023
f75204d
uncomment concurrency test
sfc-gh-rcheng Nov 22, 2023
af10de3
autoformatting
sfc-gh-rcheng Nov 22, 2023
d6575ed
split properties to new file, add tests and add personal comment nits
sfc-gh-rcheng Nov 22, 2023
2db9438
autoformatting
sfc-gh-rcheng Nov 22, 2023
5d7f683
add threadsafe comment
sfc-gh-rcheng Nov 22, 2023
314c658
autoformatting
sfc-gh-rcheng Nov 22, 2023
4b4e25f
bump retry count
sfc-gh-rcheng Nov 23, 2023
9e2a386
nit log, add atomic back in, update handler tests
sfc-gh-rcheng Nov 28, 2023
99e0359
autoformatting
sfc-gh-rcheng Nov 28, 2023
381e1c8
actually do the log change. fix concurrency issue with IT
sfc-gh-rcheng Nov 28, 2023
410b0d0
autoformatting
sfc-gh-rcheng Nov 28, 2023
377894c
run in parallel to ensure fix?
sfc-gh-rcheng Nov 30, 2023
b937485
fix it
sfc-gh-rcheng Nov 30, 2023
94b3797
autoformatting
sfc-gh-rcheng Nov 30, 2023
f7d6e5e
use SF_OAUTH_CLIENT_ID instead of role
sfc-gh-rcheng Nov 30, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ public void closeAll() {
partitionsToChannel.clear();

StreamingClientProvider.getStreamingClientProviderInstance()
.closeClient(this.streamingIngestClient);
.closeClient(this.connectorConfig, this.streamingIngestClient);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,8 @@

package com.snowflake.kafka.connector.internal.streaming;

import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_FILE_VERSION;
import static net.snowflake.ingest.utils.ParameterProvider.BLOB_FORMAT_VERSION;

import com.snowflake.kafka.connector.Utils;
import com.snowflake.kafka.connector.internal.KCLogger;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory;
Expand All @@ -35,9 +28,6 @@
/** This class handles all calls to manage the streaming ingestion client */
public class StreamingClientHandler {
private static final KCLogger LOGGER = new KCLogger(StreamingClientHandler.class.getName());
private static final String STREAMING_CLIENT_PREFIX_NAME = "KC_CLIENT_";
private static final String TEST_CLIENT_NAME = "TEST_CLIENT";

private AtomicInteger createdClientId = new AtomicInteger(0);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added this back (was removed in previous PR iterations) in case removing it causes concurrency issues with client naming


/**
Expand All @@ -51,40 +41,27 @@ public static boolean isClientValid(SnowflakeStreamingIngestClient client) {
}

/**
* Creates a streaming client from the given config
* Creates a streaming client from the given properties
*
* @param connectorConfig The config to create the client
* @param streamingClientProperties The properties to create the client
* @return A newly created client
*/
public SnowflakeStreamingIngestClient createClient(Map<String, String> connectorConfig) {
public SnowflakeStreamingIngestClient createClient(
StreamingClientProperties streamingClientProperties) {
LOGGER.info("Initializing Streaming Client...");

// get streaming properties from config
Properties streamingClientProps = new Properties();
Copy link
Collaborator Author

@sfc-gh-rcheng sfc-gh-rcheng Nov 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this and getNewClientName is moved into StreamingClientProperties.java

streamingClientProps.putAll(
StreamingUtils.convertConfigForStreamingClient(new HashMap<>(connectorConfig)));

try {
// Override only if bdec version is explicitly set in config, default to the version set
// inside Ingest SDK
Map<String, Object> parameterOverrides = new HashMap<>();
Optional<String> snowpipeStreamingBdecVersion =
Optional.ofNullable(connectorConfig.get(SNOWPIPE_STREAMING_FILE_VERSION));
snowpipeStreamingBdecVersion.ifPresent(
overriddenValue -> {
LOGGER.info("Config is overridden for {} ", SNOWPIPE_STREAMING_FILE_VERSION);
parameterOverrides.put(BLOB_FORMAT_VERSION, overriddenValue);
});

String clientName = this.getNewClientName(connectorConfig);

SnowflakeStreamingIngestClient createdClient =
SnowflakeStreamingIngestClientFactory.builder(clientName)
.setProperties(streamingClientProps)
.setParameterOverrides(parameterOverrides)
SnowflakeStreamingIngestClientFactory.builder(
streamingClientProperties.clientName + "_" + createdClientId.getAndIncrement())
.setProperties(streamingClientProperties.clientProperties)
.setParameterOverrides(streamingClientProperties.parameterOverrides)
.build();

LOGGER.info("Successfully initialized Streaming Client:{}", clientName);
LOGGER.info(
"Successfully initialized Streaming Client:{} with properties {}",
streamingClientProperties.clientName,
streamingClientProperties.getLoggableClientProperties());

return createdClient;
} catch (SFException ex) {
Expand Down Expand Up @@ -115,11 +92,4 @@ public void closeClient(SnowflakeStreamingIngestClient client) {
LOGGER.error(Utils.getExceptionMessage("Failure closing Streaming client", e));
}
}

private String getNewClientName(Map<String, String> connectorConfig) {
return STREAMING_CLIENT_PREFIX_NAME
+ connectorConfig.getOrDefault(Utils.NAME, TEST_CLIENT_NAME)
+ "_"
+ createdClientId.getAndIncrement();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Copyright (c) 2023 Snowflake Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.snowflake.kafka.connector.internal.streaming;

import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_FILE_VERSION;
import static net.snowflake.ingest.utils.ParameterProvider.BLOB_FORMAT_VERSION;

import com.snowflake.kafka.connector.Utils;
import com.snowflake.kafka.connector.internal.KCLogger;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import net.snowflake.ingest.utils.Constants;

/**
* Object to convert and store properties for {@link
* net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient}. This object is used to compare
* equality between clients in {@link StreamingClientProvider}.
*/
public class StreamingClientProperties {
public static final String STREAMING_CLIENT_PREFIX_NAME = "KC_CLIENT_";
public static final String DEFAULT_CLIENT_NAME = "DEFAULT_CLIENT";

private static final KCLogger LOGGER = new KCLogger(StreamingClientProperties.class.getName());

// contains converted config properties that are loggable (not PII data)
public static final List<String> LOGGABLE_STREAMING_CONFIG_PROPERTIES =
Stream.of(
Constants.ACCOUNT_URL,
Constants.ROLE,
Constants.USER,
StreamingUtils.STREAMING_CONSTANT_AUTHORIZATION_TYPE)
.collect(Collectors.toList());

public final Properties clientProperties;
public final String clientName;
public final Map<String, Object> parameterOverrides;

/**
* Creates non-null properties, client name and parameter overrides for the {@link
* net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient} from the given connectorConfig
* Properties are created by {@link StreamingUtils#convertConfigForStreamingClient(Map)} and are a
* subset of the given connector configuration
*
* @param connectorConfig Given connector configuration. Null configs are treated as an empty map
*/
public StreamingClientProperties(Map<String, String> connectorConfig) {
// treat null config as empty config
if (connectorConfig == null) {
LOGGER.warn(
"Creating empty streaming client properties because given connector config was empty");
connectorConfig = new HashMap<>();
}

this.clientProperties = StreamingUtils.convertConfigForStreamingClient(connectorConfig);

this.clientName =
STREAMING_CLIENT_PREFIX_NAME
+ connectorConfig.getOrDefault(Utils.NAME, DEFAULT_CLIENT_NAME);

// Override only if bdec version is explicitly set in config, default to the version set
// inside Ingest SDK
this.parameterOverrides = new HashMap<>();
Optional<String> snowpipeStreamingBdecVersion =
Optional.ofNullable(connectorConfig.get(SNOWPIPE_STREAMING_FILE_VERSION));
snowpipeStreamingBdecVersion.ifPresent(
overriddenValue -> {
LOGGER.info("Config is overridden for {} ", SNOWPIPE_STREAMING_FILE_VERSION);
parameterOverrides.put(BLOB_FORMAT_VERSION, overriddenValue);
});
}

/**
* Gets the loggable properties, see {@link
* StreamingClientProperties#LOGGABLE_STREAMING_CONFIG_PROPERTIES}
*
* @return A formatted string with the loggable properties
*/
public String getLoggableClientProperties() {
return this.clientProperties == null | this.clientProperties.isEmpty()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can clientProperties be null or empty? I thought we have check to make sure some of the configurations are required?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it shouldn't ever be null or empty, but I added the check just in case

? ""
: this.clientProperties.entrySet().stream()
.filter(
propKvp ->
LOGGABLE_STREAMING_CONFIG_PROPERTIES.stream()
.anyMatch(propKvp.getKey().toString()::equalsIgnoreCase))
.collect(Collectors.toList())
.toString();
}

/**
* Determines equality between StreamingClientProperties by only looking at the parsed
* clientProperties. This is used in {@link StreamingClientProvider} to determine equality in
* registered clients
*
* @param other other object to determine equality
* @return if the given object's clientProperties exists and is equal
*/
@Override
public boolean equals(Object other) {
return other.getClass().equals(StreamingClientProperties.class)
& ((StreamingClientProperties) other).clientProperties.equals(this.clientProperties);
}

/**
* Creates the hashcode for this object from the clientProperties. This is used in {@link
* StreamingClientProvider} to determine equality in registered clients
*
* @return the clientProperties' hashcode
*/
@Override
public int hashCode() {
return this.clientProperties.hashCode();
}
}
Loading