diff --git a/.gitignore b/.gitignore deleted file mode 100644 index 94278c9f2..000000000 --- a/.gitignore +++ /dev/null @@ -1,22 +0,0 @@ -target/ -META-INF/ -OSGI-INF/ -build.properties -pom.xml.versionsBackup -dependency-reduced-pom.xml -*~ -*.bak -.checkstyle -.classpath -.project -.settings/ -.toDelete -*.pyc -.idea/ -*.iml -.coverage -*htmlcov -application/statistics/ -statistics/ -.vscode/ - diff --git a/CHANGES.md b/CHANGES.md index 411d41889..4fe0e8c7e 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -2,6 +2,7 @@ ## Version 1.0.0 (Not yet Released) +* Cassandra-Based Distributed Locks - Issue #741 * Create New Repair Type Called "VNODE" - Issue #755 * Create ReplicaRepairGroup Class for Grouping Replicas and Token Ranges - Issue #721 * Hot Reload of Nodes List - Issue #699 diff --git a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/Config.java b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/Config.java index 7cc16003a..4c1969a69 100644 --- a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/Config.java +++ b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/Config.java @@ -15,6 +15,7 @@ package com.ericsson.bss.cassandra.ecchronos.application.config; import com.ericsson.bss.cassandra.ecchronos.application.config.connection.ConnectionConfig; +import com.ericsson.bss.cassandra.ecchronos.application.config.lockfactory.LockFactoryConfig; import com.ericsson.bss.cassandra.ecchronos.application.config.repair.GlobalRepairConfig; import com.ericsson.bss.cassandra.ecchronos.application.config.rest.RestServerConfig; import com.ericsson.bss.cassandra.ecchronos.application.config.runpolicy.RunPolicyConfig; @@ -28,6 +29,7 @@ public class Config private RunPolicyConfig myRunPolicyConfig = new RunPolicyConfig(); private SchedulerConfig mySchedulerConfig = new SchedulerConfig(); private RestServerConfig myRestServerConfig = new RestServerConfig(); + private LockFactoryConfig myLockFactoryConfig = new LockFactoryConfig(); @JsonProperty("connection") public final ConnectionConfig getConnectionConfig() @@ -119,4 +121,19 @@ public final void setRestServerConfig(final RestServerConfig restServerConfig) myRestServerConfig = restServerConfig; } } + + @JsonProperty("lock_factory") + public final LockFactoryConfig getLockFactory() + { + return myLockFactoryConfig; + } + + @JsonProperty("lock_factory") + public final void setLockFactoryConfig(final LockFactoryConfig lockFactoryConfig) + { + if (lockFactoryConfig != null) + { + myLockFactoryConfig = lockFactoryConfig; + } + } } diff --git a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/connection/package-info.java b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/connection/package-info.java index 6cc872e1d..9a12b1724 100644 --- a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/connection/package-info.java +++ b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/connection/package-info.java @@ -16,3 +16,4 @@ * Contains configurations related to outbound connections (CQL and JMX). */ package com.ericsson.bss.cassandra.ecchronos.application.config.connection; + diff --git a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/lockfactory/CasLockFactoryConfig.java b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/lockfactory/CasLockFactoryConfig.java new file mode 100644 index 000000000..59ccc3fae --- /dev/null +++ b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/lockfactory/CasLockFactoryConfig.java @@ -0,0 +1,62 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.application.config.lockfactory; + +import com.ericsson.bss.cassandra.ecchronos.core.impl.utils.ConsistencyType; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.Locale; + +public class CasLockFactoryConfig +{ + private static final long DEFAULT_EXPIRY_TIME_IN_SECONDS = 30L; + private static final String DEFAULT_KEYSPACE_NAME = "ecchronos"; + private String myKeyspaceName = DEFAULT_KEYSPACE_NAME; + private long myExpiryTimeInSeconds = DEFAULT_EXPIRY_TIME_IN_SECONDS; + private ConsistencyType myConsistencySerial = ConsistencyType.DEFAULT; + + public final long getFailureCacheExpiryTimeInSeconds() + { + return myExpiryTimeInSeconds; + } + + @JsonProperty ("cache_expiry_time_in_seconds") + public final void setFailureCacheExpiryTimeInSeconds(final long expiryTimeInSeconds) + { + myExpiryTimeInSeconds = expiryTimeInSeconds; + } + + public final String getKeyspaceName() + { + return myKeyspaceName; + } + + @JsonProperty ("keyspace") + public final void setKeyspaceName(final String keyspaceName) + { + myKeyspaceName = keyspaceName; + } + + @JsonProperty ("consistencySerial") + public final ConsistencyType getConsistencySerial() + { + return myConsistencySerial; + } + + @JsonProperty ("consistencySerial") + public final void setConsistencySerial(final String consistencySerial) + { + myConsistencySerial = ConsistencyType.valueOf(consistencySerial.toUpperCase(Locale.US)); + } +} diff --git a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/lockfactory/LockFactoryConfig.java b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/lockfactory/LockFactoryConfig.java new file mode 100644 index 000000000..d0a431cd0 --- /dev/null +++ b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/lockfactory/LockFactoryConfig.java @@ -0,0 +1,34 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.application.config.lockfactory; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class LockFactoryConfig +{ + private CasLockFactoryConfig myCasLockFactoryConfig = new CasLockFactoryConfig(); + + @JsonProperty("cas") + public final CasLockFactoryConfig getCasLockFactoryConfig() + { + return myCasLockFactoryConfig; + } + + @JsonProperty("cas") + public final void setCasLockFactoryConfig(final CasLockFactoryConfig casLockFactoryConfig) + { + myCasLockFactoryConfig = casLockFactoryConfig; + } +} diff --git a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/lockfactory/package-info.java b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/lockfactory/package-info.java new file mode 100644 index 000000000..762efdadc --- /dev/null +++ b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/lockfactory/package-info.java @@ -0,0 +1,19 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Contains configurations related to lock factory. + */ +package com.ericsson.bss.cassandra.ecchronos.application.config.lockfactory; diff --git a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/providers/AgentNativeConnectionProvider.java b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/providers/AgentNativeConnectionProvider.java index f53fe1c89..e9194120f 100644 --- a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/providers/AgentNativeConnectionProvider.java +++ b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/providers/AgentNativeConnectionProvider.java @@ -28,6 +28,7 @@ import com.ericsson.bss.cassandra.ecchronos.connection.impl.builders.DistributedNativeBuilder; import com.ericsson.bss.cassandra.ecchronos.connection.impl.providers.DistributedNativeConnectionProviderImpl; import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.DefaultRepairConfigurationProvider; +import com.ericsson.bss.cassandra.ecchronos.utils.enums.connection.ConnectionType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,13 +63,13 @@ public class AgentNativeConnectionProvider implements DistributedNativeConnectio * the handler for managing SSL/TLS certificates. */ public AgentNativeConnectionProvider( - final Config config, - final Supplier cqlSecuritySupplier, - final CertificateHandler certificateHandler, - final DefaultRepairConfigurationProvider defaultRepairConfigurationProvider - ) + final Config config, + final Supplier cqlSecuritySupplier, + final CertificateHandler certificateHandler, + final DefaultRepairConfigurationProvider defaultRepairConfigurationProvider) { - AgentConnectionConfig agentConnectionConfig = config.getConnectionConfig().getCqlConnection() + AgentConnectionConfig agentConnectionConfig = config.getConnectionConfig() + .getCqlConnection() .getAgentConnectionConfig(); Security.CqlSecurity cqlSecurity = cqlSecuritySupplier.get(); boolean authEnabled = cqlSecurity.getCqlCredentials().isEnabled(); @@ -94,7 +95,6 @@ public AgentNativeConnectionProvider( .withSslEngineFactory(sslEngineFactory) .withSchemaChangeListener(defaultRepairConfigurationProvider) .withNodeStateListener(defaultRepairConfigurationProvider); - LOG.info("Preparing Agent Connection Config"); nativeConnectionBuilder = resolveAgentProviderBuilder(nativeConnectionBuilder, agentConnectionConfig); LOG.info("Establishing Connection With Nodes"); @@ -112,25 +112,24 @@ public AgentNativeConnectionProvider( * @return the configured {@link DistributedNativeBuilder}. */ public final DistributedNativeBuilder resolveAgentProviderBuilder( - final DistributedNativeBuilder builder, - final AgentConnectionConfig agentConnectionConfig - ) + final DistributedNativeBuilder builder, + final AgentConnectionConfig agentConnectionConfig) { switch (agentConnectionConfig.getType()) { - case datacenterAware: - LOG.info("Using DatacenterAware as Agent Config"); - return builder.withDatacenterAware(resolveDatacenterAware( - agentConnectionConfig.getDatacenterAware())); - case rackAware: - LOG.info("Using RackAware as Agent Config"); - return builder.withRackAware(resolveRackAware( - agentConnectionConfig.getRackAware())); - case hostAware: - LOG.info("Using HostAware as Agent Config"); - return builder.withHostAware(resolveHostAware( - agentConnectionConfig.getHostAware())); - default: + case datacenterAware: + LOG.info("Using DatacenterAware as Agent Config"); + return builder.withDatacenterAware(resolveDatacenterAware( + agentConnectionConfig.getDatacenterAware())); + case rackAware: + LOG.info("Using RackAware as Agent Config"); + return builder.withRackAware(resolveRackAware( + agentConnectionConfig.getRackAware())); + case hostAware: + LOG.info("Using HostAware as Agent Config"); + return builder.withHostAware(resolveHostAware( + agentConnectionConfig.getHostAware())); + default: } return builder; } @@ -143,8 +142,7 @@ public final DistributedNativeBuilder resolveAgentProviderBuilder( * @return a list of {@link InetSocketAddress} representing the resolved contact points. */ public final List resolveInitialContactPoints( - final Map contactPoints - ) + final Map contactPoints) { List resolvedContactPoints = new ArrayList<>(); for (AgentConnectionConfig.Host host : contactPoints.values()) @@ -166,11 +164,7 @@ public final List resolveInitialContactPoints( public final List resolveDatacenterAware(final AgentConnectionConfig.DatacenterAware datacenterAware) { List datacenterNames = new ArrayList<>(); - for - ( - AgentConnectionConfig.DatacenterAware.Datacenter datacenter - : - datacenterAware.getDatacenters().values()) + for (AgentConnectionConfig.DatacenterAware.Datacenter datacenter : datacenterAware.getDatacenters().values()) { datacenterNames.add(datacenter.getName()); } @@ -187,12 +181,7 @@ public final List resolveDatacenterAware(final AgentConnectionConfig.Dat public final List> resolveRackAware(final AgentConnectionConfig.RackAware rackAware) { List> rackList = new ArrayList<>(); - for - ( - AgentConnectionConfig.RackAware.Rack rack - : - rackAware.getRacks().values() - ) + for (AgentConnectionConfig.RackAware.Rack rack : rackAware.getRacks().values()) { Map rackInfo = new HashMap<>(); rackInfo.put("datacenterName", rack.getDatacenterName()); @@ -212,12 +201,7 @@ public final List> resolveRackAware(final AgentConnectionCon public final List resolveHostAware(final AgentConnectionConfig.HostAware hostAware) { List resolvedHosts = new ArrayList<>(); - for - ( - AgentConnectionConfig.Host host - : - hostAware.getHosts().values() - ) + for (AgentConnectionConfig.Host host : hostAware.getHosts().values()) { InetSocketAddress tmpAddress = new InetSocketAddress(host.getHost(), host.getPort()); resolvedHosts.add(tmpAddress); @@ -238,8 +222,8 @@ public final List resolveHostAware(final AgentConnectionConfi * if the connection is in an illegal state. */ public final DistributedNativeConnectionProviderImpl tryEstablishConnection( - final DistributedNativeBuilder builder - ) throws AllNodesFailedException, IllegalStateException + final DistributedNativeBuilder builder) throws AllNodesFailedException, + IllegalStateException { try { @@ -285,6 +269,7 @@ public void close() throws IOException { myDistributedNativeConnectionProviderImpl.close(); } + /** * Add a nw node to the list of nodes. * @param myNode @@ -313,4 +298,18 @@ public Boolean confirmNodeValid(final Node node) { return myDistributedNativeConnectionProviderImpl.confirmNodeValid(node); } + + /** + * Retrieves the type of connection being used by this connection provider. + * This method delegates the call to the underlying {@code DistributedNativeConnectionProviderImpl} + * to determine the current {@link ConnectionType}. + * + * @return The {@link ConnectionType} of the connection managed by + * {@code myDistributedNativeConnectionProviderImpl}. + */ + @Override + public ConnectionType getConnectionType() + { + return myDistributedNativeConnectionProviderImpl.getConnectionType(); + } } diff --git a/application/src/main/resources/ecc.yml b/application/src/main/resources/ecc.yml index 253f496af..efe4f39cb 100644 --- a/application/src/main/resources/ecc.yml +++ b/application/src/main/resources/ecc.yml @@ -98,6 +98,13 @@ connection: connectionDelay: time: 45 unit: MINUTES + ## + ## Allow routing requests directly to a remote datacenter. + ## This allows locks for other datacenters to be taken in that datacenter instead of via the local datacenter. + ## If clients are prevented from connecting directly to Cassandra nodes in other sites this is not possible. + ## If remote routing is disabled, instead SERIAL consistency will be used for those request. + ## + remoteRouting: true jmx: ## ## The class used to provide JMX connections to Apache Cassandra. @@ -165,6 +172,15 @@ repair: priority: granularity_unit: HOURS ## + ## Specifies the type of lock to use for repairs. + ## "vnode" will lock each node involved in a repair individually and increase the number of + ## parallel repairs that can run in a single data center. + ## "datacenter" will lock each data center involved in a repair and only allow a single repair per data center. + ## "datacenter_and_vnode" will combine both options and allow a smooth transition between them without allowing + ## multiple repairs to run concurrently on a single node. + ## + lock_type: vnode + ## ## Specifies the unwind ratio to smooth out the load that repairs generate. ## This value is a ratio between 0 -> 100% of the execution time of a repair session. ## @@ -261,3 +277,30 @@ rest_server: ## host: localhost port: 8080 + + lock_factory: + cas: + ## + ## The keyspace used for the CAS lock factory tables. + ## + keyspace: ecchronos + ## + ## The number of seconds until the lock failure cache expires. + ## If an attempt to secure a lock is unsuccessful, + ## all subsequent attempts will be failed until + ## the cache expiration time is reached. + ## + cache_expiry_time_in_seconds: 30 + ## + ## Allow to override consistency level for LWT (lightweight transactions). Possible values are: + ## "DEFAULT" - Use consistency level based on the `datacenterAware` agent type. + ## If the agent type is `datacenterAware`, LOCAL_SERIAL consistency will be used. Otherwise, + ## SERIAL consistency will be applied. + ## "SERIAL" - Use SERIAL consistency for LWT regardless of agent type. + ## "LOCAL" - Use LOCAL_SERIAL consistency for LWT regardless agent type. + ## + ## If an agent type other than datacenterAware and LOCAL is used, all locks will be managed locally within each datacenter. + ## I.e There's a risk that multiple nodes in different datacenters will be able to lock the + ## same nodes causing multiple repairs on the same range/node at the same time. + ## + consistencySerial: "DEFAULT" diff --git a/application/src/test/java/com/ericsson/bss/cassandra/ecchronos/application/config/lockfactory/TestCasLockFactoryConfig.java b/application/src/test/java/com/ericsson/bss/cassandra/ecchronos/application/config/lockfactory/TestCasLockFactoryConfig.java new file mode 100644 index 000000000..94e231560 --- /dev/null +++ b/application/src/test/java/com/ericsson/bss/cassandra/ecchronos/application/config/lockfactory/TestCasLockFactoryConfig.java @@ -0,0 +1,53 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.application.config.lockfactory; + +import com.ericsson.bss.cassandra.ecchronos.application.config.Config; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; + +import static org.assertj.core.api.Assertions.assertThat; + +public class TestCasLockFactoryConfig +{ + @Test + public void testCasLockFactoryConfigWithProvidedValue() throws IOException + { + CasLockFactoryConfig casLockFactoryConfig = getCasLockFactoryConfig("all_set.yml"); + assertThat(casLockFactoryConfig.getKeyspaceName()).isEqualTo("ecc"); + assertThat(casLockFactoryConfig.getFailureCacheExpiryTimeInSeconds()).isEqualTo(100L); + } + + @Test + public void testCasLockFactoryConfigDefaultValue() throws IOException + { + CasLockFactoryConfig casLockFactoryConfig = getCasLockFactoryConfig("nothing_set.yml"); + assertThat(casLockFactoryConfig.getKeyspaceName()).isEqualTo("ecchronos"); + assertThat(casLockFactoryConfig.getFailureCacheExpiryTimeInSeconds()).isEqualTo(30L); + } + + private CasLockFactoryConfig getCasLockFactoryConfig(final String fileName) throws IOException + { + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + File file = new File(classLoader.getResource(fileName).getFile()); + ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); + Config config = mapper.readValue(file, Config.class); + return config.getLockFactory().getCasLockFactoryConfig(); + } +} diff --git a/application/src/test/resources/all_set.yml b/application/src/test/resources/all_set.yml index 218c19722..5d18c36dc 100644 --- a/application/src/test/resources/all_set.yml +++ b/application/src/test/resources/all_set.yml @@ -90,4 +90,10 @@ scheduler: rest_server: host: 127.0.0.2 - port: 8081 \ No newline at end of file + port: 8081 + +lock_factory: + cas: + keyspace: ecc + cache_expiry_time_in_seconds: 100 + consistencySerial: "LOCAL" \ No newline at end of file diff --git a/connection.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/impl/builders/DistributedNativeBuilder.java b/connection.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/impl/builders/DistributedNativeBuilder.java index dbf78ef50..0316dcdcb 100644 --- a/connection.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/impl/builders/DistributedNativeBuilder.java +++ b/connection.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/impl/builders/DistributedNativeBuilder.java @@ -244,7 +244,7 @@ public final DistributedNativeConnectionProviderImpl build() LOG.info("Requesting Nodes List"); List nodesList = createNodesList(session); LOG.info("Nodes list was created with success"); - return new DistributedNativeConnectionProviderImpl(session, nodesList, this); + return new DistributedNativeConnectionProviderImpl(session, nodesList, this, myType); } /** diff --git a/connection.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/impl/providers/DistributedNativeConnectionProviderImpl.java b/connection.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/impl/providers/DistributedNativeConnectionProviderImpl.java index 7c48a049b..d95bd49c9 100644 --- a/connection.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/impl/providers/DistributedNativeConnectionProviderImpl.java +++ b/connection.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/impl/providers/DistributedNativeConnectionProviderImpl.java @@ -19,6 +19,7 @@ import com.ericsson.bss.cassandra.ecchronos.connection.DistributedNativeConnectionProvider; import com.ericsson.bss.cassandra.ecchronos.connection.impl.builders.DistributedNativeBuilder; +import com.ericsson.bss.cassandra.ecchronos.utils.enums.connection.ConnectionType; import java.io.IOException; import java.util.List; @@ -27,6 +28,7 @@ public class DistributedNativeConnectionProviderImpl implements DistributedNativ private final CqlSession mySession; private final List myNodes; private final DistributedNativeBuilder myDistributedNativeBuilder; + private final ConnectionType myConnectionType; /** * Constructs a new {@code DistributedNativeConnectionProviderImpl} with the specified {@link CqlSession} and list @@ -38,14 +40,15 @@ public class DistributedNativeConnectionProviderImpl implements DistributedNativ * the list of {@link Node} instances representing the nodes in the cluster. */ public DistributedNativeConnectionProviderImpl( - final CqlSession session, - final List nodesList, - final DistributedNativeBuilder distributedNativeBuilder - ) + final CqlSession session, + final List nodesList, + final DistributedNativeBuilder distributedNativeBuilder, + final ConnectionType connectionType) { mySession = session; myNodes = nodesList; myDistributedNativeBuilder = distributedNativeBuilder; + myConnectionType = connectionType; } /** @@ -70,8 +73,6 @@ public List getNodes() return myNodes; } - - /** * Closes the {@link CqlSession} associated with this connection provider. * @@ -126,4 +127,16 @@ public Boolean confirmNodeValid(final Node node) return myDistributedNativeBuilder.confirmNodeValid(node); } + /** + * Retrieves the type of connection being used by this connection provider. + * to determine the current {@link ConnectionType}. + * + * @return The {@link ConnectionType} of the connection managed by + * {@code myDistributedNativeConnectionProviderImpl}. + */ + @Override + public ConnectionType getConnectionType() + { + return myConnectionType; + } } diff --git a/connection/pom.xml b/connection/pom.xml index 5ce8de648..1cad2e4b7 100644 --- a/connection/pom.xml +++ b/connection/pom.xml @@ -32,6 +32,13 @@ EcChronos Connection + + + com.ericsson.bss.cassandra.ecchronos + utils + ${project.version} + + com.datastax.oss diff --git a/connection/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/DistributedNativeConnectionProvider.java b/connection/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/DistributedNativeConnectionProvider.java index 6dde865e6..2ec7c5988 100644 --- a/connection/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/DistributedNativeConnectionProvider.java +++ b/connection/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/DistributedNativeConnectionProvider.java @@ -14,6 +14,7 @@ */ package com.ericsson.bss.cassandra.ecchronos.connection; +import com.ericsson.bss.cassandra.ecchronos.utils.enums.connection.ConnectionType; import java.io.Closeable; import java.io.IOException; import java.util.List; @@ -33,7 +34,7 @@ default void close() throws IOException { } void addNode(Node myNode); - void removeNode(Node myNode); Boolean confirmNodeValid(Node node); + ConnectionType getConnectionType(); } diff --git a/core.impl/pom.xml b/core.impl/pom.xml index 9351dfebe..3004eecd4 100644 --- a/core.impl/pom.xml +++ b/core.impl/pom.xml @@ -105,6 +105,28 @@ test + + io.micrometer + micrometer-core + + + + com.google.guava + guava + + + + net.jcip + jcip-annotations + test + + + + org.testcontainers + cassandra + test + + org.assertj assertj-core diff --git a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/CASLock.java b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/CASLock.java new file mode 100644 index 000000000..34e610fe3 --- /dev/null +++ b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/CASLock.java @@ -0,0 +1,216 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.core.impl.locks; + +import static com.ericsson.bss.cassandra.ecchronos.core.locks.LockFactory.DistributedLock; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datastax.oss.driver.api.core.cql.ResultSet; +import com.datastax.oss.driver.api.core.cql.Row; +import com.ericsson.bss.cassandra.ecchronos.utils.exceptions.LockException; + +/** + * Represents a container for builder configurations and state for the CASLock. + * This class is used to decouple builder fields from CASLockFactory to avoid excessive field count. + */ +class CASLock implements DistributedLock, Runnable +{ + private static final Logger LOG = LoggerFactory.getLogger(CASLock.class); + + private final String myDataCenter; + private final String myResource; + private final int myPriority; + private final Map myMetadata; + + private final AtomicReference> myUpdateFuture = new AtomicReference<>(); + + private final AtomicInteger myFailedUpdateAttempts = new AtomicInteger(); + + private final int myLocallyHighestPriority; + private final int globalHighPriority; + + private final UUID myUuid; + + private final CASLockStatement myCasLockStatement; + + CASLock(final String dataCenter, + final String resource, + final int priority, + final Map metadata, + final UUID uuid, + final CASLockStatement casLockStatement) + { + myDataCenter = dataCenter; + myResource = resource; + myPriority = priority; + myMetadata = metadata; + myUuid = uuid; + myCasLockStatement = casLockStatement; + + List nodePriorities = computePriorities(); + + myLocallyHighestPriority = nodePriorities.stream() + .filter(n -> n.getUuid().equals(myUuid)) + .map(NodePriority::getPriority) + .findFirst() + .orElse(myPriority); + globalHighPriority = nodePriorities.stream() + .filter(n -> !n.getUuid().equals(myUuid)) + .map(NodePriority::getPriority) + .max(Integer::compare) + .orElse(myPriority); + } + + public boolean lock() + { + if (compete()) + { + LOG.trace("Trying to acquire lock for resource {}", myResource); + if (tryLock()) + { + ScheduledExecutorService executor = myCasLockStatement.getCasLockProperties().getExecutor(); + LOG.trace("Lock for resource {} acquired", myResource); + ScheduledFuture future = executor.scheduleAtFixedRate(this, + myCasLockStatement.getCasLockFactoryCacheContext().getLockUpdateTimeInSeconds(), + myCasLockStatement.getCasLockFactoryCacheContext().getLockUpdateTimeInSeconds(), TimeUnit.SECONDS); + myUpdateFuture.set(future); + + return true; + } + } + + return false; + } + + @Override + public void run() + { + try + { + updateLock(); + myFailedUpdateAttempts.set(0); + } + catch (LockException e) + { + int failedAttempts = myFailedUpdateAttempts.incrementAndGet(); + + if (failedAttempts >= myCasLockStatement.getCasLockFactoryCacheContext().getFailedLockRetryAttempts()) + { + LOG.error("Unable to re-lock resource '{}' after {} failed attempts", myResource, failedAttempts); + } + else + { + LOG.warn("Unable to re-lock resource '{}', {} failed attempts", myResource, failedAttempts, e); + } + } + } + + @Override + public void close() + { + ScheduledFuture future = myUpdateFuture.get(); + if (future != null) + { + future.cancel(true); + myCasLockStatement.execute( + myDataCenter, + myCasLockStatement.getRemoveLockStatement().bind(myResource, myUuid)); + + if (myLocallyHighestPriority <= myPriority) + { + myCasLockStatement.execute( + myDataCenter, + myCasLockStatement.getRemoveLockPriorityStatement().bind(myResource, myUuid)); + } + else + { + LOG.debug("Locally highest priority ({}) is higher than current ({}), will not remove", + myLocallyHighestPriority, + myPriority); + } + } + } + + private void updateLock() throws LockException + { + ResultSet resultSet = myCasLockStatement.execute(myDataCenter, + myCasLockStatement.getUpdateLockStatement().bind(myUuid, myMetadata, myResource, myUuid)); + + if (!resultSet.wasApplied()) + { + throw new LockException("CAS query failed"); + } + } + + private boolean compete() + { + if (myLocallyHighestPriority <= myPriority) + { + insertPriority(); + } + + LOG.trace("Highest priority for resource {}: {}", myResource, globalHighPriority); + return myPriority >= globalHighPriority; + } + + private void insertPriority() + { + myCasLockStatement.execute( + myDataCenter, + myCasLockStatement.getCompeteStatement().bind(myResource, myUuid, myPriority)); + } + + private boolean tryLock() + { + return myCasLockStatement.execute( + myDataCenter, + myCasLockStatement.getLockStatement().bind(myResource, myUuid, myMetadata)).wasApplied(); + } + + private List computePriorities() + { + List nodePriorities = new ArrayList<>(); + + ResultSet resultSet = myCasLockStatement.execute( + myDataCenter, + myCasLockStatement.getGetPriorityStatement().bind(myResource)); + + for (Row row : resultSet) + { + int priority = row.getInt(CASLockStatement.COLUMN_PRIORITY); + UUID hostId = row.getUuid(CASLockStatement.COLUMN_NODE); + + nodePriorities.add(new NodePriority(hostId, priority)); + } + + return nodePriorities; + } + + int getFailedAttempts() + { + return myFailedUpdateAttempts.get(); + } +} diff --git a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/CASLockFactory.java b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/CASLockFactory.java new file mode 100644 index 000000000..4ad9727cd --- /dev/null +++ b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/CASLockFactory.java @@ -0,0 +1,347 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.core.impl.locks; + +import com.datastax.oss.driver.api.core.ConsistencyLevel; +import com.datastax.oss.driver.api.core.CqlIdentifier; +import com.datastax.oss.driver.api.core.cql.ResultSet; +import com.datastax.oss.driver.api.core.cql.Row; +import com.datastax.oss.driver.api.core.metadata.Metadata; +import com.datastax.oss.driver.api.core.metadata.Node; +import com.datastax.oss.driver.api.core.metadata.TokenMap; +import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata; +import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata; +import com.ericsson.bss.cassandra.ecchronos.core.locks.LockFactory; +import com.ericsson.bss.cassandra.ecchronos.core.state.HostStates; +import com.ericsson.bss.cassandra.ecchronos.utils.exceptions.LockException; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +/** + * Lock factory using Cassandras LWT (Compare-And-Set operations) to create and maintain locks. + * + * Expected keyspace/tables: + *
+ * CREATE KEYSPACE IF NOT EXISTS ecchronos WITH replication = {'class': 'NetworkTopologyStrategy', 'datacenter1': 1};
+ *
+ * CREATE TABLE IF NOT EXISTS ecchronos.lock (
+ * resource text,
+ * node uuid,
+ * metadata map<text,text>,
+ * PRIMARY KEY(resource))
+ * WITH default_time_to_live = 600 AND gc_grace_seconds = 0;
+ *
+ * CREATE TABLE IF NOT EXISTS ecchronos.lock_priority(
+ * resource text,
+ * node uuid,
+ * priority int,
+ * PRIMARY KEY(resource, node))
+ * WITH default_time_to_live = 600 AND gc_grace_seconds = 0;
+ * 
+ */ +public final class CASLockFactory implements LockFactory, Closeable +{ + private static final Logger LOG = LoggerFactory.getLogger(CASLockFactory.class); + + private static final String TABLE_LOCK = "lock"; + private static final String TABLE_LOCK_PRIORITY = "lock_priority"; + private static final int REFRESH_INTERVAL_RATIO = 10; + private static final int DEFAULT_LOCK_TIME_IN_SECONDS = 600; + + private final UUID myUuid; + private final HostStates myHostStates; + private final CASLockFactoryCacheContext myCasLockFactoryCacheContext; + + private final CASLockProperties myCasLockProperties; + private final CASLockStatement myCasLockStatement; + + CASLockFactory(final CASLockFactoryBuilder builder) + { + myCasLockProperties = new CASLockProperties( + builder.getNativeConnectionProvider().getConnectionType(), + builder.getKeyspaceName(), + Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat("LockRefresher-%d").build()), + builder.getConsistencyType(), + builder.getNativeConnectionProvider().getCqlSession(), + builder.getStatementDecorator()); + + myHostStates = builder.getHostStates(); + + verifySchemasExists(); + + UUID hostId = builder.getNode().getHostId(); + + if (hostId == null) + { + hostId = UUID.randomUUID(); + LOG.warn("Unable to determine local nodes host id, using {} instead", hostId); + } + + myUuid = hostId; + myCasLockFactoryCacheContext = buildCasLockFactoryCacheContext(builder.getCacheExpiryTimeInSecond()); + + myCasLockStatement = new CASLockStatement(myCasLockProperties, myCasLockFactoryCacheContext); + } + + private CASLockFactoryCacheContext buildCasLockFactoryCacheContext(final long cacheExpiryTimeInSeconds) + { + int lockTimeInSeconds = getDefaultTimeToLiveFromLockTable(); + int lockUpdateTimeInSeconds = lockTimeInSeconds / REFRESH_INTERVAL_RATIO; + int myFailedLockRetryAttempts = (lockTimeInSeconds / lockUpdateTimeInSeconds) - 1; + + return CASLockFactoryCacheContext.newBuilder() + .withLockUpdateTimeInSeconds(lockUpdateTimeInSeconds) + .withFailedLockRetryAttempts(myFailedLockRetryAttempts) + .withLockCache(new LockCache(this::doTryLock, cacheExpiryTimeInSeconds)) + .build(); + } + + private int getDefaultTimeToLiveFromLockTable() + { + TableMetadata tableMetadata = myCasLockProperties.getSession() + .getMetadata() + .getKeyspace(myCasLockProperties.getKeyspaceName()) + .flatMap(ks -> ks.getTable(TABLE_LOCK)) + .orElse(null); + if (tableMetadata == null || tableMetadata.getOptions() == null) + { + LOG.warn("Could not parse default ttl of {}.{}", myCasLockProperties.getKeyspaceName(), TABLE_LOCK); + return DEFAULT_LOCK_TIME_IN_SECONDS; + } + Map tableOptions = tableMetadata.getOptions(); + return (Integer) tableOptions.get(CqlIdentifier.fromInternal("default_time_to_live")); + } + + @Override + public DistributedLock tryLock(final String dataCenter, + final String resource, + final int priority, + final Map metadata) + throws LockException + { + return myCasLockFactoryCacheContext.getLockCache() + .getLock(dataCenter, resource, priority, metadata); + } + + @Override + public Map getLockMetadata(final String dataCenter, final String resource) throws LockException + { + ResultSet resultSet = myCasLockStatement.execute( + dataCenter, myCasLockStatement.getLockMetadataStatement().bind(resource)); + + Row row = resultSet.one(); + + if (row != null) + { + return row.getMap("metadata", String.class, String.class); + } + else + { + throw new LockException("Unable to retrieve metadata for resource " + resource); + } + } + + @Override + public boolean sufficientNodesForLocking(final String dataCenter, final String resource) + { + try + { + Set nodes = getNodesForResource(dataCenter, resource); + + int quorum = nodes.size() / 2 + 1; + int liveNodes = liveNodes(nodes); + + LOG.trace("Live nodes {}, quorum: {}", liveNodes, quorum); + + return liveNodes >= quorum; + } + catch (UnsupportedEncodingException e) + { + LOG.warn("Unable to encode resource bytes", e); + } + + return false; + } + + @Override + public Optional getCachedFailure(final String dataCenter, final String resource) + { + return myCasLockFactoryCacheContext.getLockCache().getCachedFailure(dataCenter, resource); + } + + @Override + public void close() + { + myCasLockProperties.getExecutor().shutdown(); + try + { + if (!myCasLockProperties.getExecutor().awaitTermination(1, TimeUnit.SECONDS)) + { + LOG.warn("Executing tasks did not finish within one second"); + } + } + catch (InterruptedException e) + { + LOG.warn("Interrupted while waiting for executor to shut down", e); + } + } + + @VisibleForTesting + UUID getHostId() + { + return myUuid; + } + + @VisibleForTesting + CASLockFactoryCacheContext getCasLockFactoryCacheContext() + { + return myCasLockFactoryCacheContext; + } + + @VisibleForTesting + CASLockStatement getCasLockStatement() + { + return myCasLockStatement; + } + + @VisibleForTesting + ConsistencyLevel getSerialConsistencyLevel() + { + return myCasLockProperties.getSerialConsistencyLevel(); + } + + public static CASLockFactoryBuilder builder() + { + return new CASLockFactoryBuilder(); + } + + private DistributedLock doTryLock(final String dataCenter, + final String resource, + final int priority, + final Map metadata) throws LockException + { + LOG.trace("Trying lock for {} - {}", dataCenter, resource); + + if (!sufficientNodesForLocking(dataCenter, resource)) + { + LOG.warn("Not sufficient nodes to lock resource {} in datacenter {}", resource, dataCenter); + throw new LockException("Not sufficient nodes to lock"); + } + CASLock casLock = new CASLock(dataCenter, resource, priority, metadata, myUuid, myCasLockStatement); // NOSONAR + if (casLock.lock()) + { + return casLock; + } + else + { + throw new LockException(String.format("Unable to lock resource %s in datacenter %s", resource, dataCenter)); + } + } + + private Set getNodesForResource(final String dataCenter, + final String resource) throws UnsupportedEncodingException + { + Set dataCenterNodes = new HashSet<>(); + + Metadata metadata = myCasLockProperties.getSession().getMetadata(); + TokenMap tokenMap = metadata.getTokenMap() + .orElseThrow(() -> new IllegalStateException("Couldn't get token map, is it disabled?")); + Set nodes = tokenMap.getReplicas( + myCasLockProperties.getKeyspaceName(), ByteBuffer.wrap(resource.getBytes("UTF-8"))); + + if (dataCenter != null) + { + Iterator iterator = nodes.iterator(); + + while (iterator.hasNext()) + { + Node node = iterator.next(); + + if (dataCenter.equals(node.getDatacenter())) + { + dataCenterNodes.add(node); + } + } + + return dataCenterNodes; + } + + return nodes; + } + + private int liveNodes(final Collection nodes) + { + int live = 0; + for (Node node : nodes) + { + if (myHostStates.isUp(node)) + { + live++; + } + } + return live; + } + + private void verifySchemasExists() + { + Optional keyspaceMetadata = myCasLockProperties + .getSession() + .getMetadata() + .getKeyspace(myCasLockProperties.getKeyspaceName()); + + if (!keyspaceMetadata.isPresent()) + { + String msg = String.format("Keyspace %s does not exist, it needs to be created", + myCasLockProperties.getKeyspaceName()); + LOG.error(msg); + throw new IllegalStateException(msg); + } + + if (!keyspaceMetadata.get().getTable(TABLE_LOCK).isPresent()) + { + String msg = String.format("Table %s.%s does not exist, it needs to be created", + myCasLockProperties.getKeyspaceName(), + TABLE_LOCK); + LOG.error(msg); + throw new IllegalStateException(msg); + } + + if (!keyspaceMetadata.get().getTable(TABLE_LOCK_PRIORITY).isPresent()) + { + String msg = String.format("Table %s.%s does not exist, it needs to be created", + myCasLockProperties.getKeyspaceName(), + TABLE_LOCK_PRIORITY); + LOG.error(msg); + throw new IllegalStateException(msg); + } + } + +} diff --git a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/CASLockFactoryBuilder.java b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/CASLockFactoryBuilder.java new file mode 100644 index 000000000..0e60d4960 --- /dev/null +++ b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/CASLockFactoryBuilder.java @@ -0,0 +1,137 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.core.impl.locks; + +import com.datastax.oss.driver.api.core.metadata.Node; +import com.ericsson.bss.cassandra.ecchronos.connection.DistributedNativeConnectionProvider; +import com.ericsson.bss.cassandra.ecchronos.connection.StatementDecorator; +import com.ericsson.bss.cassandra.ecchronos.core.impl.utils.ConsistencyType; +import com.ericsson.bss.cassandra.ecchronos.core.state.HostStates; + +/** + * Represents a container for builder configurations and state for the CASLockFactory. + * This class is used to decouple builder fields from CASLockFactory to avoid excessive field count. + */ +public class CASLockFactoryBuilder +{ + private static final String DEFAULT_KEYSPACE_NAME = "ecchronos"; + private static final long DEFAULT_EXPIRY_TIME_IN_SECONDS = 30L; + private static final ConsistencyType DEFAULT_CONSISTENCY_SERIAL = ConsistencyType.DEFAULT; + + private DistributedNativeConnectionProvider myNativeConnectionProvider; + private HostStates myHostStates; + private StatementDecorator myStatementDecorator; + private String myKeyspaceName = DEFAULT_KEYSPACE_NAME; + private long myCacheExpiryTimeInSeconds = DEFAULT_EXPIRY_TIME_IN_SECONDS; + private ConsistencyType myConsistencyType = DEFAULT_CONSISTENCY_SERIAL; + private Node myNode; + + public final CASLockFactoryBuilder withNativeConnectionProvider(final DistributedNativeConnectionProvider nativeConnectionProvider) + { + myNativeConnectionProvider = nativeConnectionProvider; + return this; + } + + public final CASLockFactoryBuilder withHostStates(final HostStates hostStates) + { + myHostStates = hostStates; + return this; + } + + public final CASLockFactoryBuilder withStatementDecorator(final StatementDecorator statementDecorator) + { + myStatementDecorator = statementDecorator; + return this; + } + + public final CASLockFactoryBuilder withKeyspaceName(final String keyspaceName) + { + myKeyspaceName = keyspaceName; + return this; + } + + public final CASLockFactoryBuilder withCacheExpiryInSeconds(final long cacheExpiryInSeconds) + { + myCacheExpiryTimeInSeconds = cacheExpiryInSeconds; + return this; + } + + public final CASLockFactoryBuilder withConsistencySerial(final ConsistencyType consistencyType) + { + myConsistencyType = consistencyType; + return this; + } + + public final CASLockFactoryBuilder withNode(final Node node) + { + myNode = node; + return this; + } + + public final CASLockFactory build() + { + if (myNativeConnectionProvider == null) + { + throw new IllegalArgumentException("Native connection provider cannot be null"); + } + + if (myHostStates == null) + { + throw new IllegalArgumentException("Host states cannot be null"); + } + + if (myStatementDecorator == null) + { + throw new IllegalArgumentException("Statement decorator cannot be null"); + } + + return new CASLockFactory(this); + } + + public final DistributedNativeConnectionProvider getNativeConnectionProvider() + { + return myNativeConnectionProvider; + } + + public final HostStates getHostStates() + { + return myHostStates; + } + + public final StatementDecorator getStatementDecorator() + { + return myStatementDecorator; + } + + public final String getKeyspaceName() + { + return myKeyspaceName; + } + + public final long getCacheExpiryTimeInSecond() + { + return myCacheExpiryTimeInSeconds; + } + + public final ConsistencyType getConsistencyType() + { + return myConsistencyType; + } + + public final Node getNode() + { + return myNode; + } +} diff --git a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/CASLockFactoryCacheContext.java b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/CASLockFactoryCacheContext.java new file mode 100644 index 000000000..43269b3ef --- /dev/null +++ b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/CASLockFactoryCacheContext.java @@ -0,0 +1,83 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.core.impl.locks; + +/** + * Represents a container for cache-related configurations and state for the CASLockFactory. + * This class is used to decouple cache-related fields from CASLockFactory to avoid excessive field count. + */ +public final class CASLockFactoryCacheContext +{ + private final LockCache myLockCache; + private final long myLockUpdateTimeInSeconds; + private final int myFailedLockRetryAttempts; + + public CASLockFactoryCacheContext(final Builder builder) + { + myLockCache = builder.myLockCache; + myLockUpdateTimeInSeconds = builder.myLockUpdateTimeInSeconds; + myFailedLockRetryAttempts = builder.myFailedLockRetryAttempts; + } + + public LockCache getLockCache() + { + return myLockCache; + } + + public long getLockUpdateTimeInSeconds() + { + return myLockUpdateTimeInSeconds; + } + + public int getFailedLockRetryAttempts() + { + return myFailedLockRetryAttempts; + } + + public static Builder newBuilder() + { + return new Builder(); + } + + public static class Builder + { + private LockCache myLockCache; + private int myLockUpdateTimeInSeconds; + private int myFailedLockRetryAttempts; + + public final Builder withLockUpdateTimeInSeconds(final int lockTimeInSeconds) + { + myLockUpdateTimeInSeconds = lockTimeInSeconds; + return this; + } + + public final Builder withFailedLockRetryAttempts(final int failedLockRetryAttempts) + { + myFailedLockRetryAttempts = failedLockRetryAttempts; + return this; + } + + public final Builder withLockCache(final LockCache lockCache) + { + myLockCache = lockCache; + return this; + } + + public final CASLockFactoryCacheContext build() + { + return new CASLockFactoryCacheContext(this); + } + } +} diff --git a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/CASLockProperties.java b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/CASLockProperties.java new file mode 100644 index 000000000..add371c13 --- /dev/null +++ b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/CASLockProperties.java @@ -0,0 +1,95 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.core.impl.locks; + +import com.ericsson.bss.cassandra.ecchronos.core.impl.utils.ConsistencyType; +import com.ericsson.bss.cassandra.ecchronos.utils.enums.connection.ConnectionType; +import java.util.concurrent.ScheduledExecutorService; + +import com.datastax.oss.driver.api.core.ConsistencyLevel; +import com.datastax.oss.driver.api.core.CqlSession; +import com.ericsson.bss.cassandra.ecchronos.connection.StatementDecorator; + +/** + * Represents a container for builder configurations and state for the CASLockStatement. + * This class is used to decouple builder fields from CASLock to avoid excessive field count. + */ +public class CASLockProperties +{ + private final ConnectionType myConnectionType; + private final String myKeyspaceName; + private final ScheduledExecutorService myExecutor; + private final ConsistencyLevel mySerialConsistencyLevel; + private final CqlSession mySession; + private final StatementDecorator myStatementDecorator; + + CASLockProperties(final ConnectionType connectionType, + final String keyspaceName, + final ScheduledExecutorService executor, + final ConsistencyType consistencyType, + final CqlSession session, + final StatementDecorator statementDecorator) + { + myConnectionType = connectionType; + myKeyspaceName = keyspaceName; + myExecutor = executor; + mySerialConsistencyLevel = defineSerialConsistencyLevel(consistencyType); + mySession = session; + myStatementDecorator = statementDecorator; + } + + public final ConsistencyLevel defineSerialConsistencyLevel(final ConsistencyType consistencyType) + { + ConsistencyLevel serialConsistencyLevel; + if (ConsistencyType.DEFAULT.equals(consistencyType)) + { + serialConsistencyLevel = myConnectionType == ConnectionType.datacenterAware + ? ConsistencyLevel.LOCAL_SERIAL + : ConsistencyLevel.SERIAL; + } + else + { + serialConsistencyLevel = ConsistencyType.LOCAL.equals(consistencyType) + ? ConsistencyLevel.LOCAL_SERIAL + : ConsistencyLevel.SERIAL; + } + return serialConsistencyLevel; + } + + public final String getKeyspaceName() + { + return myKeyspaceName; + } + + public final ScheduledExecutorService getExecutor() + { + return myExecutor; + } + + public final ConsistencyLevel getSerialConsistencyLevel() + { + return mySerialConsistencyLevel; + } + + public final CqlSession getSession() + { + return mySession; + } + + public final StatementDecorator getStatementDecorator() + { + return myStatementDecorator; + } +} diff --git a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/CASLockStatement.java b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/CASLockStatement.java new file mode 100644 index 000000000..b4bbd52b4 --- /dev/null +++ b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/CASLockStatement.java @@ -0,0 +1,224 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.core.impl.locks; + +import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker; + +import com.datastax.oss.driver.api.core.ConsistencyLevel; +import com.datastax.oss.driver.api.core.cql.BoundStatement; +import com.datastax.oss.driver.api.core.cql.PreparedStatement; +import com.datastax.oss.driver.api.core.cql.ResultSet; +import com.datastax.oss.driver.api.core.cql.SimpleStatement; +import com.datastax.oss.driver.api.core.cql.Statement; +import com.datastax.oss.driver.api.querybuilder.QueryBuilder; +import com.ericsson.bss.cassandra.ecchronos.connection.DataCenterAwareStatement; + +/** + * Represents a container for builder configurations and state for the CASLockStatement. + * This class is used to decouple builder fields from CASLock to avoid excessive field count. + */ +public class CASLockStatement +{ + static final String COLUMN_RESOURCE = "resource"; + static final String COLUMN_NODE = "node"; + static final String COLUMN_METADATA = "metadata"; + static final String COLUMN_PRIORITY = "priority"; + + private static final String TABLE_LOCK = "lock"; + private static final String TABLE_LOCK_PRIORITY = "lock_priority"; + + private final PreparedStatement myCompeteStatement; + private final PreparedStatement myLockStatement; + private final PreparedStatement myRemoveLockStatement; + private final PreparedStatement myUpdateLockStatement; + private final PreparedStatement myRemoveLockPriorityStatement; + private final PreparedStatement myGetPriorityStatement; + private final PreparedStatement myGetLockMetadataStatement; + + private final CASLockProperties myCasLockProperties; + private final CASLockFactoryCacheContext myCasLockFactoryCacheContext; + + public CASLockStatement( + final CASLockProperties casLockProperties, + final CASLockFactoryCacheContext casLockFactoryCacheContext) + { + myCasLockProperties = casLockProperties; + myCasLockFactoryCacheContext = casLockFactoryCacheContext; + myCompeteStatement = myCasLockProperties.getSession().prepare(competeStatement()); + myLockStatement = myCasLockProperties.getSession().prepare((insertLockStatement())); + myRemoveLockStatement = myCasLockProperties.getSession().prepare(removeLockStatement()); + myUpdateLockStatement = myCasLockProperties.getSession().prepare((updateLockStatement())); + myRemoveLockPriorityStatement = myCasLockProperties.getSession().prepare(removeLockPriorityStatement()); + myGetPriorityStatement = myCasLockProperties.getSession().prepare(getPriorityStatement()); + myGetLockMetadataStatement = myCasLockProperties.getSession().prepare(lockMetadataStatement()); + } + + public final ResultSet execute(final String dataCenter, final BoundStatement statement) + { + Statement executeStatement; + + if (dataCenter != null) + { + executeStatement = new DataCenterAwareStatement(statement, dataCenter); + } + else + { + executeStatement = statement; + } + + return myCasLockProperties.getSession() + .execute(myCasLockProperties + .getStatementDecorator() + .apply(executeStatement)); + } + + private SimpleStatement insertLockStatement() + { + SimpleStatement insertLockStatement = QueryBuilder + .insertInto(myCasLockProperties.getKeyspaceName(), TABLE_LOCK) + .value(COLUMN_RESOURCE, bindMarker()) + .value(COLUMN_NODE, bindMarker()) + .value(COLUMN_METADATA, bindMarker()) + .ifNotExists() + .build() + .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM) + .setSerialConsistencyLevel(myCasLockProperties.getSerialConsistencyLevel()); + return insertLockStatement; + } + + private SimpleStatement removeLockStatement() + { + SimpleStatement removeLockStatement = QueryBuilder + .deleteFrom(myCasLockProperties.getKeyspaceName(), TABLE_LOCK) + .whereColumn(COLUMN_RESOURCE) + .isEqualTo(bindMarker()) + .ifColumn(COLUMN_NODE) + .isEqualTo(bindMarker()) + .build() + .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM) + .setSerialConsistencyLevel(myCasLockProperties.getSerialConsistencyLevel()); + return removeLockStatement; + } + + private SimpleStatement updateLockStatement() + { + SimpleStatement updateLockStatement = QueryBuilder + .update(myCasLockProperties.getKeyspaceName(), TABLE_LOCK) + .setColumn(COLUMN_NODE, bindMarker()) + .setColumn(COLUMN_METADATA, bindMarker()) + .whereColumn(COLUMN_RESOURCE) + .isEqualTo(bindMarker()) + .ifColumn(COLUMN_NODE) + .isEqualTo(bindMarker()) + .build() + .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM) + .setSerialConsistencyLevel(myCasLockProperties.getSerialConsistencyLevel()); + return updateLockStatement; + } + + private SimpleStatement competeStatement() + { + SimpleStatement competeStatement = QueryBuilder + .insertInto(myCasLockProperties.getKeyspaceName(), TABLE_LOCK_PRIORITY) + .value(COLUMN_RESOURCE, bindMarker()) + .value(COLUMN_NODE, bindMarker()) + .value(COLUMN_PRIORITY, bindMarker()) + .build() + .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM); + return competeStatement; + } + + private SimpleStatement getPriorityStatement() + { + SimpleStatement priorityStatement = QueryBuilder + .selectFrom(myCasLockProperties.getKeyspaceName(), TABLE_LOCK_PRIORITY) + .columns(COLUMN_PRIORITY, COLUMN_NODE) + .whereColumn(COLUMN_RESOURCE) + .isEqualTo(bindMarker()) + .build() + .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM); + return priorityStatement; + } + + private SimpleStatement removeLockPriorityStatement() + { + SimpleStatement removeLockPriorityStatement = QueryBuilder + .deleteFrom(myCasLockProperties.getKeyspaceName(), TABLE_LOCK_PRIORITY) + .whereColumn(COLUMN_RESOURCE) + .isEqualTo(bindMarker()) + .whereColumn(COLUMN_NODE) + .isEqualTo(bindMarker()) + .build() + .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM); + return removeLockPriorityStatement; + } + + private SimpleStatement lockMetadataStatement() + { + SimpleStatement lockMetadataStatement = QueryBuilder + .selectFrom(myCasLockProperties.getKeyspaceName(), TABLE_LOCK) + .column(COLUMN_METADATA) + .whereColumn(COLUMN_RESOURCE) + .isEqualTo(bindMarker()) + .build() + .setSerialConsistencyLevel(myCasLockProperties.getSerialConsistencyLevel()); + return lockMetadataStatement; + } + + public final PreparedStatement getCompeteStatement() + { + return myCompeteStatement; + } + + public final PreparedStatement getLockStatement() + { + return myLockStatement; + } + + public final PreparedStatement getRemoveLockStatement() + { + return myRemoveLockStatement; + } + + public final PreparedStatement getUpdateLockStatement() + { + return myUpdateLockStatement; + } + + public final PreparedStatement getRemoveLockPriorityStatement() + { + return myRemoveLockPriorityStatement; + } + + public final PreparedStatement getGetPriorityStatement() + { + return myGetPriorityStatement; + } + + public final PreparedStatement getLockMetadataStatement() + { + return myGetLockMetadataStatement; + } + + public final CASLockFactoryCacheContext getCasLockFactoryCacheContext() + { + return myCasLockFactoryCacheContext; + } + + public final CASLockProperties getCasLockProperties() + { + return myCasLockProperties; + } +} diff --git a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/LockCache.java b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/LockCache.java new file mode 100644 index 000000000..7f8f76a61 --- /dev/null +++ b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/LockCache.java @@ -0,0 +1,135 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.core.impl.locks; + +import static com.ericsson.bss.cassandra.ecchronos.core.locks.LockFactory.DistributedLock; +import com.ericsson.bss.cassandra.ecchronos.utils.exceptions.LockException; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +import static com.google.common.base.Preconditions.checkNotNull; + +public final class LockCache +{ + private static final Logger LOG = LoggerFactory.getLogger(LockCache.class); + + private final Cache myFailureCache; + private final LockSupplier myLockSupplier; + + public LockCache(final LockSupplier lockSupplier, final long expireTimeInSeconds) + { + this(lockSupplier, expireTimeInSeconds, TimeUnit.SECONDS); + } + + LockCache(final LockSupplier lockSupplier, final long expireTime, final TimeUnit expireTimeUnit) + { + myLockSupplier = lockSupplier; + + myFailureCache = Caffeine.newBuilder() + .expireAfterWrite(expireTime, expireTimeUnit) + .executor(Runnable::run) + .build(); + } + + public Optional getCachedFailure(final String dataCenter, final String resource) + { + return getCachedFailure(new LockKey(dataCenter, resource)); + } + + public DistributedLock getLock(final String dataCenter, + final String resource, + final int priority, + final Map metadata) + throws LockException + { + LockKey lockKey = new LockKey(dataCenter, resource); + + Optional cachedFailure = getCachedFailure(lockKey); + + if (cachedFailure.isPresent()) + { + throwCachedLockException(cachedFailure.get()); + } + + try + { + return myLockSupplier.getLock(dataCenter, resource, priority, metadata); + } + catch (LockException e) + { + myFailureCache.put(lockKey, e); + throw e; + } + } + + private void throwCachedLockException(final LockException e) throws LockException + { + LOG.debug("Encountered cached locking failure, throwing exception", e); + throw e; + } + + private Optional getCachedFailure(final LockKey lockKey) + { + return Optional.ofNullable(myFailureCache.getIfPresent(lockKey)); + } + + @FunctionalInterface + public interface LockSupplier + { + DistributedLock getLock(String dataCenter, String resource, int priority, Map metadata) + throws LockException; + } + + static final class LockKey + { + private final String myDataCenter; + private final String myResourceName; + + LockKey(final String dataCenter, final String resourceName) + { + myDataCenter = dataCenter; + myResourceName = checkNotNull(resourceName); + } + + @Override + public boolean equals(final Object o) + { + if (this == o) + { + return true; + } + if (o == null || getClass() != o.getClass()) + { + return false; + } + LockKey lockKey = (LockKey) o; + return Objects.equals(myDataCenter, lockKey.myDataCenter) + && Objects.equals(myResourceName, lockKey.myResourceName); + } + + @Override + public int hashCode() + { + return Objects.hash(myDataCenter, myResourceName); + } + } +} diff --git a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/LockCollection.java b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/LockCollection.java new file mode 100644 index 000000000..74d524c05 --- /dev/null +++ b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/LockCollection.java @@ -0,0 +1,58 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.core.impl.locks; + +import com.ericsson.bss.cassandra.ecchronos.core.locks.LockFactory; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A lock implementation covering multiple distributed locks. + *

+ * Closes all underlying locks when closed. + */ +public class LockCollection implements LockFactory.DistributedLock +{ + private static final Logger LOG = LoggerFactory.getLogger(LockCollection.class); + + private final List myLocks; + + public LockCollection(final Collection locks) + { + myLocks = new ArrayList<>(locks); + } + + /** + * Close. + */ + @Override + public void close() + { + for (LockFactory.DistributedLock lock : myLocks) + { + try + { + lock.close(); + } + catch (Exception e) + { + LOG.warn("Unable to release lock {}", lock, e); + } + } + } +} diff --git a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/NodePriority.java b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/NodePriority.java new file mode 100644 index 000000000..e40a3d5af --- /dev/null +++ b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/NodePriority.java @@ -0,0 +1,43 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.core.impl.locks; + +import java.util.UUID; + +/** + * Represents a container for node priority configurations and state for the CASLockFactory. + * This class is used to decouple node priority fields from CASLockFactory to avoid excessive field count. + */ +public final class NodePriority +{ + private final UUID myNode; + private final int myPriority; + + public NodePriority(final UUID node, final int priority) + { + myNode = node; + myPriority = priority; + } + + public UUID getUuid() + { + return myNode; + } + + public int getPriority() + { + return myPriority; + } +} diff --git a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/package-info.java b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/package-info.java new file mode 100644 index 000000000..d86503737 --- /dev/null +++ b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/package-info.java @@ -0,0 +1,18 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Contains the implementations related to locks. + */ +package com.ericsson.bss.cassandra.ecchronos.core.impl.locks; diff --git a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/utils/ConsistencyType.java b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/utils/ConsistencyType.java new file mode 100644 index 000000000..9865fb0d5 --- /dev/null +++ b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/utils/ConsistencyType.java @@ -0,0 +1,22 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.core.impl.utils; + +public enum ConsistencyType +{ + DEFAULT, + LOCAL, + SERIAL +} diff --git a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/utils/package-info.java b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/utils/package-info.java new file mode 100644 index 000000000..7f4869772 --- /dev/null +++ b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/utils/package-info.java @@ -0,0 +1,19 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Contains utilities classes. + */ +package com.ericsson.bss.cassandra.ecchronos.core.impl.utils; + diff --git a/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/AbstractCassandraContainerTest.java b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/AbstractCassandraContainerTest.java new file mode 100644 index 000000000..9fb8c68b8 --- /dev/null +++ b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/AbstractCassandraContainerTest.java @@ -0,0 +1,121 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.core.impl; + +import com.ericsson.bss.cassandra.ecchronos.connection.DistributedNativeConnectionProvider; +import com.ericsson.bss.cassandra.ecchronos.utils.enums.connection.ConnectionType; +import java.net.InetSocketAddress; + +import java.util.List; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.testcontainers.containers.CassandraContainer; +import org.testcontainers.utility.DockerImageName; + +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.metadata.Node; + +public class AbstractCassandraContainerTest +{ + protected static CqlSession mySession; + + private static DistributedNativeConnectionProvider myNativeConnectionProvider; + private static CassandraContainer node; + + @SuppressWarnings ("resource") + @BeforeClass + public static void setUpCluster() + { + // This is set as an environment variable ('it.cassandra.version') in maven using the '-D' flag. + String cassandraVersion = System.getProperty("it.cassandra.version"); + if (cassandraVersion == null) + { + // No environment version set, just use latest. + cassandraVersion = "latest"; + } + node = new CassandraContainer<>(DockerImageName.parse("cassandra:" + cassandraVersion)) + .withExposedPorts(9042, 7000, 7199) + .withEnv("CASSANDRA_DC", "DC1") + .withEnv("CASSANDRA_ENDPOINT_SNITCH", "GossipingPropertyFileSnitch") + .withEnv("CASSANDRA_CLUSTER_NAME", "TestCluster") + .withEnv("JMX_PORT", "7199"); + node.start(); + String containerIpAddress = node.getHost(); + Integer containerPort = node.getMappedPort(9042); + + mySession = CqlSession.builder() + .addContactPoint(new InetSocketAddress(containerIpAddress, containerPort)) + .withLocalDatacenter("DC1") + .build(); + + List nodesList = mySession.getMetadata().getNodes().values().stream().toList(); + myNativeConnectionProvider = new DistributedNativeConnectionProvider() + { + @Override + public CqlSession getCqlSession() + { + return mySession; + } + + @Override + public List getNodes() + { + return nodesList; + } + + @Override + public void addNode(Node myNode) + { + } + + @Override + public void removeNode(Node myNode) + { + } + + @Override + public Boolean confirmNodeValid(Node node) + { + return false; + } + + @Override + public ConnectionType getConnectionType() + { + return ConnectionType.hostAware; + } + }; + } + + @AfterClass + public static void tearDownCluster() + { + if (mySession != null) + { + mySession.close(); + } + node.stop(); + } + + public static DistributedNativeConnectionProvider getNativeConnectionProvider() + { + return myNativeConnectionProvider; + } + + public static CassandraContainer getContainerNode() + { + return node; + } +} diff --git a/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/TestCASLockFactory.java b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/TestCASLockFactory.java new file mode 100644 index 000000000..ba6069adb --- /dev/null +++ b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/TestCASLockFactory.java @@ -0,0 +1,706 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.core.impl.locks; + +import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.ericsson.bss.cassandra.ecchronos.connection.DistributedNativeConnectionProvider; +import com.ericsson.bss.cassandra.ecchronos.core.impl.AbstractCassandraContainerTest; +import com.ericsson.bss.cassandra.ecchronos.core.impl.utils.ConsistencyType; +import com.ericsson.bss.cassandra.ecchronos.core.locks.LockFactory; +import com.ericsson.bss.cassandra.ecchronos.core.state.HostStates; +import com.ericsson.bss.cassandra.ecchronos.utils.enums.connection.ConnectionType; +import com.ericsson.bss.cassandra.ecchronos.utils.exceptions.LockException; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import javax.management.AttributeNotFoundException; +import javax.management.InstanceNotFoundException; +import javax.management.MBeanException; +import javax.management.MalformedObjectNameException; +import javax.management.ReflectionException; +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Metric; +import com.datastax.oss.driver.api.core.AllNodesFailedException; +import com.datastax.oss.driver.api.core.ConsistencyLevel; +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.cql.PreparedStatement; +import com.datastax.oss.driver.api.core.cql.ResultSet; +import com.datastax.oss.driver.api.core.cql.Row; +import com.datastax.oss.driver.api.core.cql.SimpleStatement; +import com.datastax.oss.driver.api.core.cql.Statement; +import com.datastax.oss.driver.api.core.metadata.Node; +import com.datastax.oss.driver.api.core.metrics.DefaultNodeMetric; +import com.datastax.oss.driver.api.core.metrics.Metrics; +import com.datastax.oss.driver.api.querybuilder.QueryBuilder; +import com.datastax.oss.driver.internal.core.context.InternalDriverContext; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import net.jcip.annotations.NotThreadSafe; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@NotThreadSafe +@RunWith (Parameterized.class) +public class TestCASLockFactory extends AbstractCassandraContainerTest +{ + @Parameterized.Parameters + public static Collection keyspaceNames() + { + return Arrays.asList("ecchronos", "anotherkeyspace"); + } + + private static final String TABLE_LOCK = "lock"; + private static final String TABLE_LOCK_PRIORITY = "lock_priority"; + + private static final String DATA_CENTER = "DC1"; + private static CASLockFactory myLockFactory; + private static PreparedStatement myLockStatement; + private static PreparedStatement myRemoveLockStatement; + private static PreparedStatement myCompeteStatement; + private static PreparedStatement myGetPrioritiesStatement; + + private static HostStates hostStates; + + @Parameterized.Parameter + public String myKeyspaceName; + + @Before + public void startup() + { + mySession.execute(String.format( + "CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class': 'NetworkTopologyStrategy', 'DC1': 1}", myKeyspaceName)); + mySession.execute(String.format( + "CREATE TABLE IF NOT EXISTS %s.lock (resource text, node uuid, metadata map, PRIMARY KEY(resource)) WITH default_time_to_live = 600 AND gc_grace_seconds = 0", + myKeyspaceName)); + mySession.execute(String.format( + "CREATE TABLE IF NOT EXISTS %s.lock_priority (resource text, node uuid, priority int, PRIMARY KEY(resource, node)) WITH default_time_to_live = 600 AND gc_grace_seconds = 0", + myKeyspaceName)); + + hostStates = mock(HostStates.class); + when(hostStates.isUp(any(Node.class))).thenReturn(true); + Node node = mock(Node.class); + when(node.getHostId()).thenReturn(UUID.randomUUID()); + myLockFactory = new CASLockFactoryBuilder() + .withNativeConnectionProvider(getNativeConnectionProvider()) + .withHostStates(hostStates) + .withStatementDecorator(s -> s) + .withKeyspaceName(myKeyspaceName) + .withNode(node) + .build(); + + myLockStatement = mySession.prepare(QueryBuilder.insertInto(myKeyspaceName, TABLE_LOCK) + .value("resource", bindMarker()) + .value("node", bindMarker()) + .value("metadata", bindMarker()) + .ifNotExists() + .build() + .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM) + .setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL)); + myRemoveLockStatement = + mySession.prepare(String.format("DELETE FROM %s.%s WHERE resource=? IF EXISTS", myKeyspaceName, TABLE_LOCK)); + myCompeteStatement = mySession.prepare( + String.format("INSERT INTO %s.%s (resource, node, priority) VALUES (?, ?, ?)", myKeyspaceName, TABLE_LOCK_PRIORITY)); + myGetPrioritiesStatement = + mySession.prepare(String.format("SELECT * FROM %s.%s WHERE resource=?", myKeyspaceName, TABLE_LOCK_PRIORITY)); + } + + @After + public void testCleanup() + { + execute(SimpleStatement.newInstance( + String.format("DELETE FROM %s.%s WHERE resource='%s'", myKeyspaceName, TABLE_LOCK_PRIORITY, "lock"))); + execute(myRemoveLockStatement.bind("lock")); + myLockFactory.close(); + } + + @Test + public void testGetDefaultTimeToLiveFromLockTable() throws LockException + { + String alterLockTable = String.format("ALTER TABLE %s.%s WITH default_time_to_live = 1200;", myKeyspaceName, TABLE_LOCK); + mySession.execute(alterLockTable); + Node node = mock(Node.class); + when(node.getHostId()).thenReturn(UUID.randomUUID()); + myLockFactory = new CASLockFactoryBuilder() + .withNativeConnectionProvider(getNativeConnectionProvider()) + .withHostStates(hostStates) + .withStatementDecorator(s -> s) + .withKeyspaceName(myKeyspaceName) + .withNode(node) + .build(); + assertThat(myLockFactory.getCasLockFactoryCacheContext().getFailedLockRetryAttempts()).isEqualTo(9); + assertThat(myLockFactory.getCasLockFactoryCacheContext().getLockUpdateTimeInSeconds()).isEqualTo(120); + } + + @Test + public void testGetLock() throws LockException + { + try (LockFactory.DistributedLock lock = myLockFactory.tryLock(DATA_CENTER, "lock", 1, new HashMap())) + { + } + + assertPriorityListEmpty("lock"); + assertThat(myLockFactory.getCachedFailure(DATA_CENTER, "lock")).isEmpty(); + } + + @Test + public void testGetGlobalLock() throws LockException + { + try (LockFactory.DistributedLock lock = myLockFactory.tryLock(null, "lock", 1, new HashMap())) + { + } + assertPriorityListEmpty("lock"); + assertThat(myLockFactory.getCachedFailure(null, "lock")).isEmpty(); + } + + @Test + public void testGlobalLockTakenThrowsException() + { + execute(myLockStatement.bind("lock", UUID.randomUUID(), new HashMap<>())); + + assertThatExceptionOfType(LockException.class).isThrownBy(() -> myLockFactory.tryLock(null, "lock", 1, new HashMap<>())); + assertPrioritiesInList("lock", 1); + assertThat(myLockFactory.getCachedFailure(null, "lock")).isNotEmpty(); + } + + @Test + public void testGlobalLockTakenIsCachedOnSecondTry() throws AttributeNotFoundException, + InstanceNotFoundException, + MalformedObjectNameException, + MBeanException, + ReflectionException, + UnsupportedOperationException, + IOException, + InterruptedException + { + execute(myLockStatement.bind("lock", UUID.randomUUID(), new HashMap<>())); + InternalDriverContext driverContext = (InternalDriverContext) mySession.getContext(); + //Check that no in-flight queries exist, we want all previous queries to complete before we proceed + Optional connectedNode = driverContext.getPoolManager().getPools().keySet().stream().findFirst(); + while (getInFlightQueries(connectedNode.get()) != 0) + { + Thread.sleep(100); + } + long expectedLockReadCount = getReadCount(TABLE_LOCK) + 2; // We do a read due to CAS and execCommandOnContainer + long expectedLockWriteCount = getWriteCount(TABLE_LOCK) + 1; // No writes as the lock is already held + long expectedLockPriorityReadCount = getReadCount(TABLE_LOCK_PRIORITY) + 2; // We read the priorities + long expectedLockPriorityWriteCount = getWriteCount(TABLE_LOCK_PRIORITY) + 1; // We update our local priority once + + assertThatExceptionOfType(LockException.class).isThrownBy(() -> myLockFactory.tryLock(null, "lock", 2, new HashMap<>())); + assertThatExceptionOfType(LockException.class).isThrownBy(() -> myLockFactory.tryLock(null, "lock", 1, new HashMap<>())); + + assertThat(getReadCount(TABLE_LOCK_PRIORITY)).isEqualTo(expectedLockPriorityReadCount); + assertThat(getWriteCount(TABLE_LOCK_PRIORITY)).isEqualTo(expectedLockPriorityWriteCount); + + assertThat(getReadCount(TABLE_LOCK)).isEqualTo(expectedLockReadCount); + assertThat(getWriteCount(TABLE_LOCK)).isEqualTo(expectedLockWriteCount); + assertPrioritiesInList("lock", 2); + assertThat(myLockFactory.getCachedFailure(null, "lock")).isNotEmpty(); + } + + private int getInFlightQueries(Node node) + { + int inFlightQueries = 0; + Optional metrics = mySession.getMetrics(); + if (metrics.isPresent()) + { + Optional inFlight = metrics.get().getNodeMetric(node, DefaultNodeMetric.IN_FLIGHT); + if (inFlight.isPresent()) + { + inFlightQueries = (int) ((Gauge) inFlight.get()).getValue(); + } + } + return inFlightQueries; + } + + @Test + public void testGetLockWithLowerPriority() + { + execute(myCompeteStatement.bind("lock", UUID.randomUUID(), 2)); + + assertThatExceptionOfType(LockException.class).isThrownBy(() -> myLockFactory.tryLock(DATA_CENTER, "lock", 1, new HashMap<>())); + assertPrioritiesInList("lock", 1, 2); + assertThat(myLockFactory.getCachedFailure(DATA_CENTER, "lock")).isNotEmpty(); + } + + @Test + public void testGetAlreadyTakenLock() + { + execute(myLockStatement.bind("lock", UUID.randomUUID(), new HashMap<>())); + + assertThatExceptionOfType(LockException.class).isThrownBy(() -> myLockFactory.tryLock(DATA_CENTER, "lock", 1, new HashMap<>())); + assertPrioritiesInList("lock", 1); + assertThat(myLockFactory.getCachedFailure(DATA_CENTER, "lock")).isNotEmpty(); + } + + @Test + public void testGetLockWithLocallyHigherPriority() throws LockException + { + UUID localHostId = getNativeConnectionProvider().getNodes().get(0).getHostId(); + execute(myCompeteStatement.bind("lock", localHostId, 2)); + CASLockFactory lockFactory = new CASLockFactoryBuilder() + .withNativeConnectionProvider(getNativeConnectionProvider()) + .withHostStates(hostStates) + .withStatementDecorator(s -> s) + .withKeyspaceName(myKeyspaceName) + .withNode(getNativeConnectionProvider().getNodes().get(0)) + .build(); + + try (LockFactory.DistributedLock lock = lockFactory.tryLock(DATA_CENTER, "lock", 1, new HashMap<>())) + { + } + + assertPrioritiesInList("lock", 2); + assertThat(myLockFactory.getCachedFailure(DATA_CENTER, "lock")).isEmpty(); + } + + @Test + public void testGetLockWithLocallyLowerPriority() throws LockException + { + UUID localHostId = getNativeConnectionProvider().getNodes().get(0).getHostId(); + execute(myCompeteStatement.bind("lock", localHostId, 1)); + CASLockFactory lockFactory = new CASLockFactoryBuilder() + .withNativeConnectionProvider(getNativeConnectionProvider()) + .withHostStates(hostStates) + .withStatementDecorator(s -> s) + .withKeyspaceName(myKeyspaceName) + .withNode(getNativeConnectionProvider().getNodes().get(0)) + .build(); + try (LockFactory.DistributedLock lock = lockFactory.tryLock(DATA_CENTER, "lock", 2, new HashMap<>())) + { + } + + assertPriorityListEmpty("lock"); + assertThat(lockFactory.getCachedFailure(DATA_CENTER, "lock")).isEmpty(); + } + + @Test + public void testReadMetadata() throws LockException + { + Map expectedMetadata = new HashMap<>(); + expectedMetadata.put("data", "something"); + + try (LockFactory.DistributedLock lock = myLockFactory.tryLock(DATA_CENTER, "lock", 1, expectedMetadata)) + { + Map actualMetadata = myLockFactory.getLockMetadata(DATA_CENTER, "lock"); + + assertThat(actualMetadata).isEqualTo(expectedMetadata); + } + + assertPriorityListEmpty("lock"); + assertThat(myLockFactory.getCachedFailure(DATA_CENTER, "lock")).isEmpty(); + } + + @Test + public void testInterruptCasLockUpdate() throws InterruptedException + { + Map metadata = new HashMap<>(); + + ExecutorService executorService = Executors.newFixedThreadPool(1); + + try + { + Future future = executorService.submit( + new CASLock( + DATA_CENTER, + "lock", + 1, + metadata, + myLockFactory.getHostId(), + myLockFactory.getCasLockStatement())); + + Thread.sleep(100); + + future.cancel(true); + + executorService.shutdown(); + assertThat(executorService.awaitTermination(1, TimeUnit.SECONDS)).isTrue(); + } + finally + { + if (!executorService.isShutdown()) + { + executorService.shutdownNow(); + } + } + } + + @Test + public void testFailedLockRetryAttempts() + { + Map metadata = new HashMap<>(); + try (CASLock lockUpdateTask = new CASLock( + DATA_CENTER, + "lock", + 1, + metadata, + myLockFactory.getHostId(), + myLockFactory.getCasLockStatement())) + { + for (int i = 0; i < 10; i++) + { + lockUpdateTask.run(); + assertThat(lockUpdateTask.getFailedAttempts()).isEqualTo(i + 1); + } + + execute(myLockStatement.bind("lock", myLockFactory.getHostId(), new HashMap<>())); + lockUpdateTask.run(); + assertThat(lockUpdateTask.getFailedAttempts()).isEqualTo(0); + } + + assertThat(myLockFactory.getCachedFailure(DATA_CENTER, "lock")).isEmpty(); + } + + @Test + public void testActivateWithoutKeyspaceCausesIllegalStateException() + { + mySession.execute(String.format("DROP KEYSPACE %s", myKeyspaceName)); + + assertThatExceptionOfType(IllegalStateException.class) + .isThrownBy(() -> new CASLockFactoryBuilder() + .withNativeConnectionProvider(getNativeConnectionProvider()) + .withHostStates(hostStates) + .withStatementDecorator(s -> s) + .withKeyspaceName(myKeyspaceName) + .build()); + + mySession.execute(String.format( + "CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class': 'NetworkTopologyStrategy', 'DC1': 1}", myKeyspaceName)); + mySession.execute(String.format( + "CREATE TABLE IF NOT EXISTS %s.%s (resource text, node uuid, metadata map, PRIMARY KEY(resource)) WITH default_time_to_live = 600 AND gc_grace_seconds = 0", + myKeyspaceName, TABLE_LOCK)); + mySession.execute(String.format( + "CREATE TABLE IF NOT EXISTS %s.%s (resource text, node uuid, priority int, PRIMARY KEY(resource, node)) WITH default_time_to_live = 600 AND gc_grace_seconds = 0", + myKeyspaceName, TABLE_LOCK_PRIORITY)); + } + + @Test + public void testActivateWithoutLockTableCausesIllegalStateException() + { + mySession.execute(String.format("DROP TABLE %s.%s", myKeyspaceName, TABLE_LOCK)); + + assertThatExceptionOfType(IllegalStateException.class) + .isThrownBy(() -> new CASLockFactoryBuilder() + .withNativeConnectionProvider(getNativeConnectionProvider()) + .withHostStates(hostStates) + .withStatementDecorator(s -> s) + .withKeyspaceName(myKeyspaceName) + .build()); + + mySession.execute(String.format( + "CREATE TABLE IF NOT EXISTS %s.%s (resource text, node uuid, metadata map, PRIMARY KEY(resource)) WITH default_time_to_live = 600 AND gc_grace_seconds = 0", + myKeyspaceName, TABLE_LOCK)); + } + + @Test + public void testActivateWithoutLockPriorityTableCausesIllegalStateException() + { + mySession.execute(String.format("DROP TABLE %s.%s", myKeyspaceName, TABLE_LOCK_PRIORITY)); + + assertThatExceptionOfType(IllegalStateException.class) + .isThrownBy(() -> new CASLockFactoryBuilder() + .withNativeConnectionProvider(getNativeConnectionProvider()) + .withHostStates(hostStates) + .withStatementDecorator(s -> s) + .withKeyspaceName(myKeyspaceName) + .build()); + + mySession.execute(String.format( + "CREATE TABLE IF NOT EXISTS %s.%s (resource text, node uuid, priority int, PRIMARY KEY(resource, node)) WITH default_time_to_live = 600 AND gc_grace_seconds = 0", + myKeyspaceName, TABLE_LOCK_PRIORITY)); + } + + @Test + public void testActivateWithoutCassandraCausesIllegalStateException() + { + // mock + CqlSession session = mock(CqlSession.class); + + doThrow(AllNodesFailedException.class).when(session).getMetadata(); + + // test + assertThatExceptionOfType(AllNodesFailedException.class) + .isThrownBy(() -> new CASLockFactoryBuilder() + .withNativeConnectionProvider(new DistributedNativeConnectionProvider() + { + @Override + public CqlSession getCqlSession() + { + return session; + } + + @Override + public List getNodes() + { + return null; + } + + @Override + public void addNode(Node myNode) + { + } + + @Override + public void removeNode(Node myNode) + { + } + + @Override + public Boolean confirmNodeValid(Node node) + { + return false; + } + + @Override + public ConnectionType getConnectionType() + { + return null; + } + }) + .withHostStates(hostStates) + .withStatementDecorator(s -> s) + .withKeyspaceName(myKeyspaceName) + .build()); + } + + @Test + public void testDataCenterAwareAgentTypeWithDefaultSerialConsistency() + { + Node nodeMock = mock(Node.class); + DistributedNativeConnectionProvider connectionProviderMock = mock(DistributedNativeConnectionProvider.class); + + when(nodeMock.getHostId()).thenReturn(UUID.randomUUID()); + when(connectionProviderMock.getCqlSession()).thenReturn(mySession); + when(connectionProviderMock.getNodes()).thenReturn(Arrays.asList(nodeMock)); + when(connectionProviderMock.getConnectionType()).thenReturn(ConnectionType.datacenterAware); + + myLockFactory = new CASLockFactoryBuilder() + .withNativeConnectionProvider(getDataCenterAwareConnectionTypeProvider()) + .withHostStates(hostStates) + .withStatementDecorator(s -> s) + .withKeyspaceName(myKeyspaceName) + .withConsistencySerial(ConsistencyType.DEFAULT) + .withNode(nodeMock) + .build(); + + assertEquals(ConsistencyLevel.LOCAL_SERIAL, myLockFactory.getSerialConsistencyLevel()); + } + + @Test + public void testOtherThanDataCenterAwareAgentTypeWithDefaultSerialConsistency() + { + Node nodeMock = mock(Node.class); + DistributedNativeConnectionProvider connectionProviderMock = mock(DistributedNativeConnectionProvider.class); + + when(nodeMock.getHostId()).thenReturn(UUID.randomUUID()); + when(connectionProviderMock.getCqlSession()).thenReturn(mySession); + when(connectionProviderMock.getNodes()).thenReturn(Arrays.asList(nodeMock)); + + myLockFactory = new CASLockFactoryBuilder() + .withNativeConnectionProvider(connectionProviderMock) + .withHostStates(hostStates) + .withStatementDecorator(s -> s) + .withKeyspaceName(myKeyspaceName) + .withConsistencySerial(ConsistencyType.DEFAULT) + .withNode(nodeMock) + .build(); + + assertEquals(ConsistencyLevel.SERIAL, myLockFactory.getSerialConsistencyLevel()); + } + + @Test + public void testLocalSerialConsistency() + { + DistributedNativeConnectionProvider connectionProviderMock = mock(DistributedNativeConnectionProvider.class); + Node nodeMock = mock(Node.class); + when(nodeMock.getHostId()).thenReturn(UUID.randomUUID()); + when(connectionProviderMock.getCqlSession()).thenReturn(mySession); + when(connectionProviderMock.getNodes()).thenReturn(Arrays.asList(nodeMock)); + + myLockFactory = new CASLockFactoryBuilder() + .withNativeConnectionProvider(connectionProviderMock) + .withHostStates(hostStates) + .withStatementDecorator(s -> s) + .withKeyspaceName(myKeyspaceName) + .withConsistencySerial(ConsistencyType.LOCAL) + .withNode(nodeMock) + .build(); + + assertEquals(ConsistencyLevel.LOCAL_SERIAL, myLockFactory.getSerialConsistencyLevel()); + } + + @Test + public void testSerialConsistency() + { + DistributedNativeConnectionProvider connectionProviderMock = mock(DistributedNativeConnectionProvider.class); + Node nodeMock = mock(Node.class); + when(nodeMock.getHostId()).thenReturn(UUID.randomUUID()); + when(connectionProviderMock.getCqlSession()).thenReturn(mySession); + when(connectionProviderMock.getNodes()).thenReturn(Arrays.asList(nodeMock)); + + myLockFactory = new CASLockFactoryBuilder() + .withNativeConnectionProvider(connectionProviderMock) + .withHostStates(hostStates) + .withStatementDecorator(s -> s) + .withKeyspaceName(myKeyspaceName) + .withConsistencySerial(ConsistencyType.SERIAL) + .withNode(nodeMock) + .build(); + + assertEquals(ConsistencyLevel.SERIAL, myLockFactory.getSerialConsistencyLevel()); + } + + private void assertPriorityListEmpty(String resource) + { + assertThat(getPriorities(resource)).isEmpty(); + } + + private void assertPrioritiesInList(String resource, Integer... priorities) + { + assertThat(getPriorities(resource)).containsExactlyInAnyOrder(priorities); + } + + private Set getPriorities(String resource) + { + ResultSet resultSet = execute(myGetPrioritiesStatement.bind(resource)); + List rows = resultSet.all(); + + return rows.stream().map(r -> r.getInt("priority")).collect(Collectors.toSet()); + } + + private ResultSet execute(Statement statement) + { + return mySession.execute(statement); + } + + private long getReadCount(String tableName) throws AttributeNotFoundException, + InstanceNotFoundException, + MBeanException, + ReflectionException, + IOException, + MalformedObjectNameException, + UnsupportedOperationException, + InterruptedException + { + return getReadCountFromTableStats(tableName); + } + + private long getWriteCount(String tableName) throws AttributeNotFoundException, + InstanceNotFoundException, + MBeanException, + ReflectionException, + IOException, + MalformedObjectNameException, + UnsupportedOperationException, + InterruptedException + { + return getWriteCountFromTableStats(tableName); + } + + private long getReadCountFromTableStats(String tableName) throws UnsupportedOperationException, IOException, InterruptedException + { + String tableStatsOutput = + getContainerNode().execInContainer("nodetool", "tablestats", myKeyspaceName + "." + tableName).getStdout(); + long readCount = 0; + Pattern readCountPattern = Pattern.compile("Read Count:\\s+(\\d+)"); + Matcher readCountMatcher = readCountPattern.matcher(tableStatsOutput); + + if (readCountMatcher.find()) + { + readCount = Long.parseLong(readCountMatcher.group(1)); + } + + return readCount; + } + + private long getWriteCountFromTableStats(String tableName) throws UnsupportedOperationException, IOException, InterruptedException + { + String tableStatsOutput = + getContainerNode().execInContainer("nodetool", "tablestats", myKeyspaceName + "." + tableName).getStdout(); + long writeCount = 0; + Pattern writeCountPattern = Pattern.compile("Write Count:\\s+(\\d+)"); + Matcher writeCountMatcher = writeCountPattern.matcher(tableStatsOutput); + + if (writeCountMatcher.find()) + { + writeCount = Long.parseLong(writeCountMatcher.group(1)); + } + + return writeCount; + } + + private DistributedNativeConnectionProvider getDataCenterAwareConnectionTypeProvider() + { + return new DistributedNativeConnectionProvider() + { + @Override + public CqlSession getCqlSession() + { + return mySession; + } + + @Override + public List getNodes() + { + return mySession.getMetadata().getNodes().values().stream().toList(); + } + + @Override + public void addNode(Node myNode) + { + } + + @Override + public void removeNode(Node myNode) + { + } + + @Override + public Boolean confirmNodeValid(Node node) + { + return false; + } + + @Override + public ConnectionType getConnectionType() + { + return ConnectionType.datacenterAware; + } + }; + } +} diff --git a/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/TestLockCache.java b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/TestLockCache.java new file mode 100644 index 000000000..2f88964a5 --- /dev/null +++ b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/TestLockCache.java @@ -0,0 +1,165 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.core.impl.locks; + +import com.ericsson.bss.cassandra.ecchronos.utils.exceptions.LockException; +import com.ericsson.bss.cassandra.ecchronos.core.locks.LockFactory.DistributedLock; +import nl.jqno.equalsverifier.EqualsVerifier; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class TestLockCache +{ + private static final String DATA_CENTER = "DC1"; + private static final String RESOURCE = "RepairResource-91e32362-7af4-11e9-8f9e-2a86e4085a59-1"; + private static final int PRIORITY = 1; + private static final Map METADATA = new HashMap<>(); + + @Mock + private LockCache.LockSupplier mockedLockSupplier; + + private LockCache myLockCache; + + @Before + public void setup() + { + myLockCache = new LockCache(mockedLockSupplier, 30L); + } + + @Test + public void testGetLock() throws LockException + { + DistributedLock expectedLock = doReturnLockOnGetLock(); + + assertGetLockRetrievesExpectedLock(expectedLock); + } + + @Test + public void testGetThrowingLockIsCached() throws LockException + { + LockException expectedExcetion = doThrowOnGetLock(); + + assertGetLockThrowsException(expectedExcetion); + + // Reset return type, locking should still throw + doReturnLockOnGetLock(); + + assertGetLockThrowsException(expectedExcetion); + } + + @Test + public void testGetMultipleLocks() throws LockException + { + String otherResource = "RepairResource-b2e33e60-7af6-11e9-8f9e-2a86e4085a59-1"; + + DistributedLock expectedLock = doReturnLockOnGetLock(RESOURCE); + DistributedLock expectedOtherLock = doReturnLockOnGetLock(otherResource); + + assertGetLockRetrievesExpectedLock(RESOURCE, expectedLock); + assertGetLockRetrievesExpectedLock(otherResource, expectedOtherLock); + } + + @Test + public void testGetOtherLockAfterThrowingOnAnotherResource() throws LockException + { + String otherResource = "RepairResource-b2e33e60-7af6-11e9-8f9e-2a86e4085a59-1"; + + LockException expectedException = doThrowOnGetLock(RESOURCE); + DistributedLock expectedOtherLock = doReturnLockOnGetLock(otherResource); + + assertGetLockThrowsException(RESOURCE, expectedException); + assertGetLockRetrievesExpectedLock(otherResource, expectedOtherLock); + } + + @Test + public void testGetLockAfterCachedExceptionHasExpired() throws LockException, InterruptedException + { + myLockCache = new LockCache(mockedLockSupplier, 20, TimeUnit.MILLISECONDS); + + LockException expectedException = doThrowOnGetLock(); + assertGetLockThrowsException(expectedException); + + Thread.sleep(20); + + DistributedLock expectedLock = doReturnLockOnGetLock(); + assertGetLockRetrievesExpectedLock(expectedLock); + } + + @Test + public void testEqualsContract() + { + EqualsVerifier.forClass(LockCache.LockKey.class).usingGetClass().verify(); + } + + private void assertGetLockRetrievesExpectedLock(DistributedLock expectedLock) throws LockException + { + assertGetLockRetrievesExpectedLock(RESOURCE, expectedLock); + } + + private void assertGetLockRetrievesExpectedLock(String resource, DistributedLock expectedLock) throws LockException + { + assertThat(myLockCache.getLock(DATA_CENTER, resource, PRIORITY, METADATA)).isSameAs(expectedLock); + assertThat(myLockCache.getCachedFailure(DATA_CENTER, resource)).isEmpty(); + } + + private void assertGetLockThrowsException(LockException expectedException) + { + assertGetLockThrowsException(RESOURCE, expectedException); + } + + private void assertGetLockThrowsException(String resource, LockException expectedException) + { + assertThatThrownBy(() -> myLockCache.getLock(DATA_CENTER, resource, PRIORITY, METADATA)).isSameAs(expectedException); + assertThat(myLockCache.getCachedFailure(DATA_CENTER, resource)).isNotEmpty(); + } + + private DistributedLock doReturnLockOnGetLock() throws LockException + { + return doReturnLockOnGetLock(RESOURCE); + } + + private DistributedLock doReturnLockOnGetLock(String resource) throws LockException + { + DistributedLock expectedLock = mock(DistributedLock.class); + when(mockedLockSupplier.getLock(eq(DATA_CENTER), eq(resource), eq(PRIORITY), eq(METADATA))).thenReturn(expectedLock); + return expectedLock; + } + + private LockException doThrowOnGetLock() throws LockException + { + return doThrowOnGetLock(RESOURCE); + } + + private LockException doThrowOnGetLock(String resource) throws LockException + { + LockException expectedException = new LockException(""); + when(mockedLockSupplier.getLock(eq(DATA_CENTER), eq(resource), eq(PRIORITY), eq(METADATA))).thenThrow(expectedException); + return expectedException; + } +} \ No newline at end of file diff --git a/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/TestLockCollection.java b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/TestLockCollection.java new file mode 100644 index 000000000..160096b44 --- /dev/null +++ b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/TestLockCollection.java @@ -0,0 +1,91 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.core.impl.locks; + +import static org.assertj.core.api.Assertions.assertThat; +import com.ericsson.bss.cassandra.ecchronos.core.locks.LockFactory; +import java.util.ArrayList; +import java.util.List; +import org.junit.Test; + +import com.ericsson.bss.cassandra.ecchronos.core.locks.LockFactory.DistributedLock; + +public class TestLockCollection +{ + + @Test + public void testCloseAllLocks() + { + List locks = new ArrayList<>(); + for (int i = 0; i < 10; i++) + { + locks.add(new DummyLock()); + } + + new LockCollection(locks).close(); + + for (DummyLock lock : locks) + { + assertThat(lock.closed).isTrue(); + } + } + + @Test + public void testCloseAllLocksOneThrowing() + { + List locks = new ArrayList<>(); + for (int i = 0; i < 4; i++) + { + locks.add(new DummyLock()); + } + + locks.add(new ThrowingLock()); + + for (int i = 0; i < 5; i++) + { + locks.add(new DummyLock()); + } + + new LockCollection(locks).close(); + + for (DistributedLock lock : locks) + { + if (lock instanceof DummyLock) + { + assertThat(((DummyLock) lock).closed).isTrue(); + } + } + } + + private class ThrowingLock implements DistributedLock + { + @Override + public void close() + { + throw new IllegalStateException(); + } + } +} + +class DummyLock implements LockFactory.DistributedLock +{ + public volatile boolean closed = false; + + @Override + public void close() + { + closed = true; + } +} diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/locks/LockFactory.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/locks/LockFactory.java new file mode 100644 index 000000000..39afa9ac8 --- /dev/null +++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/locks/LockFactory.java @@ -0,0 +1,94 @@ +/* + * Copyright 2018 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.core.locks; + +import java.io.Closeable; +import java.util.Map; +import java.util.Optional; +import com.ericsson.bss.cassandra.ecchronos.utils.exceptions.LockException; + +/** + * Interface for distributed lock factories. + */ +public interface LockFactory +{ + + /** + * Try to lock a distributed resource using the provided priority. + * + * @param dataCenter + * The data center the lock belongs to or null if it's a global lock. + * @param resource + * The resource to lock. + * @param priority + * The priority of the lock. + * @param metadata + * The metadata of the lock. + * @return The lock if able to lock the resource. + */ + DistributedLock tryLock(String dataCenter, String resource, int priority, Map metadata) + throws LockException; + + /** + * Get the metadata of a resource lock. + * + * @param dataCenter + * The data center the lock belongs to or null if it's a global lock. + * @param resource + * The data center resource: + * i.e "RepairResource-DC1-1". + * @return The metadata of the lock + * containing keyspace and table to repair. + * @throws LockException + */ + Map getLockMetadata(String dataCenter, String resource) throws LockException; + + /** + * Checks if local_quorum is met. + * + * @param dataCenter + * The data center the lock belongs to or null if it's a global lock. + * @param resource + * The data center resource. + * i.e "RepairResource-DC1-1". + * @return boolean + * Indicates if local_quorum is met. + */ + boolean sufficientNodesForLocking(String dataCenter, String resource); + + /** + * Utility method to return a cached lock exception if one is available. + * + * @param dataCenter The data center the lock is for or null if it's a global lock. + * @param resource The resource the lock is for. + * @return A cached exception if available. + */ + default Optional getCachedFailure(String dataCenter, String resource) + { + return Optional.empty(); + } + + /** + * A locked resource that gets released by the call of the {@link DistributedLock#close() close()} method. + */ + interface DistributedLock extends Closeable + { + /** + * Releases the locked resource. + */ + @Override + void close(); + } +} diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/locks/package-info.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/locks/package-info.java new file mode 100644 index 000000000..8a0252b9f --- /dev/null +++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/locks/package-info.java @@ -0,0 +1,18 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Contains interfaces related to locks. + */ +package com.ericsson.bss.cassandra.ecchronos.core.locks; diff --git a/data/src/test/java/com/ericsson/bss/cassandra/ecchronos/data/utils/AbstractCassandraTest.java b/data/src/test/java/com/ericsson/bss/cassandra/ecchronos/data/utils/AbstractCassandraTest.java index ce2dd65f4..2c0cd6bfe 100644 --- a/data/src/test/java/com/ericsson/bss/cassandra/ecchronos/data/utils/AbstractCassandraTest.java +++ b/data/src/test/java/com/ericsson/bss/cassandra/ecchronos/data/utils/AbstractCassandraTest.java @@ -17,6 +17,7 @@ import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.metadata.Node; import com.ericsson.bss.cassandra.ecchronos.connection.DistributedNativeConnectionProvider; +import com.ericsson.bss.cassandra.ecchronos.utils.enums.connection.ConnectionType; import org.junit.AfterClass; import org.junit.BeforeClass; import org.testcontainers.containers.CassandraContainer; @@ -75,6 +76,12 @@ public void removeNode(Node myNode) { public Boolean confirmNodeValid(Node node) { return false; } + + @Override + public ConnectionType getConnectionType() + { + return ConnectionType.datacenterAware; + } }; } diff --git a/utils/src/main/java/com/ericsson/bss/cassandra/ecchronos/utils/exceptions/LockException.java b/utils/src/main/java/com/ericsson/bss/cassandra/ecchronos/utils/exceptions/LockException.java new file mode 100644 index 000000000..e968341b3 --- /dev/null +++ b/utils/src/main/java/com/ericsson/bss/cassandra/ecchronos/utils/exceptions/LockException.java @@ -0,0 +1,38 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.utils.exceptions; + +/** + * Exception thrown when a lock factory is unable to get a lock. + */ +public class LockException extends Exception +{ + private static final long serialVersionUID = 1699712279389641954L; + + public LockException(final String message) + { + super(message); + } + + public LockException(final String message, final Throwable t) + { + super(message, t); + } + + public LockException(final Throwable t) + { + super(t); + } +}