From 9c4dc9efc6a0ccdcb2eebf6527ac3299baf84e64 Mon Sep 17 00:00:00 2001 From: Lei Zhang <27994433+SWJTU-ZhangLei@users.noreply.github.com> Date: Thu, 28 Mar 2024 18:42:18 +0800 Subject: [PATCH] [feature](merge-cloud) Add `BrokerLoad` and `DriverUrl` white list --- .../java/org/apache/doris/common/Config.java | 11 ++++ .../org/apache/doris/analysis/LoadStmt.java | 59 +++++++++++++++++++ .../apache/doris/catalog/JdbcResource.java | 38 +++++++----- 3 files changed, 93 insertions(+), 15 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index efe0285f5ee9b7b..230fe8bb921377c 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2713,6 +2713,17 @@ public static boolean isNotCloudMode() { @ConfField(mutable = true) public static int mow_insert_into_commit_retry_times = 10; + + @ConfField(mutable = true, description = {"指定S3 Load endpoint白名单, 举例: s3_load_endpoint_white_list=a,b,c", + "the white list for the s3 load endpoint, if it is empty, no white list will be set," + + "for example: s3_load_endpoint_white_list=a,b,c"}) + public static String[] s3_load_endpoint_white_list = {}; + + @ConfField(mutable = true, description = {"指定Jdbc driver url白名单, 举例: jdbc_driver_url_white_list=a,b,c", + "the white list for jdbc driver url, if it is empty, no white list will be set" + + "for example: jdbc_driver_url_white_list=a,b,c" + }) + public static String[] jdbc_driver_url_white_list = {}; //========================================================================== // end of cloud config //========================================================================== diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java index 19b721295160d87..b658dd16faf6771 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java @@ -21,12 +21,14 @@ import org.apache.doris.catalog.Env; import org.apache.doris.catalog.KeysType; import org.apache.doris.catalog.OlapTable; +import org.apache.doris.cloud.security.SecurityChecker; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.UserException; import org.apache.doris.common.util.PrintableMap; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.datasource.property.constants.S3Properties; import org.apache.doris.load.EtlJobType; import org.apache.doris.load.loadv2.LoadTask; import org.apache.doris.mysql.privilege.PrivPredicate; @@ -36,10 +38,16 @@ import com.google.common.base.Joiner; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.checkerframework.checker.nullness.qual.Nullable; import java.io.File; import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -73,6 +81,8 @@ // WITH RESOURCE name // (key3=value3, ...) public class LoadStmt extends DdlStmt { + private static final Logger LOG = LogManager.getLogger(LoadStmt.class); + public static final String TIMEOUT_PROPERTY = "timeout"; public static final String MAX_FILTER_RATIO_PROPERTY = "max_filter_ratio"; public static final String EXEC_MEM_LIMIT = "exec_mem_limit"; @@ -92,6 +102,10 @@ public class LoadStmt extends DdlStmt { public static final String BOS_ACCESSKEY = "bos_accesskey"; public static final String BOS_SECRET_ACCESSKEY = "bos_secret_accesskey"; + // for S3 load check + public static final List PROVIDERS = + new ArrayList<>(Arrays.asList("cos", "oss", "s3", "obs", "bos")); + // mini load params public static final String KEY_IN_PARAM_COLUMNS = "columns"; public static final String KEY_IN_PARAM_SET = "set"; @@ -485,6 +499,7 @@ public void analyze(Analyzer analyzer) throws UserException { } } else if (brokerDesc != null) { etlJobType = EtlJobType.BROKER; + checkWhiteList(); } else if (isMysqlLoad) { etlJobType = EtlJobType.LOCAL_FILE; } else { @@ -556,4 +571,48 @@ public RedirectStatus getRedirectStatus() { return RedirectStatus.FORWARD_WITH_SYNC; } } + + private void checkEndpoint(String endpoint) throws UserException { + HttpURLConnection connection = null; + try { + String urlStr = "http://" + endpoint; + SecurityChecker.getInstance().startSSRFChecking(urlStr); + URL url = new URL(urlStr); + connection = (HttpURLConnection) url.openConnection(); + connection.setConnectTimeout(10000); + connection.connect(); + } catch (Exception e) { + LOG.warn("Failed to connect endpoint={}", endpoint, e); + throw new UserException(e.getMessage()); + } finally { + if (connection != null) { + try { + connection.disconnect(); + } catch (Exception e) { + LOG.warn("Failed to disconnect connection, endpoint={}", endpoint, e); + } + } + SecurityChecker.getInstance().stopSSRFChecking(); + } + } + + public void checkWhiteList() throws UserException { + Map brokerDescProperties = brokerDesc.getProperties(); + if (brokerDescProperties.containsKey(S3Properties.Env.ENDPOINT) + && brokerDescProperties.containsKey(S3Properties.Env.ACCESS_KEY) + && brokerDescProperties.containsKey(S3Properties.Env.SECRET_KEY) + && brokerDescProperties.containsKey(S3Properties.Env.REGION)) { + String endpoint = brokerDescProperties.get(S3Properties.Env.ENDPOINT); + endpoint = endpoint.replaceFirst("^http://", ""); + endpoint = endpoint.replaceFirst("^https://", ""); + List whiteList = new ArrayList<>(Arrays.asList(Config.s3_load_endpoint_white_list)); + whiteList.removeIf(String::isEmpty); + if (!whiteList.isEmpty() && !whiteList.contains(endpoint)) { + throw new UserException("endpoint: " + endpoint + + " is not in s3 load endpoint white list: " + String.join(",", whiteList)); + } + brokerDescProperties.put(S3Properties.Env.ENDPOINT, endpoint); + checkEndpoint(endpoint); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java index 65bf7d308ff02c5..60e9f0abc170d9c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java @@ -42,10 +42,11 @@ import java.security.NoSuchAlgorithmException; import java.time.LocalDateTime; import java.time.ZoneId; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import java.util.Map; - /** * External JDBC Catalog resource for external table query. *

@@ -285,28 +286,35 @@ public static String computeObjectChecksum(String driverPath) throws DdlExceptio } } + private static void checkCloudWhiteList(String driverUrl) throws IllegalArgumentException { + // For compatibility with cloud mode, we use both `jdbc_driver_url_white_list` + // and jdbc_driver_secure_path to check whitelist + List cloudWhiteList = new ArrayList<>(Arrays.asList(Config.jdbc_driver_url_white_list)); + cloudWhiteList.removeIf(String::isEmpty); + if (!cloudWhiteList.isEmpty() && !cloudWhiteList.contains(driverUrl)) { + throw new IllegalArgumentException("Driver URL does not match any allowed paths" + driverUrl); + } + } + public static String getFullDriverUrl(String driverUrl) throws IllegalArgumentException { try { URI uri = new URI(driverUrl); String schema = uri.getScheme(); + checkCloudWhiteList(driverUrl); if (schema == null && !driverUrl.startsWith("/")) { return "file://" + Config.jdbc_drivers_dir + "/" + driverUrl; - } else { - if ("*".equals(Config.jdbc_driver_secure_path)) { - return driverUrl; - } else if (Config.jdbc_driver_secure_path.trim().isEmpty()) { - throw new IllegalArgumentException( - "jdbc_driver_secure_path is set to empty, disallowing all driver URLs."); - } else { - boolean isAllowed = Arrays.stream(Config.jdbc_driver_secure_path.split(";")) + } + + if ("*".equals(Config.jdbc_driver_secure_path)) { + return driverUrl; + } + + boolean isAllowed = Arrays.stream(Config.jdbc_driver_secure_path.split(";")) .anyMatch(allowedPath -> driverUrl.startsWith(allowedPath.trim())); - if (!isAllowed) { - throw new IllegalArgumentException("Driver URL does not match any allowed paths: " + driverUrl); - } else { - return driverUrl; - } - } + if (!isAllowed) { + throw new IllegalArgumentException("Driver URL does not match any allowed paths: " + driverUrl); } + return driverUrl; } catch (URISyntaxException e) { LOG.warn("invalid jdbc driver url: " + driverUrl); return driverUrl;