Skip to content

Commit

Permalink
Introduction of VNODE Repairs (#756)
Browse files Browse the repository at this point in the history
* Introduction of VNODE Repairs

* Fix IP definition in ecc.yml

* Fix Wrong Creation of Table Storage Map

* Rebase Branch with Master
  • Loading branch information
VictorCavichioli authored Nov 6, 2024
1 parent 71f92c8 commit 18c0664
Show file tree
Hide file tree
Showing 63 changed files with 7,344 additions and 51 deletions.
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## Version 1.0.0 (Not yet Released)

* Create New Repair Type Called "VNODE" - Issue #755
* Create ReplicaRepairGroup Class for Grouping Replicas and Token Ranges - Issue #721
* Hot Reload of Nodes List - Issue #699
* Investigate Creation of RepairScheduler and ScheduleManager - Issue #714
* Implement ScheduledJobQueue for Prioritized Job Management and Execution - Issue #740
Expand Down
12 changes: 12 additions & 0 deletions application/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,18 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.ericsson.bss.cassandra.ecchronos</groupId>
<artifactId>fault.manager</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.ericsson.bss.cassandra.ecchronos</groupId>
<artifactId>fault.manager.impl</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.ericsson.bss.cassandra.ecchronos</groupId>
<artifactId>utils</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,17 @@
import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.state.ReplicationStateImpl;
import com.ericsson.bss.cassandra.ecchronos.core.metadata.NodeResolver;
import com.ericsson.bss.cassandra.ecchronos.core.state.ReplicationState;
import com.ericsson.bss.cassandra.ecchronos.data.repairhistory.RepairHistoryService;
import com.ericsson.bss.cassandra.ecchronos.data.sync.EccNodesSync;

import com.ericsson.bss.cassandra.ecchronos.fm.RepairFaultReporter;
import com.ericsson.bss.cassandra.ecchronos.fm.impl.LoggingFaultReporter;
import com.ericsson.bss.cassandra.ecchronos.utils.exceptions.ConfigurationException;
import com.ericsson.bss.cassandra.ecchronos.utils.exceptions.EcChronosException;
import java.net.InetAddress;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
Expand Down Expand Up @@ -172,6 +176,12 @@ public ConfigurableServletWebServerFactory webServerFactory(final Config config)
return factory;
}

/**
 * Provides the {@link RepairFaultReporter} bean, backed by a {@link LoggingFaultReporter}.
 *
 * @return a new {@link LoggingFaultReporter} instance.
 * @throws ConfigurationException declared by the signature; the visible body only invokes the
 *         {@link LoggingFaultReporter} constructor.
 */
@Bean
public RepairFaultReporter repairFaultReporter() throws ConfigurationException
{
return new LoggingFaultReporter();
}

/**
* Provides a {@link DistributedNativeConnectionProvider} bean to manage Cassandra native connections.
*
Expand Down Expand Up @@ -255,6 +265,17 @@ public ReplicationState replicationState(
return new ReplicationStateImpl(nodeResolver, session);
}

/**
 * Provides the {@link RepairHistoryService} bean.
 *
 * All arguments are forwarded unchanged to the private helper {@code getRepairHistoryService},
 * which performs the actual construction.
 *
 * @param distributedNativeConnectionProvider the native connection provider forwarded to the helper.
 * @param nodeResolver the node resolver forwarded to the helper.
 * @param replicationState the replication state forwarded to the helper.
 * @param config the application configuration forwarded to the helper.
 * @return the {@link RepairHistoryService} returned by {@code getRepairHistoryService}.
 */
@Bean
public RepairHistoryService repairHistoryService(
final DistributedNativeConnectionProvider distributedNativeConnectionProvider,
final NodeResolver nodeResolver,
final ReplicationState replicationState,
final Config config
)
{
return getRepairHistoryService(distributedNativeConnectionProvider, nodeResolver, replicationState, config);
}

private Security getSecurityConfig() throws ConfigurationException
{
return ConfigurationHelper.DEFAULT_INSTANCE.getConfiguration(SECURITY_FILE, Security.class);
Expand Down Expand Up @@ -330,4 +351,21 @@ private EccNodesSync getEccNodesSync(
LOG.info("Nodes acquired with success");
return myEccNodesSync;
}

/**
 * Builds a {@link RepairHistoryService} from the supplied collaborators.
 *
 * The service is created with the CQL session obtained from the native connection provider,
 * the replication state, the node resolver and the repair history lookback expressed in
 * milliseconds.
 *
 * @param distributedNativeConnectionProvider provider whose CQL session is handed to the service.
 * @param nodeResolver the node resolver handed to the service.
 * @param replicationState the replication state handed to the service.
 * @param config the configuration from which the repair history lookback interval is read.
 * @return a new {@link RepairHistoryService}.
 */
private RepairHistoryService getRepairHistoryService(
        final DistributedNativeConnectionProvider distributedNativeConnectionProvider,
        final NodeResolver nodeResolver,
        final ReplicationState replicationState,
        final Config config
)
{
    // The interval is requested in milliseconds already, so the former
    // MILLISECONDS -> MILLISECONDS conversion was an identity operation and is dropped.
    final long repairHistoryLookBack = config.getRepairConfig()
            .getRepairHistoryLookback()
            .getInterval(TimeUnit.MILLISECONDS);
    return new RepairHistoryService(
            distributedNativeConnectionProvider.getCqlSession(),
            replicationState,
            nodeResolver,
            repairHistoryLookBack
    );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@
import com.ericsson.bss.cassandra.ecchronos.connection.DistributedNativeConnectionProvider;
import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.DefaultRepairConfigurationProvider;
import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.scheduler.RepairSchedulerImpl;
import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.state.RepairStateFactoryImpl;
import com.ericsson.bss.cassandra.ecchronos.core.impl.table.TimeBasedRunPolicy;
import com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler.RepairScheduler;
import com.ericsson.bss.cassandra.ecchronos.core.state.ReplicationState;
import com.ericsson.bss.cassandra.ecchronos.core.table.ReplicatedTableProvider;
import com.ericsson.bss.cassandra.ecchronos.core.table.TableReferenceFactory;
import com.ericsson.bss.cassandra.ecchronos.data.repairhistory.RepairHistoryService;
import com.ericsson.bss.cassandra.ecchronos.data.sync.EccNodesSync;
import com.ericsson.bss.cassandra.ecchronos.fm.RepairFaultReporter;
import com.ericsson.bss.cassandra.ecchronos.utils.exceptions.ConfigurationException;
import java.io.Closeable;

Expand All @@ -49,7 +52,9 @@ public ECChronos(
final DistributedJmxConnectionProvider jmxConnectionProvider,
final ReplicationState replicationState,
final DefaultRepairConfigurationProvider defaultRepairConfigurationProvider,
final EccNodesSync eccNodesSync
final EccNodesSync eccNodesSync,
final RepairHistoryService repairHistoryService,
final RepairFaultReporter repairFaultReporter
)
throws ConfigurationException
{
Expand All @@ -63,6 +68,13 @@ public ECChronos(
.withKeyspaceName(configuration.getRunPolicy().getTimeBasedConfig().getKeyspaceName())
.build();

RepairStateFactoryImpl repairStateFactoryImpl = RepairStateFactoryImpl.builder()
.withReplicationState(replicationState)
.withHostStates(myECChronosInternals.getHostStates())
.withRepairHistoryProvider(repairHistoryService)
.withTableRepairMetrics(myECChronosInternals.getTableRepairMetrics())
.build();

myRepairSchedulerImpl = RepairSchedulerImpl.builder()
.withJmxProxyFactory(myECChronosInternals.getJmxProxyFactory())
.withScheduleManager(myECChronosInternals.getScheduleManager())
Expand All @@ -71,6 +83,10 @@ public ECChronos(
.withReplicationState(replicationState)
.withRepairPolicies(Collections.singletonList(myTimeBasedRunPolicy))
.withCassandraMetrics(myECChronosInternals.getCassandraMetrics())
.withRepairStateFactory(repairStateFactoryImpl)
.withRepairHistory(repairHistoryService)
.withFaultReporter(repairFaultReporter)
.withTableStorageStates(myECChronosInternals.getTableStorageStates())
.build();

AbstractRepairConfigurationProvider repairConfigurationProvider = new FileBasedRepairConfiguration(applicationContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,19 @@
import com.ericsson.bss.cassandra.ecchronos.core.impl.jmx.DistributedJmxProxyFactoryImpl;
import com.ericsson.bss.cassandra.ecchronos.core.impl.metrics.CassandraMetrics;
import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.scheduler.ScheduleManagerImpl;
import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.state.HostStatesImpl;
import com.ericsson.bss.cassandra.ecchronos.core.impl.table.ReplicatedTableProviderImpl;
import com.ericsson.bss.cassandra.ecchronos.core.impl.table.TableReferenceFactoryImpl;
import com.ericsson.bss.cassandra.ecchronos.core.impl.table.TableStorageStatesImpl;
import com.ericsson.bss.cassandra.ecchronos.core.jmx.DistributedJmxProxyFactory;
import com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler.RunPolicy;
import com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler.ScheduleManager;
import com.ericsson.bss.cassandra.ecchronos.core.state.HostStates;
import com.ericsson.bss.cassandra.ecchronos.core.table.ReplicatedTableProvider;
import com.ericsson.bss.cassandra.ecchronos.core.table.TableReference;
import com.ericsson.bss.cassandra.ecchronos.core.table.TableReferenceFactory;
import com.ericsson.bss.cassandra.ecchronos.core.table.TableRepairMetrics;
import com.ericsson.bss.cassandra.ecchronos.core.table.TableStorageStates;
import com.ericsson.bss.cassandra.ecchronos.data.sync.EccNodesSync;
import java.io.Closeable;
import java.util.HashMap;
Expand All @@ -51,6 +55,8 @@ public class ECChronosInternals implements Closeable
private final TableReferenceFactory myTableReferenceFactory;
private final DistributedJmxProxyFactory myJmxProxyFactory;
private final CassandraMetrics myCassandraMetrics;
private final HostStatesImpl myHostStatesImpl;
private final TableStorageStatesImpl myTableStorageStatesImpl;

public ECChronosInternals(
final Config configuration,
Expand All @@ -69,11 +75,21 @@ public ECChronosInternals(

myTableReferenceFactory = new TableReferenceFactoryImpl(session);

myHostStatesImpl = HostStatesImpl.builder()
.withJmxProxyFactory(myJmxProxyFactory)
.build();

myReplicatedTableProvider = new ReplicatedTableProviderImpl(
session,
myTableReferenceFactory,
nativeConnectionProvider.getNodes());

myTableStorageStatesImpl = TableStorageStatesImpl.builder()
.withReplicatedTableProvider(myReplicatedTableProvider)
.withJmxProxyFactory(myJmxProxyFactory)
.withConnectionProvider(nativeConnectionProvider)
.build();

myCassandraMetrics = new CassandraMetrics(myJmxProxyFactory);
myScheduleManagerImpl = ScheduleManagerImpl.builder()
.withRunInterval(configuration.getSchedulerConfig().getFrequency().getInterval(TimeUnit.MILLISECONDS),
Expand Down Expand Up @@ -112,6 +128,16 @@ public final TableRepairMetrics getTableRepairMetrics()
return NO_OP_REPAIR_METRICS;
}

/**
 * Returns the host states held by this instance.
 *
 * @return the {@link HostStates} stored in {@code myHostStatesImpl}.
 */
public final HostStates getHostStates()
{
return myHostStatesImpl;
}

/**
 * Returns the table storage states held by this instance.
 *
 * @return the {@link TableStorageStates} stored in {@code myTableStorageStatesImpl}.
 */
public final TableStorageStates getTableStorageStates()
{
return myTableStorageStatesImpl;
}

public final boolean addRunPolicy(final RunPolicy runPolicy)
{
return myScheduleManagerImpl.addRunPolicy(runPolicy);
Expand Down
2 changes: 1 addition & 1 deletion application/src/main/resources/ecc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ repair:
## parallel_vnode = repair vnodes in parallel; this combines vnodes into a single repair session per repair group
## incremental = repair vnodes incrementally (incremental repair)
##
repair_type: incremental
repair_type: vnode

run_policy:
time_based:
Expand Down
6 changes: 6 additions & 0 deletions core.impl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.ericsson.bss.cassandra.ecchronos</groupId>
<artifactId>fault.manager</artifactId>
<version>${project.version}</version>
</dependency>

<!-- Cassandra driver -->
<dependency>
<groupId>com.datastax.oss</groupId>
Expand Down
Loading

0 comments on commit 18c0664

Please sign in to comment.