Merge remote-tracking branch 'origin/1.8_release_3.10.x' into 1.8_release-github
dapeng committed Jul 15, 2020
2 parents d6e7274 + 491a7a8 commit d5725f3
Showing 29 changed files with 1,169 additions and 136 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -8,6 +8,7 @@ target/
*.eclipse.*
*.iml
plugins/
sqlplugins/
lib/
.vertx/
.DS_Store
13 changes: 13 additions & 0 deletions core/src/main/java/com/dtstack/flink/sql/option/Options.java
@@ -72,6 +72,10 @@ public class Options {
@OptionRequired(description = "log level")
private String logLevel = "info";

@OptionRequired(description = "file add to ship file")
private String addShipfile;


public String getMode() {
return mode;
}
@@ -183,4 +187,13 @@ public String getLogLevel() {
public void setLogLevel(String logLevel) {
this.logLevel = logLevel;
}

public String getAddShipfile() {
return addShipfile;
}

public void setAddShipfile(String addShipfile) {
this.addShipfile = addShipfile;
}

}
@@ -22,11 +22,7 @@

import com.dtstack.flink.sql.util.DtStringUtil;
import org.apache.calcite.config.Lex;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlJoin;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.sql.*;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;
import com.google.common.collect.Lists;
@@ -164,6 +160,10 @@ private static void parseNode(SqlNode sqlNode, CreateTmpTableParser.SqlParserRes
parseNode(unionRight, sqlParseResult);
}
break;
case MATCH_RECOGNIZE:
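//a MATCH_RECOGNIZE (CEP) query reads from the table referenced by the clause, so record it as a source table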
SqlMatchRecognize node = (SqlMatchRecognize) sqlNode;
sqlParseResult.addSourceTable(node.getTableRef().toString());
break;
default:
//do nothing
break;
@@ -16,7 +16,6 @@
* limitations under the License.
*/



package com.dtstack.flink.sql.parser;

@@ -153,14 +152,16 @@ private static void parseNode(SqlNode sqlNode, SqlParseResult sqlParseResult){

/**
* Convert the sqlNodes in the first-level select list into AsNodes to resolve field name conflicts.
* Only fields of the form table.xx are replaced.
* @param selectList the select fields of the select node
* @param sqlSelect the selectNode parsed from the first level
*/
private static void rebuildSelectNode(SqlNodeList selectList, SqlSelect sqlSelect) {
SqlNodeList sqlNodes = new SqlNodeList(selectList.getParserPosition());

for (int index = 0; index < selectList.size(); index++) {
if (selectList.get(index).getKind().equals(SqlKind.AS)) {
if (selectList.get(index).getKind().equals(SqlKind.AS)
|| ((SqlIdentifier) selectList.get(index)).names.size() == 1) {
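//AS expressions and single-part identifiers (no table prefix) need no alias rewriting, keep them as-is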
sqlNodes.add(selectList.get(index));
continue;
}
112 changes: 112 additions & 0 deletions core/src/main/java/com/dtstack/flink/sql/util/AuthUtil.java
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.dtstack.flink.sql.util;

import org.apache.commons.io.FileUtils;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
* Utility methods for helping with security tasks.
* Date: 2019/12/28
* Company: www.dtstack.com
* @author maqi
*/
public class AuthUtil {

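/**
* Write the given JAAS configuration to a temporary file created under the current
* working directory and return its absolute path. The file is deleted on JVM exit.
*/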
public static String creatJaasFile(String prefix, String suffix, JAASConfig jaasConfig) throws IOException {
File krbConf = new File(System.getProperty("user.dir"));
File temp = File.createTempFile(prefix, suffix, krbConf);
temp.deleteOnExit();
FileUtils.writeStringToFile(temp, jaasConfig.toString());
return temp.getAbsolutePath();
}


public static class JAASConfig {
private String entryName;
private String loginModule;
private String loginModuleFlag;
private Map<String, String> loginModuleOptions;

public JAASConfig(String entryName, String loginModule, String loginModuleFlag, Map<String, String> loginModuleOptions) {
this.entryName = entryName;
this.loginModule = loginModule;
this.loginModuleFlag = loginModuleFlag;
this.loginModuleOptions = loginModuleOptions;
}

public static Builder builder() {
return new Builder();
}

@Override
public String toString() {
StringBuilder stringBuilder = new StringBuilder(entryName).append(" {\n\t")
.append(loginModule).append(" ").append(loginModuleFlag).append("\n\t");
String[] keys = loginModuleOptions.keySet().toArray(new String[loginModuleOptions.size()]);
for (int i = 0; i < keys.length; i++) {
stringBuilder.append(keys[i]).append("=").append(loginModuleOptions.get(keys[i]));
if (i != keys.length - 1) {
stringBuilder.append("\n\t");
} else {
stringBuilder.append(";\n");
}

}
stringBuilder.append("\n").append("};");
return stringBuilder.toString();
}

public static class Builder {
private String entryName;
private String loginModule;
private String loginModuleFlag;
private Map<String, String> loginModuleOptions;

public Builder setEntryName(String entryName) {
this.entryName = entryName;
return this;
}

public Builder setLoginModule(String loginModule) {
this.loginModule = loginModule;
return this;
}

public Builder setLoginModuleFlag(String loginModuleFlag) {
this.loginModuleFlag = loginModuleFlag;
return this;
}

public Builder setLoginModuleOptions(Map<String, String> loginModuleOptions) {
this.loginModuleOptions = loginModuleOptions;
return this;
}

public JAASConfig build() {
return new JAASConfig(
entryName, loginModule, loginModuleFlag, loginModuleOptions);
}
}
}
}
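For reference, here is a minimal usage sketch (not part of this commit) showing how AuthUtil.JAASConfig and creatJaasFile can be combined to produce a JAAS configuration file; the entry name, Krb5LoginModule options, keytab path, and file prefix/suffix below are illustrative assumptions:
```
import com.dtstack.flink.sql.util.AuthUtil;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class JaasFileExample {
    public static void main(String[] args) throws IOException {
        // Hypothetical Krb5LoginModule options; real jobs take these from the connector config.
        Map<String, String> options = new HashMap<>();
        options.put("useKeyTab", "true");
        options.put("keyTab", "\"/path/to/test.keytab\"");
        options.put("principal", "\"[email protected]\"");

        AuthUtil.JAASConfig jaasConfig = AuthUtil.JAASConfig.builder()
                .setEntryName("Client")
                .setLoginModule("com.sun.security.auth.module.Krb5LoginModule")
                .setLoginModuleFlag("required")
                .setLoginModuleOptions(options)
                .build();

        // Writes the rendered JAAS entry to a temp file in the working directory and
        // returns its absolute path, e.g. for java.security.auth.login.config.
        String jaasPath = AuthUtil.creatJaasFile("client_jaas", ".conf", jaasConfig);
        System.out.println(jaasPath);
    }
}
```
The generated file follows the layout built by JAASConfig.toString(): the entry name, then the login module and its flag, then the key=value options terminated with a semicolon and a closing brace.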
5 changes: 5 additions & 0 deletions docs/config.md
@@ -46,6 +46,11 @@ sh submit.sh -key1 val1 -key2 val2
* Description: path of extension jars, currently mainly the jars that contain UDF definitions;
* Required: no
* Default: none

* **addShipfile**
* Description: additional files to ship with the job, e.g. the keytab file and krb5.conf file required to enable Kerberos authentication
* Required: no
* Default: none

* **confProp**
* Description: miscellaneous parameter settings
80 changes: 79 additions & 1 deletion docs/plugin/hbaseSide.md
@@ -43,7 +43,14 @@
| tableName | HBase table name |||
| cache | side-table cache strategy (NONE/LRU) ||NONE|
| partitionedJoin | whether to perform a keyBy on the configured keys before the side-table join (reduces the amount of side-table data cached) ||false|
| kerberosAuthEnable | whether to enable Kerberos authentication ||false|
| regionserverPrincipal | principal of the regionserver; taken from the hbase.regionserver.kerberos.principal property in hbase-site.xml |||
| clientKeytabFile | keytab file of the client |||
| clientPrincipal | principal of the client |||
| zookeeperSaslClient | value of zookeeper.sasl.client ||true|
| securityKrb5Conf | value of java.security.krb5.conf |||

In addition, enabling Kerberos authentication requires configuring krb5 in the JVM options, e.g. -Djava.security.krb5.conf=/Users/xuchao/Documents/flinkSql/kerberos/krb5.conf,
and adding the path of the keytab file to the addShipfile parameter; see [command parameter description](../config.md) for details.
--------------

## 5. Examples
@@ -168,4 +175,75 @@ into
sideTable b
on a.id=b.rowkey1 and a.name = b.rowkey2;
```
### Kerberos side-table example
```
CREATE TABLE MyTable(
name varchar,
channel varchar,
pv INT,
xctime bigint
)WITH(
type ='kafka11',
bootstrapServers ='172.16.8.107:9092',
zookeeperQuorum ='172.16.8.107:2181/kafka',
offsetReset ='latest',
topic ='es_test',
timezone='Asia/Shanghai',
updateMode ='append',
enableKeyPartitions ='false',
topicIsPattern ='false',
parallelism ='1'
);
CREATE TABLE MyResult(
name varchar,
channel varchar
)WITH(
type ='mysql',
url ='jdbc:mysql://172.16.10.45:3306/test',
userName ='dtstack',
password ='abc123',
tableName ='myresult',
updateMode ='append',
parallelism ='1',
batchSize ='100',
batchWaitInterval ='1000'
);
CREATE TABLE sideTable(
cf:name varchar as name,
cf:info varchar as info,
PRIMARY KEY(md5(name) +'test') ,
PERIOD FOR SYSTEM_TIME
)WITH(
type ='hbase',
zookeeperQuorum ='172.16.10.104:2181,172.16.10.224:2181,172.16.10.252:2181',
zookeeperParent ='/hbase',
tableName ='workerinfo',
partitionedJoin ='false',
cache ='LRU',
cacheSize ='10000',
cacheTTLMs ='60000',
asyncTimeoutNum ='0',
parallelism ='1',
kerberosAuthEnable='true',
regionserverPrincipal='hbase/[email protected]',
clientKeytabFile='test.keytab',
clientPrincipal='[email protected]',
securityKrb5Conf='krb5.conf'
);
insert into
MyResult
select
b.name as name,
a.channel
from
MyTable a
join
sideTable b
on a.channel=b.name
```
64 changes: 61 additions & 3 deletions docs/plugin/hbaseSink.md
@@ -37,9 +37,17 @@ hbase2.0
|rowkey | column(s) the HBase rowkey is built from, multiple values separated by commas |||
|updateMode| APPEND: do not retract data, only emit incremental data; UPSERT: delete the retracted data first, then write the update |no|APPEND|
|parallelism | parallelism setting ||1|
|kerberosAuthEnable | whether to enable Kerberos authentication ||false|
|regionserverPrincipal | principal of the regionserver; taken from the hbase.regionserver.kerberos.principal property in hbase-site.xml |||
|clientKeytabFile| keytab file of the client |||
|clientPrincipal| principal of the client |||
|zookeeperSaslClient | value of zookeeper.sasl.client ||true|
|securityKrb5Conf | value of java.security.krb5.conf |||

In addition, enabling Kerberos authentication requires configuring krb5 in the JVM options, e.g. -Djava.security.krb5.conf=/Users/xuchao/Documents/flinkSql/kerberos/krb5.conf,
and adding the path of the keytab file to the addShipfile parameter; see [command parameter description](../config.md) for details.
## 5. Examples:

### Plain result table example
```
CREATE TABLE MyTable(
name varchar,
@@ -78,9 +86,59 @@ into
channel,
name
from
MyTable a
MyTable a
```

### Kerberos-authenticated result table example
```
CREATE TABLE MyTable(
name varchar,
channel varchar,
age int
)WITH(
type ='kafka10',
bootstrapServers ='172.16.8.107:9092',
zookeeperQuorum ='172.16.8.107:2181/kafka',
offsetReset ='latest',
topic ='mqTest01',
timezone='Asia/Shanghai',
updateMode ='append',
enableKeyPartitions ='false',
topicIsPattern ='false',
parallelism ='1'
);
CREATE TABLE MyResult(
cf:name varchar ,
cf:channel varchar
)WITH(
type ='hbase',
zookeeperQuorum ='cdh2.cdhsite:2181,cdh4.cdhsite:2181',
zookeeperParent ='/hbase',
tableName ='myresult',
partitionedJoin ='false',
parallelism ='1',
rowKey='name',
kerberosAuthEnable='true',
regionserverPrincipal='hbase/[email protected]',
clientKeytabFile='test.keytab',
clientPrincipal='[email protected]',
securityKrb5Conf='krb5.conf'
);
insert
into
MyResult
select
channel,
name
from
MyTable a
```

## 6. HBase data
### Data content description
HBase rowkey construction rule: the values of the declared rowkey fields are used as the key, with multiple fields joined by '-'