Skip to content

Commit

Permalink
Cassandra based distributed locking mechanism # 741
Browse files Browse the repository at this point in the history
- Cassandra tables called lock and lock_priority,
 to manage task execution and synchronization across
 multiple nodes.
  • Loading branch information
Paul Chandler authored and sajid riaz committed Nov 7, 2024
1 parent 67fbcb2 commit e735a23
Show file tree
Hide file tree
Showing 16 changed files with 185 additions and 315 deletions.
22 changes: 0 additions & 22 deletions .gitignore

This file was deleted.

3 changes: 1 addition & 2 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@

## Version 1.0.0 (Not yet Released)

* Cassandra-Based Distributed Locks #741
* Cassandra-Based Distributed Locks - Issue #741
* Create New Repair Type Called "VNODE" - Issue #755
* Create ReplicaRepairGroup Class for Grouping Replicas and Token Ranges - Issue #721
* Hot Reload of Nodes List - Issue #699
* Update nodes when cluster changes, nodes removed or added #699
* Investigate Creation of RepairScheduler and ScheduleManager #714
* Implement ScheduledJobQueue for Prioritized Job Management and Execution - Issue #740
* Implement RepairGroup Class for Managing and Executing Repair Tasks - Issue #738
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.ericsson.bss.cassandra.ecchronos.connection.impl.builders.DistributedNativeBuilder;
import com.ericsson.bss.cassandra.ecchronos.connection.impl.providers.DistributedNativeConnectionProviderImpl;
import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.DefaultRepairConfigurationProvider;
import com.ericsson.bss.cassandra.ecchronos.utils.enums.connection.ConnectionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -297,4 +298,18 @@ public Boolean confirmNodeValid(final Node node)
{
return myDistributedNativeConnectionProviderImpl.confirmNodeValid(node);
}

/**
* Retrieves the type of connection being used by this connection provider.
* This method delegates the call to the underlying {@code DistributedNativeConnectionProviderImpl}
* to determine the current {@link ConnectionType}.
*
* @return The {@link ConnectionType} of the connection managed by
* {@code myDistributedNativeConnectionProviderImpl}.
*/
@Override
public ConnectionType getConnectionType()
{
return myDistributedNativeConnectionProviderImpl.getConnectionType();
}
}
24 changes: 13 additions & 11 deletions application/src/main/resources/ecc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -291,14 +291,16 @@ rest_server:
## the cache expiration time is reached.
##
cache_expiry_time_in_seconds: 30
##
## Allow to override consistency level for LWT (lightweight transactions). Possible values are:
## "DEFAULT" - Use consistency level based on remoteRouting.
## "SERIAL" - Use SERIAL consistency for LWT regardless of remoteRouting.
## "LOCAL" - Use LOCAL_SERIAL consistency for LWT regardless of remoteRouting.
##
## if you use remoteRouting: false and LOCAL then all locks will be taken locally
## in DC. I.e There's a risk that multiple nodes in different datacenters will be able to lock the
## same nodes causing multiple repairs on the same range/node at the same time.
##
consistencySerial: "DEFAULT"
##
## Allow to override consistency level for LWT (lightweight transactions). Possible values are:
## "DEFAULT" - Use consistency level based on the `datacenterAware` agent type.
## If the agent type is `datacenterAware`, LOCAL_SERIAL consistency will be used. Otherwise,
## SERIAL consistency will be applied.
## "SERIAL" - Use SERIAL consistency for LWT regardless of agent type.
## "LOCAL" - Use LOCAL_SERIAL consistency for LWT regardless agent type.
##
## If an agent type other than datacenterAware and LOCAL is used, all locks will be managed locally within each datacenter.
## I.e There's a risk that multiple nodes in different datacenters will be able to lock the
## same nodes causing multiple repairs on the same range/node at the same time.
##
consistencySerial: "DEFAULT"
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ public final DistributedNativeConnectionProviderImpl build()
LOG.info("Requesting Nodes List");
List<Node> nodesList = createNodesList(session);
LOG.info("Nodes list was created with success");
return new DistributedNativeConnectionProviderImpl(session, nodesList, this);
return new DistributedNativeConnectionProviderImpl(session, nodesList, this, myType);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.ericsson.bss.cassandra.ecchronos.connection.DistributedNativeConnectionProvider;
import com.ericsson.bss.cassandra.ecchronos.connection.impl.builders.DistributedNativeBuilder;

import com.ericsson.bss.cassandra.ecchronos.utils.enums.connection.ConnectionType;
import java.io.IOException;
import java.util.List;

Expand All @@ -27,6 +28,7 @@ public class DistributedNativeConnectionProviderImpl implements DistributedNativ
private final CqlSession mySession;
private final List<Node> myNodes;
private final DistributedNativeBuilder myDistributedNativeBuilder;
private final ConnectionType myConnectionType;

/**
* Constructs a new {@code DistributedNativeConnectionProviderImpl} with the specified {@link CqlSession} and list
Expand All @@ -38,14 +40,15 @@ public class DistributedNativeConnectionProviderImpl implements DistributedNativ
* the list of {@link Node} instances representing the nodes in the cluster.
*/
public DistributedNativeConnectionProviderImpl(
final CqlSession session,
final List<Node> nodesList,
final DistributedNativeBuilder distributedNativeBuilder
)
final CqlSession session,
final List<Node> nodesList,
final DistributedNativeBuilder distributedNativeBuilder,
final ConnectionType connectionType)
{
mySession = session;
myNodes = nodesList;
myDistributedNativeBuilder = distributedNativeBuilder;
myConnectionType = connectionType;
}

/**
Expand All @@ -70,8 +73,6 @@ public List<Node> getNodes()
return myNodes;
}



/**
* Closes the {@link CqlSession} associated with this connection provider.
*
Expand Down Expand Up @@ -125,4 +126,17 @@ public Boolean confirmNodeValid(final Node node)
{
return myDistributedNativeBuilder.confirmNodeValid(node);
}

/**
* Retrieves the type of connection being used by this connection provider.
* to determine the current {@link ConnectionType}.
*
* @return The {@link ConnectionType} of the connection managed by
* {@code myDistributedNativeConnectionProviderImpl}.
*/
@Override
public ConnectionType getConnectionType()
{
return myConnectionType;
}
}
7 changes: 7 additions & 0 deletions connection/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@
<name>EcChronos Connection</name>

<dependencies>

<dependency>
<groupId>com.ericsson.bss.cassandra.ecchronos</groupId>
<artifactId>utils</artifactId>
<version>${project.version}</version>
</dependency>

<!-- Cassandra driver -->
<dependency>
<groupId>com.datastax.oss</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/
package com.ericsson.bss.cassandra.ecchronos.connection;

import com.ericsson.bss.cassandra.ecchronos.utils.enums.connection.ConnectionType;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
Expand All @@ -35,4 +36,5 @@ default void close() throws IOException
void addNode(Node myNode);
void removeNode(Node myNode);
Boolean confirmNodeValid(Node node);
ConnectionType getConnectionType();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata;
import com.ericsson.bss.cassandra.ecchronos.core.locks.LockFactory;
import com.ericsson.bss.cassandra.ecchronos.core.state.HostStates;
import com.ericsson.bss.cassandra.ecchronos.utils.exceptions.LockException;
import com.ericsson.bss.cassandra.ecchronos.core.locks.HostStates;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
Expand Down Expand Up @@ -84,7 +84,9 @@ public final class CASLockFactory implements LockFactory, Closeable

CASLockFactory(final CASLockFactoryBuilder builder)
{
myCasLockProperties = new CASLockProperties(builder.getKeyspaceName(),
myCasLockProperties = new CASLockProperties(
builder.getNativeConnectionProvider().getConnectionType(),
builder.getKeyspaceName(),
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("LockRefresher-%d").build()),
builder.getConsistencyType(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import com.ericsson.bss.cassandra.ecchronos.connection.DistributedNativeConnectionProvider;
import com.ericsson.bss.cassandra.ecchronos.connection.StatementDecorator;
import com.ericsson.bss.cassandra.ecchronos.core.impl.utils.ConsistencyType;
import com.ericsson.bss.cassandra.ecchronos.core.locks.HostStates;
import com.ericsson.bss.cassandra.ecchronos.core.state.HostStates;

/**
* Represents a container for builder configurations and state for the CASLockFactory.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package com.ericsson.bss.cassandra.ecchronos.core.impl.locks;

import com.ericsson.bss.cassandra.ecchronos.core.impl.utils.ConsistencyType;
import com.ericsson.bss.cassandra.ecchronos.utils.enums.connection.ConnectionType;
import java.util.concurrent.ScheduledExecutorService;

import com.datastax.oss.driver.api.core.ConsistencyLevel;
Expand All @@ -27,18 +28,21 @@
*/
public class CASLockProperties
{
private final ConnectionType myConnectionType;
private final String myKeyspaceName;
private final ScheduledExecutorService myExecutor;
private final ConsistencyLevel mySerialConsistencyLevel;
private final CqlSession mySession;
private final StatementDecorator myStatementDecorator;

CASLockProperties(final String keyspaceName,
CASLockProperties(final ConnectionType connectionType,
final String keyspaceName,
final ScheduledExecutorService executor,
final ConsistencyType consistencyType,
final CqlSession session,
final StatementDecorator statementDecorator)
{
myConnectionType = connectionType;
myKeyspaceName = keyspaceName;
myExecutor = executor;
mySerialConsistencyLevel = defineSerialConsistencyLevel(consistencyType);
Expand All @@ -49,9 +53,18 @@ public class CASLockProperties
public final ConsistencyLevel defineSerialConsistencyLevel(final ConsistencyType consistencyType)
{
ConsistencyLevel serialConsistencyLevel;
serialConsistencyLevel = ConsistencyType.LOCAL.equals(consistencyType)
if (ConsistencyType.DEFAULT.equals(consistencyType))
{
serialConsistencyLevel = myConnectionType == ConnectionType.datacenterAware
? ConsistencyLevel.LOCAL_SERIAL
: ConsistencyLevel.SERIAL;
}
else
{
serialConsistencyLevel = ConsistencyType.LOCAL.equals(consistencyType)
? ConsistencyLevel.LOCAL_SERIAL
: ConsistencyLevel.SERIAL;
}
return serialConsistencyLevel;
}

Expand Down
Loading

0 comments on commit e735a23

Please sign in to comment.