Skip to content

Commit

Permalink
Merge branch 'main' into major-changes-in-fabric8-discovery-implement…
Browse files Browse the repository at this point in the history
…ation
  • Loading branch information
wind57 committed May 1, 2024
2 parents 7b7b8b6 + 8ee9390 commit 6a99edc
Show file tree
Hide file tree
Showing 25 changed files with 635 additions and 594 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2013-2019 the original author or authors.
* Copyright 2013-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -35,19 +35,19 @@ public Leader(String role, String id) {
}

public String getRole() {
return this.role;
return role;
}

public String getId() {
return this.id;
return id;
}

public boolean isCandidate(Candidate candidate) {
if (candidate == null) {
return false;
}

return Objects.equals(this.role, candidate.getRole()) && Objects.equals(this.id, candidate.getId());
return Objects.equals(role, candidate.getRole()) && Objects.equals(id, candidate.getId());
}

@Override
Expand All @@ -62,17 +62,17 @@ public boolean equals(Object o) {

Leader leader = (Leader) o;

return Objects.equals(this.role, leader.role) && Objects.equals(this.id, leader.id);
return Objects.equals(role, leader.role) && Objects.equals(id, leader.id);
}

@Override
public int hashCode() {
return Objects.hash(this.role, this.id);
return Objects.hash(role, id);
}

@Override
public String toString() {
return String.format("Leader{role='%s', id='%s'}", this.role, this.id);
return String.format("Leader{role='%s', id='%s'}", role, id);
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2013-2019 the original author or authors.
* Copyright 2013-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -35,12 +35,17 @@ public LeaderContext(Candidate candidate, LeadershipController leadershipControl

@Override
public boolean isLeader() {
return this.leadershipController.getLocalLeader().filter(l -> l.isCandidate(this.candidate)).isPresent();
return leadershipController.getLocalLeader().filter(l -> l.isCandidate(candidate)).isPresent();
}

@Override
public void yield() {
this.leadershipController.revoke();
leadershipController.revoke();
}

@Override
public String getRole() {
return candidate.getRole();
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2019 the original author or authors.
* Copyright 2019-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,7 +18,6 @@

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import org.springframework.boot.actuate.info.Info.Builder;
import org.springframework.boot.actuate.info.InfoContributor;
Expand All @@ -38,16 +37,12 @@ public LeaderInfoContributor(LeadershipController leadershipController, Candidat
@Override
public void contribute(Builder builder) {
Map<String, Object> details = new HashMap<>();
Optional<Leader> leader = leadershipController.getLocalLeader();
if (leader.isPresent()) {
Leader l = leader.get();
details.put("leaderId", l.getId());
details.put("role", l.getRole());
details.put("isLeader", l.isCandidate(candidate));
}
else {
details.put("leaderId", "Unknown");
}
leadershipController.getLocalLeader().ifPresentOrElse(leader -> {
details.put("leaderId", leader.getId());
details.put("role", leader.getRole());
details.put("isLeader", leader.isCandidate(candidate));
}, () -> details.put("leaderId", "Unknown"));

builder.withDetail("leaderElection", details);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2013-2019 the original author or authors.
* Copyright 2013-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,17 +20,15 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.context.SmartLifecycle;
import org.springframework.core.log.LogAccessor;

/**
* @author Gytis Trikleris
*/
public class LeaderInitiator implements SmartLifecycle {

private static final Logger LOGGER = LoggerFactory.getLogger(LeaderInitiator.class);
private static final LogAccessor LOGGER = new LogAccessor(LeaderInitiator.class);

private final LeaderProperties leaderProperties;

Expand All @@ -54,33 +52,33 @@ public LeaderInitiator(LeaderProperties leaderProperties, LeadershipController l

@Override
public boolean isAutoStartup() {
return this.leaderProperties.isAutoStartup();
return leaderProperties.isAutoStartup();
}

@Override
public void start() {
if (!isRunning()) {
LOGGER.debug("Leader initiator starting");
this.leaderRecordWatcher.start();
this.hostPodWatcher.start();
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
this.scheduledExecutorService.scheduleAtFixedRate(this.leadershipController::update,
this.leaderProperties.getUpdatePeriod().toMillis(),
this.leaderProperties.getUpdatePeriod().toMillis(), TimeUnit.MILLISECONDS);
this.isRunning = true;
LOGGER.debug(() -> "Leader initiator starting");
leaderRecordWatcher.start();
hostPodWatcher.start();
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
scheduledExecutorService.scheduleAtFixedRate(leadershipController::update,
leaderProperties.getUpdatePeriod().toMillis(), leaderProperties.getUpdatePeriod().toMillis(),
TimeUnit.MILLISECONDS);
isRunning = true;
}
}

@Override
public void stop() {
if (isRunning()) {
LOGGER.debug("Leader initiator stopping");
this.scheduledExecutorService.shutdown();
this.scheduledExecutorService = null;
this.hostPodWatcher.stop();
this.leaderRecordWatcher.stop();
this.leadershipController.revoke();
this.isRunning = false;
LOGGER.debug(() -> "Leader initiator stopping");
scheduledExecutorService.shutdown();
scheduledExecutorService = null;
hostPodWatcher.stop();
leaderRecordWatcher.stop();
leadershipController.revoke();
isRunning = false;
}
}

Expand All @@ -92,7 +90,7 @@ public void stop(Runnable callback) {

@Override
public boolean isRunning() {
return this.isRunning;
return isRunning;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.time.Duration;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.util.StringUtils;

/**
* @author Gytis Trikleris
Expand Down Expand Up @@ -105,7 +106,7 @@ public void setNamespace(String namespace) {
}

public String getNamespace(String defaultValue) {
if (namespace == null || namespace.isEmpty()) {
if (!StringUtils.hasText(defaultValue)) {
return defaultValue;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.locks.ReentrantLock;

import org.springframework.cloud.kubernetes.commons.EnvReader;
import org.springframework.util.StringUtils;
Expand All @@ -44,4 +45,14 @@ public static String hostName() throws UnknownHostException {
}
}

public static void guarded(ReentrantLock lock, Runnable runnable) {
try {
lock.lock();
runnable.run();
}
finally {
lock.unlock();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@
import java.util.Objects;
import java.util.Optional;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.core.log.LogAccessor;
import org.springframework.integration.leader.Candidate;
import org.springframework.integration.leader.Context;
import org.springframework.integration.leader.event.LeaderEventPublisher;
Expand All @@ -34,7 +32,7 @@
*/
public abstract class LeadershipController {

private static final Logger LOGGER = LoggerFactory.getLogger(LeadershipController.class);
private static final LogAccessor LOGGER = new LogAccessor(LeadershipController.class);

protected static final String PROVIDER_KEY = "provider";

Expand Down Expand Up @@ -62,15 +60,15 @@ public LeadershipController(Candidate candidate, LeaderProperties leaderProperti
}

public Optional<Leader> getLocalLeader() {
return Optional.ofNullable(this.localLeader);
return Optional.ofNullable(localLeader);
}

public abstract void update();

public abstract void revoke();

protected String getLeaderKey() {
return this.leaderProperties.getLeaderIdPrefix() + this.candidate.getRole();
return leaderProperties.getLeaderIdPrefix() + candidate.getRole();
}

protected Map<String, String> getLeaderData(Candidate candidate) {
Expand All @@ -85,72 +83,73 @@ protected Leader extractLeader(Map<String, String> data) {

String leaderKey = getLeaderKey();
String leaderId = data.get(leaderKey);
LOGGER.debug(() -> "retrieved leaderId: " + leaderId + " from leaderKey : " + leaderId);
if (!StringUtils.hasText(leaderId)) {
return null;
}

return new Leader(this.candidate.getRole(), leaderId);
return new Leader(candidate.getRole(), leaderId);
}

protected void handleLeaderChange(Leader newLeader) {
if (Objects.equals(this.localLeader, newLeader)) {
LOGGER.debug("Leader is still '{}'", this.localLeader);
if (Objects.equals(localLeader, newLeader)) {
LOGGER.debug(() -> "Leader is still : " + localLeader);
return;
}

Leader oldLeader = this.localLeader;
this.localLeader = newLeader;
Leader oldLeader = localLeader;
localLeader = newLeader;

if (oldLeader != null && oldLeader.isCandidate(this.candidate)) {
if (oldLeader != null && oldLeader.isCandidate(candidate)) {
notifyOnRevoked();
}
else if (newLeader != null && newLeader.isCandidate(this.candidate)) {
else if (newLeader != null && newLeader.isCandidate(candidate)) {
notifyOnGranted();
}

restartLeaderReadinessWatcher();

LOGGER.debug("New leader is '{}'", this.localLeader);
LOGGER.debug(() -> "New leader is " + localLeader);
}

protected void notifyOnGranted() {
LOGGER.debug("Leadership has been granted for '{}'", this.candidate);
LOGGER.debug(() -> "Leadership has been granted to : " + candidate);

Context context = new LeaderContext(this.candidate, this);
this.leaderEventPublisher.publishOnGranted(this, context, this.candidate.getRole());
Context context = new LeaderContext(candidate, this);
leaderEventPublisher.publishOnGranted(this, context, candidate.getRole());
try {
this.candidate.onGranted(context);
candidate.onGranted(context);
}
catch (InterruptedException e) {
LOGGER.warn(e.getMessage());
LOGGER.warn(e::getMessage);
Thread.currentThread().interrupt();
}
}

protected void notifyOnRevoked() {
LOGGER.debug("Leadership has been revoked for '{}'", this.candidate);
LOGGER.debug(() -> "Leadership has been revoked from :" + candidate);

Context context = new LeaderContext(this.candidate, this);
this.leaderEventPublisher.publishOnRevoked(this, context, this.candidate.getRole());
this.candidate.onRevoked(context);
Context context = new LeaderContext(candidate, this);
leaderEventPublisher.publishOnRevoked(this, context, candidate.getRole());
candidate.onRevoked(context);
}

protected void notifyOnFailedToAcquire() {
if (this.leaderProperties.isPublishFailedEvents()) {
Context context = new LeaderContext(this.candidate, this);
this.leaderEventPublisher.publishOnFailedToAcquire(this, context, this.candidate.getRole());
if (leaderProperties.isPublishFailedEvents()) {
Context context = new LeaderContext(candidate, this);
leaderEventPublisher.publishOnFailedToAcquire(this, context, candidate.getRole());
}
}

protected void restartLeaderReadinessWatcher() {
if (this.leaderReadinessWatcher != null) {
this.leaderReadinessWatcher.stop();
this.leaderReadinessWatcher = null;
if (leaderReadinessWatcher != null) {
leaderReadinessWatcher.stop();
leaderReadinessWatcher = null;
}

if (this.localLeader != null && !this.localLeader.isCandidate(this.candidate)) {
this.leaderReadinessWatcher = createPodReadinessWatcher(this.localLeader.getId());
this.leaderReadinessWatcher.start();
if (localLeader != null && !localLeader.isCandidate(candidate)) {
leaderReadinessWatcher = createPodReadinessWatcher(localLeader.getId());
leaderReadinessWatcher.start();
}
}

Expand Down
Loading

0 comments on commit 6a99edc

Please sign in to comment.