Skip to content

Commit

Permalink
[feature](merge-cloud) Add BrokerLoad and DriverUrl white list
Browse files Browse the repository at this point in the history
  • Loading branch information
SWJTU-ZhangLei committed Apr 18, 2024
1 parent 048bc99 commit 9c4dc9e
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 15 deletions.
11 changes: 11 additions & 0 deletions fe/fe-common/src/main/java/org/apache/doris/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -2713,6 +2713,17 @@ public static boolean isNotCloudMode() {

@ConfField(mutable = true)
public static int mow_insert_into_commit_retry_times = 10;

// Allow-list of S3 endpoints for broker load. An empty list (or one holding only empty
// strings) disables the check; otherwise the endpoint, with any leading http:// or https://
// removed, must equal one of the entries exactly.
@ConfField(mutable = true, description = {"指定S3 Load endpoint白名单, 举例: s3_load_endpoint_white_list=a,b,c",
        "the white list for the s3 load endpoint, if it is empty, no white list will be set, "
                + "for example: s3_load_endpoint_white_list=a,b,c"})
public static String[] s3_load_endpoint_white_list = {};

// Allow-list of JDBC driver URLs. An empty list (or one holding only empty strings) disables
// the check; otherwise the driver URL must equal one of the entries exactly.
@ConfField(mutable = true, description = {"指定Jdbc driver url白名单, 举例: jdbc_driver_url_white_list=a,b,c",
        "the white list for jdbc driver url, if it is empty, no white list will be set, "
                + "for example: jdbc_driver_url_white_list=a,b,c"
})
public static String[] jdbc_driver_url_white_list = {};
//==========================================================================
// end of cloud config
//==========================================================================
Expand Down
59 changes: 59 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.cloud.security.SecurityChecker;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.mysql.privilege.PrivPredicate;
Expand All @@ -36,10 +38,16 @@
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.checkerframework.checker.nullness.qual.Nullable;

import java.io.File;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand Down Expand Up @@ -73,6 +81,8 @@
// WITH RESOURCE name
// (key3=value3, ...)
public class LoadStmt extends DdlStmt {
private static final Logger LOG = LogManager.getLogger(LoadStmt.class);

public static final String TIMEOUT_PROPERTY = "timeout";
public static final String MAX_FILTER_RATIO_PROPERTY = "max_filter_ratio";
public static final String EXEC_MEM_LIMIT = "exec_mem_limit";
Expand All @@ -92,6 +102,10 @@ public class LoadStmt extends DdlStmt {
public static final String BOS_ACCESSKEY = "bos_accesskey";
public static final String BOS_SECRET_ACCESSKEY = "bos_secret_accesskey";

// for S3 load check
// Names of the object-storage providers: cos, oss, s3, obs, bos.
// NOTE(review): no reader of this list is visible in this part of the file - confirm where it
// is consumed. The reference is final but the backing ArrayList is still mutable.
public static final List<String> PROVIDERS =
new ArrayList<>(Arrays.asList("cos", "oss", "s3", "obs", "bos"));

// mini load params
public static final String KEY_IN_PARAM_COLUMNS = "columns";
public static final String KEY_IN_PARAM_SET = "set";
Expand Down Expand Up @@ -485,6 +499,7 @@ public void analyze(Analyzer analyzer) throws UserException {
}
} else if (brokerDesc != null) {
etlJobType = EtlJobType.BROKER;
checkWhiteList();
} else if (isMysqlLoad) {
etlJobType = EtlJobType.LOCAL_FILE;
} else {
Expand Down Expand Up @@ -556,4 +571,48 @@ public RedirectStatus getRedirectStatus() {
return RedirectStatus.FORWARD_WITH_SYNC;
}
}

/**
 * Verifies that the given S3 endpoint can be connected to.
 *
 * <p>The endpoint is prefixed with {@code http://} and a connection is opened with a
 * 10 second connect timeout. The attempt is bracketed by
 * {@code SecurityChecker.startSSRFChecking} / {@code stopSSRFChecking}; the stop call and the
 * disconnect always run, whether or not the connection succeeded.
 *
 * @param endpoint endpoint without a scheme prefix (host, optionally with port)
 * @throws UserException if the SSRF check or the connection attempt fails; the message is the
 *         underlying exception's message and the underlying exception is attached as cause
 */
private void checkEndpoint(String endpoint) throws UserException {
    HttpURLConnection connection = null;
    try {
        String urlStr = "http://" + endpoint;
        SecurityChecker.getInstance().startSSRFChecking(urlStr);
        URL url = new URL(urlStr);
        connection = (HttpURLConnection) url.openConnection();
        connection.setConnectTimeout(10000);
        connection.connect();
    } catch (Exception e) {
        LOG.warn("Failed to connect endpoint={}", endpoint, e);
        // Keep the original exception as the cause so its type and stack trace are not lost;
        // the user-visible message stays the same as before.
        throw new UserException(e.getMessage(), e);
    } finally {
        if (connection != null) {
            try {
                connection.disconnect();
            } catch (Exception e) {
                // Best effort: a failed disconnect must not mask the real outcome.
                LOG.warn("Failed to disconnect connection, endpoint={}", endpoint, e);
            }
        }
        SecurityChecker.getInstance().stopSSRFChecking();
    }
}

/**
 * Validates the S3 endpoint of a broker load against {@code Config.s3_load_endpoint_white_list}.
 *
 * <p>The check only applies when the broker properties contain all of endpoint, access key,
 * secret key and region. The endpoint has a leading {@code http://} or {@code https://}
 * removed, is matched exactly against the non-empty white list entries (an empty white list
 * accepts everything), is written back to the broker properties without its scheme, and is
 * finally probed for connectivity.
 *
 * @throws UserException if the endpoint is not in a non-empty white list, or the connectivity
 *         probe fails
 */
public void checkWhiteList() throws UserException {
    Map<String, String> props = brokerDesc.getProperties();
    boolean hasS3Properties = props.containsKey(S3Properties.Env.ENDPOINT)
            && props.containsKey(S3Properties.Env.ACCESS_KEY)
            && props.containsKey(S3Properties.Env.SECRET_KEY)
            && props.containsKey(S3Properties.Env.REGION);
    if (!hasS3Properties) {
        return;
    }

    // Normalize: the white list and the connectivity probe both work on a scheme-less endpoint.
    String endpoint = props.get(S3Properties.Env.ENDPOINT)
            .replaceFirst("^http://", "")
            .replaceFirst("^https://", "");

    // Empty entries are ignored when building the effective white list.
    List<String> allowedEndpoints = new ArrayList<>();
    for (String entry : Config.s3_load_endpoint_white_list) {
        if (!entry.isEmpty()) {
            allowedEndpoints.add(entry);
        }
    }

    boolean whiteListEnabled = !allowedEndpoints.isEmpty();
    if (whiteListEnabled && !allowedEndpoints.contains(endpoint)) {
        throw new UserException("endpoint: " + endpoint
                + " is not in s3 load endpoint white list: " + String.join(",", allowedEndpoints));
    }

    props.put(S3Properties.Env.ENDPOINT, endpoint);
    checkEndpoint(endpoint);
}
}
38 changes: 23 additions & 15 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,11 @@
import java.security.NoSuchAlgorithmException;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;


/**
* External JDBC Catalog resource for external table query.
* <p>
Expand Down Expand Up @@ -285,28 +286,35 @@ public static String computeObjectChecksum(String driverPath) throws DdlExceptio
}
}

/**
 * Checks a JDBC driver URL against {@code Config.jdbc_driver_url_white_list}.
 *
 * <p>Empty entries are ignored. If no entries remain the check is skipped; otherwise the URL
 * must equal one of the entries exactly.
 *
 * @param driverUrl driver URL exactly as supplied by the caller
 * @throws IllegalArgumentException if the white list is non-empty and does not contain the URL
 */
private static void checkCloudWhiteList(String driverUrl) throws IllegalArgumentException {
    // For compatibility with cloud mode, we use both `jdbc_driver_url_white_list`
    // and jdbc_driver_secure_path to check whitelist
    List<String> cloudWhiteList = new ArrayList<>(Arrays.asList(Config.jdbc_driver_url_white_list));
    cloudWhiteList.removeIf(String::isEmpty);
    if (!cloudWhiteList.isEmpty() && !cloudWhiteList.contains(driverUrl)) {
        // ": " keeps the URL readable and matches the message used by getFullDriverUrl.
        throw new IllegalArgumentException("Driver URL does not match any allowed paths: " + driverUrl);
    }
}

public static String getFullDriverUrl(String driverUrl) throws IllegalArgumentException {
try {
URI uri = new URI(driverUrl);
String schema = uri.getScheme();
checkCloudWhiteList(driverUrl);
if (schema == null && !driverUrl.startsWith("/")) {
return "file://" + Config.jdbc_drivers_dir + "/" + driverUrl;
} else {
if ("*".equals(Config.jdbc_driver_secure_path)) {
return driverUrl;
} else if (Config.jdbc_driver_secure_path.trim().isEmpty()) {
throw new IllegalArgumentException(
"jdbc_driver_secure_path is set to empty, disallowing all driver URLs.");
} else {
boolean isAllowed = Arrays.stream(Config.jdbc_driver_secure_path.split(";"))
}

if ("*".equals(Config.jdbc_driver_secure_path)) {
return driverUrl;
}

boolean isAllowed = Arrays.stream(Config.jdbc_driver_secure_path.split(";"))
.anyMatch(allowedPath -> driverUrl.startsWith(allowedPath.trim()));
if (!isAllowed) {
throw new IllegalArgumentException("Driver URL does not match any allowed paths: " + driverUrl);
} else {
return driverUrl;
}
}
if (!isAllowed) {
throw new IllegalArgumentException("Driver URL does not match any allowed paths: " + driverUrl);
}
return driverUrl;
} catch (URISyntaxException e) {
LOG.warn("invalid jdbc driver url: " + driverUrl);
return driverUrl;
Expand Down

0 comments on commit 9c4dc9e

Please sign in to comment.