Skip to content

Commit

Permalink
Merge pull request #65 from jhc-systems/secure-connect
Browse files Browse the repository at this point in the history
add config and support for secure connections
  • Loading branch information
msillence authored Nov 10, 2023
2 parents 1ebff15 + 15db57d commit 8732ac1
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 112 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,5 @@ kafka-smt-collection/.flattened-pom.xml
structured-logging/.flattened-pom.xml
.vscode/
debezium-connector-ibmi/src/test/resources/confluent-auth-template.properties
journal-parsing/archive/
journal-parsing/logs/
21 changes: 21 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,21 @@ REPLICATION_FACTOR=3
```

# Problems

## running with a self signed certificate

capture your ca cert normally the top of the output:
`openssl s_client -showcerts -connect <host>:9471`

between the first pair of `-----BEGIN CERTIFICATE-----` and end and save it to iseries-cert.pem


If using docker simply mount your certs at /var/tls

If running natively import the cert

keytool -import -noprompt -alias iseries-cert -storepass changeit -keystore /usr/lib/jvm/java-1.17.0-openjdk-amd64/lib/security/cacerts -file iseries-cert.pem

## Journals deleted

If the journal is deleted before it is read it will log an error: "Lost journal at position xxx" and reset to the beginning journal
Expand Down Expand Up @@ -98,6 +113,7 @@ file connector-name.json:
"user": "xxx",
"password": "xxx",
"port": "",
"secure": true
"poll.interval.ms": "2000",
"transforms": "unwrap",
"transforms.unwrap.delete.handling.mode": "rewrite",
Expand All @@ -119,6 +135,7 @@ file connector-name.json:

Note the `dbname` can be blank and will be used as part of the jdbc connect string : `dbc://hostname/dbname`


Optional:

```
Expand Down Expand Up @@ -269,6 +286,10 @@ To run in VS Code, configure the following launch.json file, and run from the Ru

## Release notes

### 1.10.4

New configuration paramater `secure` this defaults to true

### 1.10.2

Fixes data loss bugs:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@
import io.debezium.relational.TableId;
import io.debezium.relational.Tables.TableFilter;

//TODO can we deliver HistorizedRelationalDatabaseConnectorConfig or should it be RelationalDatabaseConnectorConfig
//TODO can we deliver HistorizedRelationalDatabaseConnectorConfig or should it be RelationalDatabaseConnectorConfig
public class As400ConnectorConfig extends RelationalDatabaseConnectorConfig {
private static TableIdToStringMapper tableToString = x -> {
StringBuilder sb = new StringBuilder(x.schema());
final StringBuilder sb = new StringBuilder(x.schema());
sb.append(".").append(x.table());
return sb.toString();
};
Expand Down Expand Up @@ -71,6 +71,11 @@ public class As400ConnectorConfig extends RelationalDatabaseConnectorConfig {
public static final Field KEEP_ALIVE = Field.create("keep alive", "keep alive",
"keep alive", true);

/**
* keep alive flag, should the driver use a secure connection defaults to false
*/
public static final Field SECURE = Field.create("secure", "secure", "use secure connection", true);

/**
* threads should be used in communication with the host servers - timeouts might not work as expected when true - default false
*/
Expand All @@ -81,7 +86,7 @@ public class As400ConnectorConfig extends RelationalDatabaseConnectorConfig {
* The timeout to use for sockets
*/
public static final Field SOCKET_TIMEOUT = Field.create("socket timeout", "socket timeout in milliseconds", "socket timeout", 0);

/**
* If the ccsid is wrong on your tables and that is the least of your problems - just correct the CCSID before using this or as a last resort...
* This applies to all tables - everything
Expand All @@ -99,7 +104,7 @@ public class As400ConnectorConfig extends RelationalDatabaseConnectorConfig {
public static final Field DATE_FORMAT= Field.create("date format", "date format", "default date format is 2 digit date 1940->2039 set this to 'iso' or make sure you only have dates in this range, performance is ambysmal if you don't not to mention lots of missing data", "iso");

public static final Field DB_ERRORS = Field.create("errors", "full error reporting", "jdbc level of detail to include options are: 'basic', or 'full'", "full");

public static final long DEFAULT_MAX_JOURNAL_TIMEOUT = 60000;
/**
* Maximum number of journal entries to process server side
Expand All @@ -117,7 +122,7 @@ public class As400ConnectorConfig extends RelationalDatabaseConnectorConfig {

public As400ConnectorConfig(Configuration config) {
super(config, new SystemTablesPredicate(),
tableToString, 1, ColumnFilterMode.SCHEMA, false);
tableToString, 1, ColumnFilterMode.SCHEMA, false);
this.config = config;
this.snapshotMode = SnapshotMode.parse(config.getString(SNAPSHOT_MODE), SNAPSHOT_MODE.defaultValueAsString());
this.tableFilters = new As400NormalRelationalTableFilters(config, new SystemTablesPredicate(), tableToString);
Expand Down Expand Up @@ -156,37 +161,42 @@ public Integer getJournalBufferSize() {
}

public Integer getSocketTimeout() {
Integer i = config.getInteger(SOCKET_TIMEOUT);
final Integer i = config.getInteger(SOCKET_TIMEOUT);
return i;
}

public Integer getKeepAlive() {
return config.getInteger(KEEP_ALIVE);
}

public Integer getMaxServerSideEntries() {
return config.getInteger(MAX_SERVER_SIDE_ENTRIES);
}

public Integer getMaxRetrievalTimeout() {
return config.getInteger(MAX_RETRIEVAL_TIMEOUT);
}


public Integer getFromCcsid() {
return config.getInteger(FROM_CCSID);
}
public Integer getToCcsid() {
return config.getInteger(TO_CCSID);
}

public boolean isSecure() {
return config.getBoolean(SECURE);
}

public JournalProcessedPosition getOffset() {
String receiver = config.getString(As400OffsetContext.RECEIVER);
String lib = config.getString(As400OffsetContext.RECEIVER_LIBRARY);
String offset = config.getString(As400OffsetContext.EVENT_SEQUENCE);
Boolean processed = config.getBoolean(As400OffsetContext.PROCESSED);
Long configTime = config.getLong(As400OffsetContext.EVENT_TIME);
Instant time = (configTime == null) ? Instant.ofEpochSecond(0) : Instant.ofEpochSecond(configTime);
return new JournalProcessedPosition(offset, receiver, lib, time, (processed == null) ? false : processed);
final String receiver = config.getString(As400OffsetContext.RECEIVER);
final String lib = config.getString(As400OffsetContext.RECEIVER_LIBRARY);
final String offset = config.getString(As400OffsetContext.EVENT_SEQUENCE);
final Boolean processed = config.getBoolean(As400OffsetContext.PROCESSED);
final Long configTime = config.getLong(As400OffsetContext.EVENT_TIME);
final Instant time = (configTime == null) ? Instant.ofEpochSecond(0) : Instant.ofEpochSecond(configTime);
return new JournalProcessedPosition(offset, receiver, lib, time, (processed == null) ? false : processed);
}

private static class SystemTablesPredicate implements TableFilter {
Expand Down Expand Up @@ -214,14 +224,14 @@ protected SourceInfoStructMaker<?> getSourceInfoStructMaker(Version version) {

public static Field.Set ALL_FIELDS = Field.setOf(JdbcConfiguration.HOSTNAME, USER, PASSWORD, SCHEMA, BUFFER_SIZE,
RelationalDatabaseConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE, KEEP_ALIVE, THREAD_USED, SOCKET_TIMEOUT,
MAX_SERVER_SIDE_ENTRIES, TOPIC_NAMING_STRATEGY, FROM_CCSID, TO_CCSID, DB_ERRORS, DATE_FORMAT);
MAX_SERVER_SIDE_ENTRIES, TOPIC_NAMING_STRATEGY, FROM_CCSID, TO_CCSID, DB_ERRORS, DATE_FORMAT, SECURE);

public static ConfigDef configDef() {
ConfigDef c = RelationalDatabaseConnectorConfig.CONFIG_DEFINITION.edit()
final ConfigDef c = RelationalDatabaseConnectorConfig.CONFIG_DEFINITION.edit()
.name("ibmi")
.type(
JdbcConfiguration.HOSTNAME, USER, PASSWORD, SCHEMA, BUFFER_SIZE,
KEEP_ALIVE, THREAD_USED, SOCKET_TIMEOUT, FROM_CCSID, TO_CCSID, DB_ERRORS, DATE_FORMAT)
KEEP_ALIVE, THREAD_USED, SOCKET_TIMEOUT, FROM_CCSID, TO_CCSID, DB_ERRORS, DATE_FORMAT, SECURE)
.connector()
.events(
As400OffsetContext.EVENT_SEQUENCE_FIELD,
Expand Down Expand Up @@ -300,7 +310,7 @@ public static SnapshotMode parse(String value) {
return null;
}
value = value.trim();
for (SnapshotMode option : SnapshotMode.values()) {
for (final SnapshotMode option : SnapshotMode.values()) {
if (option.getValue().equalsIgnoreCase(value)) {
return option;
}
Expand Down
Loading

0 comments on commit 8732ac1

Please sign in to comment.