Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented customTTL feature #295

Merged
merged 1 commit into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions RELEASE.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
# Release Notes
## [4.3.7] - 2024-09-03
- Added property `spark.cdm.transform.custom.ttl` to allow a custom constant value to be set for TTL instead of using the values from `origin` rows.

## [4.3.6] - 2024-08-29
- Added `overwrite` option to conditionally check or skip `Validation` when it has a non-null value in `target` for the `spark.cdm.feature.extractJson` feature.

Expand Down
24 changes: 20 additions & 4 deletions src/main/java/com/datastax/cdm/feature/WritetimeTTL.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class WritetimeTTL extends AbstractFeature {
private List<String> writetimeNames;
private boolean autoWritetimeNames;
private Long customWritetime = 0L;
private Long customTTL = 0L;
private List<Integer> ttlSelectColumnIndexes = null;
private List<Integer> writetimeSelectColumnIndexes = null;
private Long filterMin;
Expand Down Expand Up @@ -67,6 +68,11 @@ public boolean loadProperties(IPropertyHelper propertyHelper) {

this.writetimeIncrement = propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_WRITETIME_INCREMENT);

this.customTTL = getCustomTTL(propertyHelper);
if (this.customTTL > 0) {
logger.info("PARAM -- {} is set to TTL {} ", KnownProperties.TRANSFORM_CUSTOM_TTL, customTTL);
}

this.filterMin = getMinFilter(propertyHelper);
this.filterMax = getMaxFilter(propertyHelper);
this.hasWriteTimestampFilter = (null != filterMin && null != filterMax && filterMin > 0 && filterMax > 0 && filterMax > filterMin);
Expand All @@ -81,7 +87,7 @@ public boolean loadProperties(IPropertyHelper propertyHelper) {
((null != ttlNames && !ttlNames.isEmpty())
|| (null != writetimeNames && !writetimeNames.isEmpty())
|| autoTTLNames || autoWritetimeNames
|| customWritetime > 0);
|| customWritetime > 0 || customTTL > 0);

isLoaded = true;
return isValid;
Expand Down Expand Up @@ -167,11 +173,16 @@ public static List<String> getWritetimeNames(IPropertyHelper propertyHelper) {
return CqlTable.unFormatNames(propertyHelper.getStringList(KnownProperties.ORIGIN_WRITETIME_NAMES));
}

public static Long getCustomWritetime(IPropertyHelper propertyHelper) {
/**
 * Reads the custom writetime property, treating an unset value as "disabled".
 *
 * @param propertyHelper source of configured CDM properties
 * @return the configured custom writetime in microseconds, or {@code 0L} when the
 *         property is not set (0 means the feature is not in use)
 */
protected static Long getCustomWritetime(IPropertyHelper propertyHelper) {
    Long configuredWritetime = propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_WRITETIME);
    if (configuredWritetime == null) {
        return 0L;
    }
    return configuredWritetime;
}

/**
 * Reads the custom TTL property, treating an unset value as "disabled".
 *
 * @param propertyHelper source of configured CDM properties
 * @return the configured custom TTL in seconds, or {@code 0L} when the property is
 *         not set (0 means the feature is not in use)
 */
protected static Long getCustomTTL(IPropertyHelper propertyHelper) {
    Long configuredTTL = propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_TTL);
    if (configuredTTL == null) {
        return 0L;
    }
    return configuredTTL;
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: formatting (indentation) looks incorrect here. could we get that fixed please?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@msmygit The diff here is only in the way the spaces are formatted. The existing lines use spaces and the new ones use tabs. I will do a separate commit after this PR is merged to use consistent formatting in the whole repo, and will also commit the styling config to the repo so everyone can use the same styling.


/**
 * Reads the minimum writetime filter property.
 *
 * @param propertyHelper source of configured CDM properties
 * @return the configured minimum writetime filter, or {@code null} when not set
 */
public static Long getMinFilter(IPropertyHelper propertyHelper) {
    Long minWritetimeFilter = propertyHelper.getLong(KnownProperties.FILTER_WRITETS_MIN);
    return minWritetimeFilter;
}
Expand All @@ -181,11 +192,15 @@ public static Long getMaxFilter(IPropertyHelper propertyHelper) {
}

/** @return the configured custom writetime value (0 when disabled). */
public Long getCustomWritetime() {
    return customWritetime;
}

/** @return the configured custom TTL value in seconds (0 when disabled). */
public Long getCustomTTL() {
    return customTTL;
}

/** @return true only when the feature is enabled AND a valid min/max filter pair was configured. */
public boolean hasWriteTimestampFilter() {
    return isEnabled && hasWriteTimestampFilter;
}

/**
 * @return the configured minimum writetime filter, or {@code Long.MIN_VALUE} when
 *         filtering is inactive (effectively "no lower bound")
 */
public Long getMinWriteTimeStampFilter() {
    if (this.hasWriteTimestampFilter && null != this.filterMin) {
        return this.filterMin;
    }
    return Long.MIN_VALUE;
}

/**
 * @return the configured maximum writetime filter, or {@code Long.MAX_VALUE} when
 *         filtering is inactive (effectively "no upper bound")
 */
public Long getMaxWriteTimeStampFilter() {
    if (this.hasWriteTimestampFilter && null != this.filterMax) {
        return this.filterMax;
    }
    return Long.MAX_VALUE;
}

public boolean hasTTLColumns() { return null!=this.ttlSelectColumnIndexes && !this.ttlSelectColumnIndexes.isEmpty(); }
/**
 * @return true when a TTL can be produced for target rows — either a custom constant
 *         TTL is configured, or at least one origin TTL column index was resolved
 */
public boolean hasTTLColumns() {
    if (customTTL > 0) {
        return true;
    }
    return null != this.ttlSelectColumnIndexes && !this.ttlSelectColumnIndexes.isEmpty();
}

public boolean hasWritetimeColumns() { return customWritetime>0 || null!=this.writetimeSelectColumnIndexes && !this.writetimeSelectColumnIndexes.isEmpty(); }

public Long getLargestWriteTimeStamp(Row row) {
Expand All @@ -200,7 +215,8 @@ public Long getLargestWriteTimeStamp(Row row) {
}

public Integer getLargestTTL(Row row) {
if (logDebug) logger.debug("getLargestTTL: ttlSelectColumnIndexes={}", ttlSelectColumnIndexes);
if (logDebug) logger.debug("getLargestTTL: customTTL={}, ttlSelectColumnIndexes={}", customTTL, ttlSelectColumnIndexes);
if (this.customTTL > 0) return this.customTTL.intValue();
if (null==this.ttlSelectColumnIndexes || this.ttlSelectColumnIndexes.isEmpty()) return null;
OptionalInt max = this.ttlSelectColumnIndexes.stream()
.mapToInt(row::getInt)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ public enum PropertyType {
public static final String TRANSFORM_REPLACE_MISSING_TS = "spark.cdm.transform.missing.key.ts.replace.value";
public static final String TRANSFORM_CUSTOM_WRITETIME = "spark.cdm.transform.custom.writetime";
public static final String TRANSFORM_CUSTOM_WRITETIME_INCREMENT = "spark.cdm.transform.custom.writetime.incrementBy";
public static final String TRANSFORM_CUSTOM_TTL = "spark.cdm.transform.custom.ttl";
public static final String TRANSFORM_CODECS = "spark.cdm.transform.codecs";
public static final String TRANSFORM_CODECS_TIMESTAMP_STRING_FORMAT = "spark.cdm.transform.codecs.timestamp.string.format";
public static final String TRANSFORM_CODECS_TIMESTAMP_STRING_FORMAT_ZONE = "spark.cdm.transform.codecs.timestamp.string.zone";
Expand All @@ -170,6 +171,8 @@ public enum PropertyType {
types.put(TRANSFORM_REPLACE_MISSING_TS, PropertyType.NUMBER);
types.put(TRANSFORM_CUSTOM_WRITETIME, PropertyType.NUMBER);
defaults.put(TRANSFORM_CUSTOM_WRITETIME, "0");
types.put(TRANSFORM_CUSTOM_TTL, PropertyType.NUMBER);
defaults.put(TRANSFORM_CUSTOM_TTL, "0");
types.put(TRANSFORM_CUSTOM_WRITETIME_INCREMENT, PropertyType.NUMBER);
defaults.put(TRANSFORM_CUSTOM_WRITETIME_INCREMENT, "0");
types.put(TRANSFORM_CODECS, PropertyType.STRING_LIST);
Expand Down
10 changes: 10 additions & 0 deletions src/resources/cdm-detailed.properties
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ spark.cdm.connect.target.password cassandra
# all origin columns that can have TTL set (which excludes partition key,
# clustering key, collections and UDTs). When false, and .names is not set, the target
# record will have the TTL determined by the target table configuration.
#
# *** Note spark.cdm.transform.custom.ttl overrides this setting ***
#
# .names : Default is empty, meaning they will be determined automatically if that is set
# (see above). Specify a subset of eligible columns that are used to calculate
# the TTL of the target record.
Expand Down Expand Up @@ -247,6 +250,12 @@ spark.cdm.perfops.ratelimit.target 20000
# and subsequent UPSERTs would add duplicates to the list. Future versions
# of CDM may tombstone the previous list, but for now this solution is
# viable and, crucially, more performant.
# .ttl Default is 0 (no expiry). Time-to-live value in seconds to use as the
# TTL for the target record. This is useful when the TTL of the record in
#                               Origin cannot be determined (such as when the only non-key columns are
#                               collections) OR when a new TTL needs to be set during migration. This
# parameter allows a crude constant value to be used in its place, and
# overrides .schema.origin.column.ttl.names
# .codecs Default is empty. A comma-separated list of additional codecs to
# enable. Current codecs are:
# INT_STRING : int stored in a String
Expand All @@ -273,6 +282,7 @@ spark.cdm.perfops.ratelimit.target 20000
#spark.cdm.transform.missing.key.ts.replace.value 1685577600000
#spark.cdm.transform.custom.writetime 0
#spark.cdm.transform.custom.writetime.incrementBy 0
#spark.cdm.transform.custom.ttl 0
#spark.cdm.transform.codecs
#spark.cdm.transform.codecs.timestamp.string.format yyyyMMddHHmmss
#spark.cdm.transform.codecs.timestamp.string.zone UTC
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class TTLAndWritetimeTest extends CommonMocks {

WritetimeTTL feature;
Long customWritetime = 123456789L;
Long customTTL = 1000L;
Long filterMin = 100000000L;
Long filterMax = 200000000L;
String writetimeColumnName = "writetime_col";
Expand Down Expand Up @@ -69,6 +70,7 @@ private void setTestWhens(){
String argument = invocation.getArgument(0);
return originValueColumns.contains(argument);
});
when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_TTL)).thenReturn(customTTL);
}


Expand All @@ -83,6 +85,7 @@ public void smoke_loadProperties() {
assertAll(
() -> assertTrue(feature.isEnabled(), "enabled"),
() -> assertEquals(customWritetime, feature.getCustomWritetime(), "customWritetime"),
() -> assertEquals(customTTL, feature.getCustomTTL(), "customTTL"),
() -> assertTrue(feature.hasWriteTimestampFilter(), "hasWriteTimestampFilter"),
() -> assertTrue(feature.hasWritetimeColumns(), "hasWritetimeColumns with custom writetime"),
() -> assertEquals(customWritetime, feature.getCustomWritetime(), "customWritetime is set"),
Expand Down Expand Up @@ -113,6 +116,7 @@ public void smokeTest_disabledFeature() {
when(propertyHelper.getLong(KnownProperties.FILTER_WRITETS_MAX)).thenReturn(null);
when(propertyHelper.getStringList(KnownProperties.ORIGIN_TTL_NAMES)).thenReturn(null);
when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_WRITETIME)).thenReturn(null);
when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_TTL)).thenReturn(null);
when(propertyHelper.getStringList(KnownProperties.ORIGIN_WRITETIME_NAMES)).thenReturn(null);
when(propertyHelper.getBoolean(KnownProperties.ORIGIN_WRITETIME_AUTO)).thenReturn(Boolean.FALSE);
when(propertyHelper.getBoolean(KnownProperties.ORIGIN_TTL_AUTO)).thenReturn(Boolean.FALSE);
Expand All @@ -135,6 +139,7 @@ public void smokeTest_enabledFeature_withOnlyWritetimeAuto() {
when(propertyHelper.getLong(KnownProperties.FILTER_WRITETS_MAX)).thenReturn(null);
when(propertyHelper.getStringList(KnownProperties.ORIGIN_TTL_NAMES)).thenReturn(null);
when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_WRITETIME)).thenReturn(null);
when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_TTL)).thenReturn(null);
when(propertyHelper.getStringList(KnownProperties.ORIGIN_WRITETIME_NAMES)).thenReturn(null);
when(propertyHelper.getBoolean(KnownProperties.ORIGIN_WRITETIME_AUTO)).thenReturn(Boolean.TRUE);
when(propertyHelper.getBoolean(KnownProperties.ORIGIN_TTL_AUTO)).thenReturn(Boolean.FALSE);
Expand Down Expand Up @@ -174,6 +179,7 @@ public void smokeTest_enabledFeature_withOnlyTTLAuto() {
@Test
public void smoke_writetimeWithoutTTL() {
when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_WRITETIME)).thenReturn(0L);
when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_TTL)).thenReturn(null);
when(propertyHelper.getStringList(KnownProperties.ORIGIN_TTL_NAMES)).thenReturn(null);
when(propertyHelper.getLong(KnownProperties.FILTER_WRITETS_MIN)).thenReturn(filterMin);
when(propertyHelper.getLong(KnownProperties.FILTER_WRITETS_MAX)).thenReturn(filterMax);
Expand Down Expand Up @@ -225,6 +231,7 @@ public void smoke_ttlWithoutWritetime_withCustomWritetime() {
@Test
public void smoke_autoWritetime_noCustomWritetime() {
when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_WRITETIME)).thenReturn(0L);
when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_TTL)).thenReturn(null);
when(propertyHelper.getStringList(KnownProperties.ORIGIN_WRITETIME_NAMES)).thenReturn(null);
when(propertyHelper.getBoolean(KnownProperties.ORIGIN_WRITETIME_AUTO)).thenReturn(true);
when(propertyHelper.getLong(KnownProperties.FILTER_WRITETS_MIN)).thenReturn(null);
Expand All @@ -244,6 +251,7 @@ public void smoke_autoWritetime_noCustomWritetime() {
@Test
public void smoke_autoWritetime_CustomWritetime() {
when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_WRITETIME)).thenReturn(100L);
when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_TTL)).thenReturn(null);
when(propertyHelper.getStringList(KnownProperties.ORIGIN_WRITETIME_NAMES)).thenReturn(null);
when(propertyHelper.getBoolean(KnownProperties.ORIGIN_WRITETIME_AUTO)).thenReturn(true);
when(propertyHelper.getLong(KnownProperties.FILTER_WRITETS_MIN)).thenReturn(null);
Expand Down Expand Up @@ -373,6 +381,7 @@ public void getLargestWriteTimeStampWithCustomTimeTest() {

@Test
public void getLargestTTLTest() {
when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_TTL)).thenReturn(null);
when(originTable.indexOf("TTL("+ttlColumnName+")")).thenReturn(100);
when(originRow.getInt(eq(100))).thenReturn(30);
when(originTable.indexOf("TTL("+writetimeTTLColumnName+")")).thenReturn(101);
Expand Down