diff --git a/joylive-core/joylive-core-api/src/main/java/com/jd/live/agent/core/util/AtomicUtils.java b/joylive-core/joylive-core-api/src/main/java/com/jd/live/agent/core/util/AtomicUtils.java new file mode 100644 index 000000000..7475d4ff0 --- /dev/null +++ b/joylive-core/joylive-core-api/src/main/java/com/jd/live/agent/core/util/AtomicUtils.java @@ -0,0 +1,81 @@ +/* + * Copyright © ${year} ${owner} (${email}) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.jd.live.agent.core.util; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; +import java.util.function.Predicate; +import java.util.function.Supplier; + +/** + * Utility class for performing atomic operations on {@link AtomicReference}. + */ +public class AtomicUtils { + + /** + * Atomically updates the value of the given {@link AtomicReference} if the current value satisfies the specified predicate. + * If the predicate is satisfied, the new value is set using {@link AtomicReference#compareAndSet(Object, Object)}. + * If the update is successful and a success consumer is provided, the consumer is called with the old and new values. + * If the update is successful, the new value is returned. Otherwise, the old value is returned. + * + * @param reference the {@link AtomicReference} to update + * @param value the new value to set + * @param predicate a predicate that tests the current value; if null, the update is always attempted + * @param success a consumer that is called with the old and new values if the update is successful; if null, no consumer is called + * @return the new value if the update was successful, otherwise the old value + */ + public static V update(AtomicReference reference, V value, Predicate predicate, BiConsumer success) { + V old; + while (true) { + old = reference.get(); + if (predicate == null || predicate.test(old)) { + if (reference.compareAndSet(old, value)) { + if (success != null) { + success.accept(old, value); + } + return value; + } + } else { + return old; + } + } + } + + /** + * Atomically gets or updates the value associated with the specified key in the given map. + * If the key does not exist in the map, a new {@link AtomicReference} is created and associated with the key. + * The method checks if the current value satisfies the specified predicate. + * If the predicate is satisfied, the new value is set using {@link AtomicReference#compareAndSet(Object, Object)}. + * If the update is successful and a success consumer is provided, the consumer is called with the old and new values. + * If the update is successful, the new value is returned. Otherwise, the old value is returned. + * + * @param map the map containing {@link AtomicReference} values + * @param key the key whose value is to be retrieved or updated + * @param supplier a supplier that provides the new value if the predicate is satisfied or the value is null + * @param predicate a predicate that tests the current value; if null, the update is always attempted + * @param success a consumer that is called with the old and new values if the update is successful; if null, no consumer is called + * @return the new value if the update was successful, otherwise the old value + */ + public static V getOrUpdate(Map> map, K key, Supplier supplier, Predicate predicate, BiConsumer success) { + AtomicReference reference = map.computeIfAbsent(key, k -> new AtomicReference<>()); + V old = reference.get(); + if (predicate == null && old != null || predicate != null && predicate.test(old)) { + return old; + } + return update(reference, supplier.get(), predicate == null ? null : predicate.negate(), success); + } +} diff --git a/joylive-core/joylive-core-api/src/main/java/com/jd/live/agent/core/util/time/TimeWindow.java b/joylive-core/joylive-core-api/src/main/java/com/jd/live/agent/core/util/time/TimeWindow.java new file mode 100644 index 000000000..4cd05221a --- /dev/null +++ b/joylive-core/joylive-core-api/src/main/java/com/jd/live/agent/core/util/time/TimeWindow.java @@ -0,0 +1,53 @@ +/* + * Copyright © ${year} ${owner} (${email}) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.jd.live.agent.core.util.time; + +import lombok.Getter; + +/** + * Represents a time window defined by a start time and a duration. + * The end time is automatically calculated based on the start time and duration. + */ +@Getter +public class TimeWindow { + + /** + * The start time of the time window. + */ + protected final long startTime; + + /** + * The end time of the time window, calculated as startTime + duration. + */ + protected final long endTime; + + /** + * The duration of the time window. + */ + protected final long duration; + + /** + * Constructs a new TimeWindow with the specified start time and duration. + * + * @param startTime The start time of the time window. + * @param duration The duration of the time window. + */ + public TimeWindow(long startTime, long duration) { + this.startTime = startTime; + this.endTime = startTime + duration; + this.duration = duration; + } +} diff --git a/joylive-core/joylive-core-api/src/main/java/com/jd/live/agent/core/util/time/TimeWindowList.java b/joylive-core/joylive-core-api/src/main/java/com/jd/live/agent/core/util/time/TimeWindowList.java new file mode 100644 index 000000000..796fa528d --- /dev/null +++ b/joylive-core/joylive-core-api/src/main/java/com/jd/live/agent/core/util/time/TimeWindowList.java @@ -0,0 +1,84 @@ +/* + * Copyright © ${year} ${owner} (${email}) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.jd.live.agent.core.util.time; + +import java.util.ArrayList; +import java.util.List; + +/** + * A class that manages a list of TimeWindow objects. + */ +public class TimeWindowList { + + /** + * The first TimeWindow added to the list. + */ + private TimeWindow window; + + /** + * A list to store multiple TimeWindow objects. + */ + private List windows; + + /** + * Constructs a new TimeWindowList. + */ + public TimeWindowList() { + } + + /** + * Adds a TimeWindow to the list. + * + * @param window The TimeWindow to add. + */ + public void add(TimeWindow window) { + if (window != null) { + if (this.window == null) { + this.window = window; + } else if (windows == null) { + windows = new ArrayList<>(); + windows.add(this.window); + windows.add(window); + } else { + windows.add(window); + } + } + } + + /** + * Returns the TimeWindow with the maximum end time from the list. + * If there are multiple TimeWindows, it returns a new TimeWindow + * with the maximum start time and maximum end time from the list. + * + * @return The TimeWindow with the maximum start and end time. + */ + public TimeWindow max() { + if (windows == null) { + return window; + } + long maxStartTime = Long.MIN_VALUE; + long maxEndTime = Long.MIN_VALUE; + for (TimeWindow window : windows) { + if (window.startTime > maxStartTime) { + maxStartTime = window.startTime; + } + if (window.endTime > maxEndTime) { + maxEndTime = window.endTime; + } + } + return new TimeWindow(maxStartTime, (int) (maxEndTime - maxStartTime)); + } +} diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/config/CircuitBreakerConfig.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/config/CircuitBreakerConfig.java index 1f22dde02..5b49b32b5 100644 --- a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/config/CircuitBreakerConfig.java +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/config/CircuitBreakerConfig.java @@ -23,24 +23,12 @@ */ @Getter @Setter -public class CircuitBreakerConfig { +public class CircuitBreakerConfig extends RecyclerConfig { /** * The type of the circuit breaker. Default is "Resilience4j". */ private String type = "Resilience4j"; - /** - * The interval in milliseconds at which the circuit breaker should clean up expired entries. - * Default is 30,000 milliseconds (30 seconds). - */ - private long cleanInterval = 30000; - - /** - * The time in milliseconds after which the circuit breaker entries expire. - * Default is 60,000 milliseconds (60 seconds). - */ - private long expireTime = 60000; - } diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/config/ConcurrencyLimiterConfig.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/config/ConcurrencyLimiterConfig.java index 7b78492f1..422110ff6 100644 --- a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/config/ConcurrencyLimiterConfig.java +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/config/ConcurrencyLimiterConfig.java @@ -23,24 +23,12 @@ */ @Getter @Setter -public class ConcurrencyLimiterConfig { +public class ConcurrencyLimiterConfig extends RecyclerConfig { /** * The type of the concurrency limiter. Default is "Resilience4j". */ private String type = "Resilience4j"; - /** - * The interval in milliseconds at which the concurrency limiter should clean up expired entries. - * Default is 30,000 milliseconds (30 seconds). - */ - private long cleanInterval = 30000; - - /** - * The time in milliseconds after which the concurrency limiter entries expire. - * Default is 60,000 milliseconds (60 seconds). - */ - private long expireTime = 60000; - } diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/config/RateLimiterConfig.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/config/RateLimiterConfig.java index b9e480b3c..7c81ba8df 100644 --- a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/config/RateLimiterConfig.java +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/config/RateLimiterConfig.java @@ -23,24 +23,12 @@ */ @Getter @Setter -public class RateLimiterConfig { +public class RateLimiterConfig extends RecyclerConfig { /** * The type of the rate limiter. */ private String type; - /** - * The interval in milliseconds at which the rate limiter should clean up expired entries. - * Default is 30,000 milliseconds (30 seconds). - */ - private long cleanInterval = 30000; - - /** - * The time in milliseconds after which the rate limiter entries expire. - * Default is 60,000 milliseconds (60 seconds). - */ - private long expireTime = 60000; - } diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/config/RecyclerConfig.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/config/RecyclerConfig.java new file mode 100644 index 000000000..a3522b4dd --- /dev/null +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/config/RecyclerConfig.java @@ -0,0 +1,35 @@ +/* + * Copyright © ${year} ${owner} (${email}) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.jd.live.agent.governance.config; + +import lombok.Getter; +import lombok.Setter; + +@Getter +@Setter +public abstract class RecyclerConfig { + /** + * The interval in milliseconds at which the concurrency limiter should clean up expired entries. + * Default is 30,000 milliseconds (30 seconds). + */ + protected long cleanInterval = 30000; + + /** + * The time in milliseconds after which the concurrency limiter entries expire. + * Default is 60,000 milliseconds (60 seconds). + */ + protected long expireTime = 60000; +} diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/instance/AbstractEndpoint.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/instance/AbstractEndpoint.java index 64418c11a..fd9a1e6d2 100644 --- a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/instance/AbstractEndpoint.java +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/instance/AbstractEndpoint.java @@ -39,6 +39,10 @@ public abstract class AbstractEndpoint extends AbstractAttributes implements End // Endpoint is request level private Integer weight; + private Long recoverTime; + + private Integer recoverDuration; + @Override public String getLiveSpaceId() { if (liveSpaceId == null) { @@ -95,9 +99,29 @@ public String getLane() { } @Override - public Integer getWeight(ServiceRequest request) { + public Long getRecoverTime() { + return recoverTime; + } + + @Override + public void setRecoverTime(Long recoverTime) { + this.recoverTime = recoverTime; + } + + @Override + public Integer getRecoverDuration() { + return recoverDuration; + } + + @Override + public void setRecoverDuration(Integer duration) { + this.recoverDuration = duration; + } + + @Override + public Integer reweight(ServiceRequest request) { if (weight == null) { - weight = Endpoint.super.getWeight(request); + weight = Endpoint.super.reweight(request); } return weight; } diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/instance/Endpoint.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/instance/Endpoint.java index 1078e2394..95a7bafe5 100644 --- a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/instance/Endpoint.java +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/instance/Endpoint.java @@ -111,29 +111,80 @@ default Integer getWarmup() { } /** - * Gets the weight for the specified service request, taking into account the origin weight and warm-up time. + * Returns the recover time. + * + * @return The recover time, or null if not set. + */ + default Long getRecoverTime() { + return null; + } + + /** + * Sets the recover time. + * + * @param recoverTime The recover time to set. + */ + default void setRecoverTime(Long recoverTime) { + } + + /** + * Returns the recover duration. + * + * @return The recover duration, or null if not set. + */ + default Integer getRecoverDuration() { + return null; + } + + /** + * Sets the recover duration. + * + * @param duration The recover duration to set. + */ + default void setRecoverDuration(Integer duration) { + } + + /** + * Gets the weight for the specified service request, taking into account the weight, warm-up time and recover time. * * @param request the service request for which to get the weight - * @return the weight for this endpoint, taking into account the origin weight and warm-up time + * @return the weight for this endpoint */ - default Integer getWeight(ServiceRequest request) { - int weight = getOriginWeight(request); + default Integer reweight(ServiceRequest request) { + int weight = getWeight(request); if (weight > 0) { - Long timestamp = getTimestamp(); - if (timestamp != null && timestamp > 0L) { - long uptime = System.currentTimeMillis() - timestamp; - if (uptime < 0) { - weight = 1; - } else { - int warmup = getWarmup(); - if (uptime > 0 && uptime < warmup) { - int ww = (int) (uptime / ((float) warmup / weight)); - weight = ww < 1 ? 1 : Math.min(ww, weight); - } - } - } + long now = System.currentTimeMillis(); + weight = getWeight(weight, getTimestamp(), getWarmup(), now); + weight = getWeight(weight, getRecoverTime(), getRecoverDuration(), now); + return weight < 0 ? 0 : Math.max(1, weight); } - return Math.max(weight, 0); + return 0; + } + + /** + * Calculates the effective weight of a resource based on its uptime and warmup period. + *

+ * This method calculates the effective weight of a resource considering its initial weight, + * the timestamp of its start, a warmup period, and the current time. The weight is adjusted + * during the warmup period and returns the original weight once the warmup period is complete. + * + * @param weight the initial weight of the resource + * @param timestamp the timestamp when the resource started + * @param duration the time window in milliseconds + * @param now the current timestamp + * @return the effective weight of the resource + */ + static int getWeight(int weight, Long timestamp, Integer duration, long now) { + if (weight <= 0 || timestamp == null || timestamp <= 0 || duration == null || duration <= 0) { + return weight; + } + long span = now - timestamp; + if (span <= 0) { + return -1; + } else if (span < duration) { + return (int) (span / ((float) duration / weight)); + } + return weight; } /** @@ -142,7 +193,7 @@ default Integer getWeight(ServiceRequest request) { * @param request the service request for which to get the origin weight * @return the origin weight, or the default value if not specified */ - default Integer getOriginWeight(ServiceRequest request) { + default Integer getWeight(ServiceRequest request) { return Converts.getInteger(getLabel(Constants.LABEL_WEIGHT), DEFAULT_WEIGHT); } diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/circuitbreak/AbstractCircuitBreaker.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/circuitbreak/AbstractCircuitBreaker.java index de213f6a8..2def7c69d 100644 --- a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/circuitbreak/AbstractCircuitBreaker.java +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/circuitbreak/AbstractCircuitBreaker.java @@ -19,19 +19,25 @@ import com.jd.live.agent.governance.policy.service.circuitbreak.CircuitBreakPolicy; import lombok.Getter; +import java.util.concurrent.atomic.AtomicBoolean; + /** * AbstractCircuitBreaker * * @since 1.1.0 */ -@Getter public abstract class AbstractCircuitBreaker implements CircuitBreaker { - private final CircuitBreakPolicy policy; + @Getter + protected final CircuitBreakPolicy policy; + + @Getter + protected final URI uri; - private final URI uri; + @Getter + protected long lastAccessTime; - private long lastAcquireTime; + protected final AtomicBoolean started = new AtomicBoolean(true); public AbstractCircuitBreaker(CircuitBreakPolicy policy, URI uri) { this.policy = policy; @@ -40,10 +46,49 @@ public AbstractCircuitBreaker(CircuitBreakPolicy policy, URI uri) { @Override public boolean acquire() { - lastAcquireTime = System.currentTimeMillis(); + if (!started.get()) { + return true; + } + lastAccessTime = System.currentTimeMillis(); return doAcquire(); } + @Override + public void release() { + if (started.get()) { + doRelease(); + } + } + + @Override + public void onError(long durationInMs, Throwable throwable) { + if (started.get()) { + doOnError(durationInMs, throwable); + } + } + + @Override + public void onSuccess(long durationInMs) { + if (started.get()) { + doOnSuccess(durationInMs); + } + } + + @Override + public void close() { + // When the circuit breaker is not accessed for a long time, it will be automatically garbage collected. + if (started.compareAndSet(true, false)) { + doClose(); + } + } + + /** + * Closes the circuit breaker. + */ + protected void doClose() { + + } + /** * Performs the actual acquisition logic. * Subclasses must implement this method to define the specific acquisition behavior. @@ -52,4 +97,24 @@ public boolean acquire() { */ protected abstract boolean doAcquire(); + /** + * Releases the acquired permit. + */ + protected abstract void doRelease(); + + /** + * Records a failed call. This method should be invoked when a call fails. + * + * @param durationInMs The elapsed time duration of the call in milliseconds. + * @param throwable The throwable that represents the failure. + */ + protected abstract void doOnError(long durationInMs, Throwable throwable); + + /** + * Records a successful call. This method should be invoked when a call is successful. + * + * @param durationInMs The elapsed time duration of the call in milliseconds. + */ + protected abstract void doOnSuccess(long durationInMs); + } diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/circuitbreak/AbstractCircuitBreakerFactory.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/circuitbreak/AbstractCircuitBreakerFactory.java index 827baef69..174bf9781 100644 --- a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/circuitbreak/AbstractCircuitBreakerFactory.java +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/circuitbreak/AbstractCircuitBreakerFactory.java @@ -15,17 +15,11 @@ */ package com.jd.live.agent.governance.invoke.circuitbreak; -import com.jd.live.agent.core.inject.annotation.Inject; import com.jd.live.agent.core.util.URI; -import com.jd.live.agent.core.util.time.Timer; -import com.jd.live.agent.governance.config.GovernanceConfig; +import com.jd.live.agent.governance.config.RecyclerConfig; +import com.jd.live.agent.governance.invoke.permission.AbstractLicenseeFactory; import com.jd.live.agent.governance.policy.service.circuitbreak.CircuitBreakPolicy; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - /** * AbstractCircuitBreakerFactory provides a base implementation for factories that create and manage circuit breakers. * It uses a thread-safe map to store and retrieve circuit breakers associated with specific circuit breaker policies. @@ -34,83 +28,18 @@ * * @since 1.1.0 */ -public abstract class AbstractCircuitBreakerFactory implements CircuitBreakerFactory { - - /** - * A thread-safe map to store circuit breakers associated with their respective policies. - * Key is the string URI of the policy, and the values are atomic references to the circuit breakers. - */ - private final Map> circuitBreakers = new ConcurrentHashMap<>(); - - @Inject(Timer.COMPONENT_TIMER) - private Timer timer; - - @Inject(GovernanceConfig.COMPONENT_GOVERNANCE_CONFIG) - private GovernanceConfig governanceConfig; - - private final AtomicBoolean recycled = new AtomicBoolean(false); +public abstract class AbstractCircuitBreakerFactory + extends AbstractLicenseeFactory + implements CircuitBreakerFactory { @Override public CircuitBreaker get(CircuitBreakPolicy policy, URI uri) { - if (policy == null || uri == null) { - return null; - } - AtomicReference reference = circuitBreakers.computeIfAbsent(uri.toString(), n -> new AtomicReference<>()); - CircuitBreaker circuitBreaker = reference.get(); - if (circuitBreaker != null && circuitBreaker.getPolicy().getVersion() == policy.getVersion()) { - return circuitBreaker; - } - CircuitBreaker breaker = create(policy, uri); - while (true) { - circuitBreaker = reference.get(); - if (circuitBreaker == null || circuitBreaker.getPolicy().getVersion() < policy.getVersion()) { - if (reference.compareAndSet(circuitBreaker, breaker)) { - circuitBreaker = breaker; - if (recycled.compareAndSet(false, true)) { - addRecycler(); - } - break; - } - } else { - break; - } - } - return circuitBreaker; + return get(policy, uri == null ? null : uri.toString(), null, () -> create(policy, uri)); } - /** - * Schedules a recurring task to recycle circuit breakers based on their expiration time. - * This method retrieves the clean interval from the configuration and sets up a delayed task - * that calls the {@link #recycle()} method and reschedules itself. - */ - private void addRecycler() { - long cleanInterval = governanceConfig.getServiceConfig().getCircuitBreaker().getCleanInterval(); - timer.delay("recycle-circuitbreaker", cleanInterval, () -> { - recycle(); - addRecycler(); - }); - } - - /** - * Recycles expired circuit breakers. This method checks each circuit breaker to see if it has - * expired based on the current time and the configured expiration time. If a circuit breaker - * is not open and has exceeded its expiration time, it is removed from the collection. - */ - private void recycle() { - long expireTime = governanceConfig.getServiceConfig().getCircuitBreaker().getExpireTime(); - for (Map.Entry> entry : circuitBreakers.entrySet()) { - AtomicReference reference = entry.getValue(); - CircuitBreaker circuitBreaker = reference.get(); - if (circuitBreaker != null && !circuitBreaker.isOpen() && (System.currentTimeMillis() - circuitBreaker.getLastAcquireTime()) > expireTime) { - reference = circuitBreakers.remove(entry.getKey()); - if (reference != null) { - circuitBreaker = reference.get(); - if (circuitBreaker != null && (System.currentTimeMillis() - circuitBreaker.getLastAcquireTime()) <= expireTime) { - circuitBreakers.putIfAbsent(entry.getKey(), reference); - } - } - } - } + @Override + protected RecyclerConfig getConfig() { + return governanceConfig.getServiceConfig().getCircuitBreaker(); } /** diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/circuitbreak/CircuitBreaker.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/circuitbreak/CircuitBreaker.java index 5e772655b..bc44ffe69 100644 --- a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/circuitbreak/CircuitBreaker.java +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/circuitbreak/CircuitBreaker.java @@ -16,6 +16,7 @@ package com.jd.live.agent.governance.invoke.circuitbreak; import com.jd.live.agent.core.util.URI; +import com.jd.live.agent.governance.invoke.permission.Licensee; import com.jd.live.agent.governance.policy.service.circuitbreak.CircuitBreakPolicy; /** @@ -23,30 +24,7 @@ * * @since 1.1.0 */ -public interface CircuitBreaker { - - /** - * Attempts to acquire a permit and returns the result. - * - * @return true if the permit is acquired successfully, false otherwise. - */ - default boolean acquire() { - return true; - } - - /** - * Retrieves the timestamp of the last successful acquisition. - * - * @return the timestamp of the last acquisition in milliseconds. - */ - long getLastAcquireTime(); - - /** - * Checks if the circuit breaker is currently open. - * - * @return true if the circuit breaker is open, false otherwise. - */ - boolean isOpen(); +public interface CircuitBreaker extends Licensee { /** * Releases the acquired permit. @@ -77,12 +55,6 @@ default void release() { */ void addListener(CircuitBreakerStateListener listener); - /** - * Retrieves the policy that governs the behavior of the circuit breaker. - * - * @return the circuit breaker policy. - */ - CircuitBreakPolicy getPolicy(); /** * Obtains the URI related to the circuit breaker. diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/circuitbreak/CircuitBreakerState.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/circuitbreak/CircuitBreakerState.java index 879bce963..f21519e09 100644 --- a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/circuitbreak/CircuitBreakerState.java +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/circuitbreak/CircuitBreakerState.java @@ -20,9 +20,29 @@ * * @since 1.1.0 */ + +/** + * Represents the different states of a circuit breaker. + */ public enum CircuitBreakerState { + + /** + * The circuit breaker is in the open state, blocking requests and returning a failure response. + */ OPEN, + + /** + * The circuit breaker is in the half-open state, allowing a limited number of test requests to determine if the system has recovered. + */ HALF_OPEN, + + /** + * The circuit breaker is in the closed state, allowing requests to pass through. + */ CLOSED, - DISABLED; -} \ No newline at end of file + + /** + * The circuit breaker is in the disabled state, where it does not perform any circuit breaker functionality and allows all requests to pass through. + */ + DISABLED +} diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/circuitbreak/CircuitBreakerStateEvent.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/circuitbreak/CircuitBreakerStateEvent.java index e6b8613b8..ece9ac719 100644 --- a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/circuitbreak/CircuitBreakerStateEvent.java +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/circuitbreak/CircuitBreakerStateEvent.java @@ -19,7 +19,7 @@ import lombok.Setter; /** - * CircuitBreakerStateEvent + * Represents an event that captures the state transition of a circuit breaker. * * @since 1.1.0 */ @@ -27,10 +27,19 @@ @Getter public class CircuitBreakerStateEvent { + /** + * The URI associated with the circuit breaker. + */ private String uri; + /** + * The previous state of the circuit breaker before the transition. + */ private CircuitBreakerState from; + /** + * The new state of the circuit breaker after the transition. + */ private CircuitBreakerState to; } diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/cluster/AbstractClusterInvoker.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/cluster/AbstractClusterInvoker.java index 954ca02ea..a3b5d4fe2 100644 --- a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/cluster/AbstractClusterInvoker.java +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/cluster/AbstractClusterInvoker.java @@ -86,7 +86,7 @@ E extends Endpoint> CompletionStage invoke(LiveCluster cluster, Outb CompletionStage stage = context.outbound(invocation, endpoint, () -> cluster.invoke(request, instance)); stage.whenComplete((o, r) -> { if (r != null) { - logger.error("Exception occurred when invoke, caused by " + r.getMessage(), r); + error("Exception occurred when invoke, caused by " + r.getMessage(), r); onException(cluster, invocation, o, new ServiceError(r, false), instance, result); } else { ServiceError error = o.getError(); @@ -98,11 +98,11 @@ E extends Endpoint> CompletionStage invoke(LiveCluster cluster, Outb } }); } catch (Throwable e) { - logger.error("Exception occurred when routing, caused by " + e.getMessage(), e); + error("Exception occurred when routing, caused by " + e.getMessage(), e); onException(cluster, invocation, null, new ServiceError(e, false), endpoint, result); } } else { - logger.error("Exception occurred when service discovery, caused by " + t.getMessage(), t); + error("Exception occurred when service discovery, caused by " + t.getMessage(), t); onException(cluster, invocation, null, new ServiceError(t, false), null, result); } }); @@ -210,4 +210,19 @@ E extends Endpoint> void onException(LiveCluster cluster, } } } + + /** + * Logs an error message. If the provided exception is null or an instance of LiveException, + * it logs only the message. Otherwise, it logs both the message and the exception. + * + * @param msg the error message to log + * @param e the throwable to log, which can be null + */ + protected void error(String msg, Throwable e) { + if (e == null || e instanceof LiveException) { + logger.error(msg); + } else { + logger.error(msg, e); + } + } } diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/concurrencylimit/AbstractConcurrencyLimiter.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/concurrencylimit/AbstractConcurrencyLimiter.java index 027f48dc3..9012af63d 100644 --- a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/concurrencylimit/AbstractConcurrencyLimiter.java +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/concurrencylimit/AbstractConcurrencyLimiter.java @@ -3,17 +3,22 @@ import com.jd.live.agent.governance.policy.service.limit.ConcurrencyLimitPolicy; import lombok.Getter; +import java.util.concurrent.atomic.AtomicBoolean; + /** * AbstractConcurrencyLimiter * * @since 1.0.0 */ -@Getter public abstract class AbstractConcurrencyLimiter implements ConcurrencyLimiter { - private final ConcurrencyLimitPolicy policy; + @Getter + protected final ConcurrencyLimitPolicy policy; + + @Getter + protected long lastAccessTime; - private long lastAcquireTime; + protected final AtomicBoolean started = new AtomicBoolean(true); public AbstractConcurrencyLimiter(ConcurrencyLimitPolicy policy) { this.policy = policy; @@ -21,10 +26,20 @@ public AbstractConcurrencyLimiter(ConcurrencyLimitPolicy policy) { @Override public boolean acquire() { - lastAcquireTime = System.currentTimeMillis(); + if (!started.get()) { + return true; + } + lastAccessTime = System.currentTimeMillis(); return doAcquire(); } + @Override + public void close() { + if (started.compareAndSet(true, false)) { + doClose(); + } + } + /** * Performs the actual acquisition logic. * Subclasses must implement this method to define the specific acquisition behavior. @@ -32,4 +47,11 @@ public boolean acquire() { * @return true if the acquisition is successful, false otherwise. */ protected abstract boolean doAcquire(); + + /** + * Closes the limiter. + */ + protected void doClose() { + + } } diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/concurrencylimit/AbstractConcurrencyLimiterFactory.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/concurrencylimit/AbstractConcurrencyLimiterFactory.java index 46018f241..e34a79573 100644 --- a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/concurrencylimit/AbstractConcurrencyLimiterFactory.java +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/concurrencylimit/AbstractConcurrencyLimiterFactory.java @@ -15,91 +15,33 @@ */ package com.jd.live.agent.governance.invoke.concurrencylimit; -import com.jd.live.agent.core.inject.annotation.Inject; -import com.jd.live.agent.core.util.time.Timer; -import com.jd.live.agent.governance.config.GovernanceConfig; +import com.jd.live.agent.governance.config.RecyclerConfig; +import com.jd.live.agent.governance.invoke.permission.AbstractLicenseeFactory; import com.jd.live.agent.governance.policy.service.limit.ConcurrencyLimitPolicy; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - /** * AbstractConcurrencyLimiterFactory */ -public abstract class AbstractConcurrencyLimiterFactory implements ConcurrencyLimiterFactory { - - @Inject(Timer.COMPONENT_TIMER) - private Timer timer; - - @Inject(GovernanceConfig.COMPONENT_GOVERNANCE_CONFIG) - private GovernanceConfig governanceConfig; - - private final AtomicBoolean recycled = new AtomicBoolean(false); - - private final Map> limiters = new ConcurrentHashMap<>(); +public abstract class AbstractConcurrencyLimiterFactory + extends AbstractLicenseeFactory + implements ConcurrencyLimiterFactory { @Override public ConcurrencyLimiter get(ConcurrencyLimitPolicy policy) { - if (policy == null) { - return null; - } - AtomicReference reference = limiters.computeIfAbsent(policy.getId(), n -> new AtomicReference<>()); - ConcurrencyLimiter concurrencyLimiter = reference.get(); - if (concurrencyLimiter != null && concurrencyLimiter.getPolicy().getVersion() == policy.getVersion()) { - return concurrencyLimiter; - } - ConcurrencyLimiter newLimiter = create(policy); - while (true) { - concurrencyLimiter = reference.get(); - if (concurrencyLimiter == null || concurrencyLimiter.getPolicy().getVersion() < policy.getVersion()) { - if (reference.compareAndSet(concurrencyLimiter, newLimiter)) { - concurrencyLimiter = newLimiter; - if (recycled.compareAndSet(false, true)) { - addRecycler(); - } - break; - } - } else { - break; - } - } - return concurrencyLimiter; + return get(policy, policy == null ? null : policy.getId(), null, () -> create(policy)); } - /** - * Schedules a recurring task to recycle concurrency limiters based on their expiration time. - * This method retrieves the clean interval from the configuration and sets up a delayed task - * that calls the {@link #recycle()} method and reschedules itself. - */ - private void addRecycler() { - long cleanInterval = governanceConfig.getServiceConfig().getConcurrencyLimiter().getCleanInterval(); - timer.delay("recycle-concurrency-limiter", cleanInterval, () -> { - recycle(); - addRecycler(); - }); + @Override + protected RecyclerConfig getConfig() { + return governanceConfig.getServiceConfig().getConcurrencyLimiter(); } /** - * Recycles expired concurrency limiters. This method checks each concurrency limiter to see if it has - * expired based on the current time and the configured expiration time. If a concurrency limiter - * has exceeded its expiration time, it is removed from the collection. + * Retrieves a new instance of a {@link ConcurrencyLimiter} based on the provided + * concurrency limit policy. + * + * @param policy the policy that defines the concurrency limiting rules. + * @return a new instance of a concurrency limiter configured according to the policy. */ - private void recycle() { - long expireTime = governanceConfig.getServiceConfig().getConcurrencyLimiter().getExpireTime(); - for (Map.Entry> entry : limiters.entrySet()) { - AtomicReference reference = entry.getValue(); - ConcurrencyLimiter limiter = reference.get(); - if (limiter != null && (System.currentTimeMillis() - limiter.getLastAcquireTime()) > expireTime) { - reference = limiters.remove(entry.getKey()); - if (reference != null) { - limiter = reference.get(); - if (limiter != null && (System.currentTimeMillis() - limiter.getLastAcquireTime()) <= expireTime) { - limiters.putIfAbsent(entry.getKey(), reference); - } - } - } - } - } + protected abstract ConcurrencyLimiter create(ConcurrencyLimitPolicy policy); } diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/concurrencylimit/ConcurrencyLimiter.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/concurrencylimit/ConcurrencyLimiter.java index 3bf3d406c..45ba202c6 100644 --- a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/concurrencylimit/ConcurrencyLimiter.java +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/concurrencylimit/ConcurrencyLimiter.java @@ -15,6 +15,7 @@ */ package com.jd.live.agent.governance.invoke.concurrencylimit; +import com.jd.live.agent.governance.invoke.permission.Licensee; import com.jd.live.agent.governance.policy.service.limit.ConcurrencyLimitPolicy; /** @@ -22,26 +23,7 @@ * * @since 1.0.0 */ -public interface ConcurrencyLimiter { +public interface ConcurrencyLimiter extends Licensee { - /** - * Try to get a permit return the result - * - * @return result - */ - boolean acquire(); - /** - * Retrieves the timestamp of the last successful acquisition. - * - * @return the timestamp of the last acquisition in milliseconds. - */ - long getLastAcquireTime(); - - /** - * Get concurrency-limit policy - * - * @return policy - */ - ConcurrencyLimitPolicy getPolicy(); } diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/concurrencylimit/ConcurrencyLimiterFactory.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/concurrencylimit/ConcurrencyLimiterFactory.java index 250324e4e..2b7c9e1fa 100644 --- a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/concurrencylimit/ConcurrencyLimiterFactory.java +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/concurrencylimit/ConcurrencyLimiterFactory.java @@ -29,20 +29,11 @@ public interface ConcurrencyLimiterFactory { /** * Retrieves a concurrency limiter for the given concurrency limit policy. If a concurrency limiter for the policy * already exists and its version is greater than or equal to the policy version, it is returned. - * Otherwise, a new concurrency limiter is created using the {@link #create(ConcurrencyLimitPolicy)} method. + * Otherwise, a new concurrency limiter is created. * * @param policy The concurrency limit policy for which to retrieve or create a concurrency limiter. * @return A concurrency limiter that corresponds to the given policy, or null if the policy is null. */ ConcurrencyLimiter get(ConcurrencyLimitPolicy policy); - /** - * Retrieves a new instance of a {@link ConcurrencyLimiter} based on the provided - * concurrency limit policy. - * - * @param policy the policy that defines the concurrency limiting rules. - * @return a new instance of a concurrency limiter configured according to the policy. - */ - ConcurrencyLimiter create(ConcurrencyLimitPolicy policy); - } diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/filter/route/CircuitBreakerFilter.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/filter/route/CircuitBreakerFilter.java index b322fd850..a7c3086cb 100644 --- a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/filter/route/CircuitBreakerFilter.java +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/filter/route/CircuitBreakerFilter.java @@ -21,6 +21,8 @@ import com.jd.live.agent.core.inject.annotation.Inject; import com.jd.live.agent.core.inject.annotation.Injectable; import com.jd.live.agent.core.util.URI; +import com.jd.live.agent.core.util.time.TimeWindow; +import com.jd.live.agent.core.util.time.TimeWindowList; import com.jd.live.agent.governance.annotation.ConditionalOnFlowControlEnabled; import com.jd.live.agent.governance.config.GovernanceConfig; import com.jd.live.agent.governance.exception.CircuitBreakException; @@ -37,6 +39,7 @@ import com.jd.live.agent.governance.policy.PolicyId; import com.jd.live.agent.governance.policy.live.FaultType; import com.jd.live.agent.governance.policy.service.ServicePolicy; +import com.jd.live.agent.governance.policy.service.circuitbreak.CircuitBreakEndpoint; import com.jd.live.agent.governance.policy.service.circuitbreak.CircuitBreakPolicy; import com.jd.live.agent.governance.policy.service.circuitbreak.DegradeConfig; import com.jd.live.agent.governance.policy.service.exception.ErrorParser; @@ -87,46 +90,115 @@ public void filter(OutboundInvocation invocation, List policies = servicePolicy == null ? null : servicePolicy.getCircuitBreakPolicies(); if (null != policies && !policies.isEmpty()) { List instancePolicies = null; - List serviceBreakers = new ArrayList<>(policies.size()); - CircuitBreaker breaker; + List breakers = new ArrayList<>(policies.size()); T request = invocation.getRequest(); for (CircuitBreakPolicy policy : policies) { request.addErrorPolicy(policy); switch (policy.getLevel()) { case SERVICE: - breaker = getCircuitBreaker(policy, policy.getUri()); - if (null != breaker) { - serviceBreakers.add(breaker); - } + addCircuitBreaker(breakers, policy, policy.getUri()); break; case API: URI api = policy.getUri().path(metadata.getPath()).parameters(PolicyId.KEY_SERVICE_METHOD, metadata.getMethod()); - breaker = getCircuitBreaker(policy, api); - if (null != breaker) { - serviceBreakers.add(breaker); - } + addCircuitBreaker(breakers, policy, api); break; default: - if (instancePolicies == null) { - instancePolicies = new ArrayList<>(policies.size()); - } - instancePolicies.add(policy); + instancePolicies = addPolicy(policy, instancePolicies); } } // add listener before acquire permit - invocation.addListener(new CircuitBreakerListener(this::getCircuitBreaker, errorParsers, serviceBreakers, instancePolicies)); + invocation.addListener(new CircuitBreakerListener(this::getCircuitBreaker, errorParsers, breakers, instancePolicies)); // acquire service permit - acquire(invocation, serviceBreakers); + acquire(invocation, breakers); // filter broken instance - if (instancePolicies != null && !instancePolicies.isEmpty()) { - RouteTarget target = invocation.getRouteTarget(); - long currentTime = System.currentTimeMillis(); - for (CircuitBreakPolicy policy : instancePolicies) { - target.filter(endpoint -> !policy.isBroken(endpoint.getId(), currentTime)); + filterHealthy(invocation, instancePolicies); + } + chain.filter(invocation); + } + + /** + * Adds a circuit breaker policy to the list of policies. + * + * @param policy the circuit breaker policy to add + * @param policies the list of circuit breaker policies + * @return the updated list of circuit breaker policies + */ + private List addPolicy(CircuitBreakPolicy policy, List policies) { + if (policies == null) { + policies = new ArrayList<>(2); + } + policies.add(policy); + return policies; + } + + /** + * Filters healthy endpoints from the route target based on the provided circuit breaker policies. + * + * @param invocation the outbound invocation containing the route target + * @param policies the list of circuit breaker policies to apply + */ + private void filterHealthy(OutboundInvocation invocation, + List policies) { + if (policies != null && !policies.isEmpty()) { + RouteTarget target = invocation.getRouteTarget(); + long now = System.currentTimeMillis(); + target.filter(endpoint -> isHealthy(endpoint, policies, now)); + } + } + + /** + * Checks if the given endpoint is healthy based on the provided circuit break policies and the current time. + * + * @param endpoint The endpoint to check. + * @param policies The list of circuit break policies to apply. + * @param now The current time in milliseconds. + * @return True if the endpoint is healthy, false otherwise. + */ + private boolean isHealthy(Endpoint endpoint, List policies, long now) { + TimeWindowList windowList = null; + CircuitBreakEndpoint cbe; + for (CircuitBreakPolicy policy : policies) { + cbe = policy.getEndpoint(endpoint.getId()); + if (cbe == null) { + continue; + } else if (cbe.isOpen()) { + return false; + } else if (cbe.isHalfOpen()) { + continue; + } else if (cbe.isRecover(policy.getRecoveryDuration())) { + // in recover + if (windowList == null) { + windowList = new TimeWindowList(); } + windowList.add(new TimeWindow(cbe.getLastUpdateTime(), policy.getRecoveryDuration())); + } else { + // Healthy nodes, if not being concurrently updated, will be deleted. + policy.removeEndpoint(cbe); } } - chain.filter(invocation); + if (windowList != null) { + TimeWindow window = windowList.max(); + if (window != null) { + endpoint.setRecoverTime(window.getStartTime()); + endpoint.setRecoverDuration((int) window.getDuration()); + } + } + + return true; + } + + /** + * Adds a circuit breaker to the list of breakers based on the given policy and URI. + * + * @param breakers the list of circuit breakers + * @param policy the circuit breaker policy + * @param uri the URI for which the circuit breaker is created + */ + private void addCircuitBreaker(List breakers, CircuitBreakPolicy policy, URI uri) { + CircuitBreaker breaker = getCircuitBreaker(policy, uri); + if (null != breaker) { + breakers.add(breaker); + } } /** @@ -208,27 +280,27 @@ private static class CircuitBreakerListener implements OutboundListener { private List circuitBreakers; - private final List instancePolicies; + private final List policies; private final int index; - CircuitBreakerListener(CircuitBreakerFactory factory, Map errorParsers, List circuitBreakers, - List instancePolicies) { + List policies) { this.factory = factory; this.errorParsers = errorParsers; this.circuitBreakers = circuitBreakers; - this.instancePolicies = instancePolicies; + this.policies = policies; this.index = circuitBreakers.size(); } @Override public boolean onElect(Endpoint endpoint, OutboundInvocation invocation) { - if (endpoint != null && instancePolicies != null && !instancePolicies.isEmpty()) { - for (CircuitBreakPolicy policy : instancePolicies) { + if (endpoint != null && policies != null && !policies.isEmpty()) { + for (CircuitBreakPolicy policy : policies) { URI uri = policy.getUri().parameter(PolicyId.KEY_SERVICE_ENDPOINT, endpoint.getId()); + // The circuit breaker, if in a healthy state and not accessed for 1 minute, will be recycled. CircuitBreaker breaker = factory.get(policy, uri); if (breaker != null) { circuitBreakers.add(breaker); diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/loadbalance/randomweight/RandomWeightLoadBalancer.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/loadbalance/randomweight/RandomWeightLoadBalancer.java index 4014dff08..176d4973d 100644 --- a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/loadbalance/randomweight/RandomWeightLoadBalancer.java +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/loadbalance/randomweight/RandomWeightLoadBalancer.java @@ -45,7 +45,7 @@ public class RandomWeightLoadBalancer extends AbstractLoadBalancer { protected Candidate doElect(List endpoints, Invocation invocation) { ServiceRequest request = invocation.getRequest(); // Use the RandomWeight utility to select an endpoint based on the weights. - return RandomWeight.elect(endpoints, e -> e.getWeight(request)); + return RandomWeight.elect(endpoints, e -> e.reweight(request)); } } diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/loadbalance/response/ShortestResponseLoadBalancer.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/loadbalance/response/ShortestResponseLoadBalancer.java index 7ab73ef3a..d7ec9c386 100644 --- a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/loadbalance/response/ShortestResponseLoadBalancer.java +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/loadbalance/response/ShortestResponseLoadBalancer.java @@ -79,7 +79,7 @@ protected Candidate doElect(List endpoints, Invocatio // Calculate the estimated response time from the product of active connections and succeeded average // elapsed time. long estimateResponse = counter.getSnapshot().getEstimateResponse(); - int weight = endpoint.getWeight(request); + int weight = endpoint.reweight(request); weights[i] = weight; // Same as LeastActiveLoadBalance if (estimateResponse < shortestResponse) { diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/permission/AbstractLicenseeFactory.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/permission/AbstractLicenseeFactory.java new file mode 100644 index 000000000..2bca832a0 --- /dev/null +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/permission/AbstractLicenseeFactory.java @@ -0,0 +1,150 @@ +/* + * Copyright © ${year} ${owner} (${email}) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.jd.live.agent.governance.invoke.permission; + +import com.jd.live.agent.core.inject.annotation.Inject; +import com.jd.live.agent.core.util.AtomicUtils; +import com.jd.live.agent.core.util.Close; +import com.jd.live.agent.core.util.time.Timer; +import com.jd.live.agent.governance.config.GovernanceConfig; +import com.jd.live.agent.governance.config.RecyclerConfig; +import com.jd.live.agent.governance.policy.PolicyVersion; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Predicate; +import java.util.function.Supplier; + +/** + * An abstract class that serves as a factory for creating and managing instances of + * {@link Licensee}. It maintains a map of licensees and provides methods to retrieve, + * create, and recycle them based on policy versions and expiration times. + * + * @param

the type of policy version + * @param the type of key used to identify licensees + * @param the type of licensee + */ +public abstract class AbstractLicenseeFactory

> { + + /** + * A map that stores licensees with their keys. Each value is an {@link AtomicReference} + * to a {@link Licensee}. + */ + protected final Map> licensees = new ConcurrentHashMap<>(); + + /** + * A timer used to schedule recurring tasks for recycling licensees. + */ + @Inject(Timer.COMPONENT_TIMER) + protected Timer timer; + + /** + * Configuration related to governance settings. + */ + @Inject(GovernanceConfig.COMPONENT_GOVERNANCE_CONFIG) + protected GovernanceConfig governanceConfig; + + /** + * A flag to indicate whether the recycler has been added. + */ + protected final AtomicBoolean recycled = new AtomicBoolean(false); + + /** + * Returns the configuration for the recycler. + * + * @return the recycler configuration + */ + protected abstract RecyclerConfig getConfig(); + + /** + * Retrieves a licensee for a given policy and key. If the policy is null or does not + * satisfy the predicate, it returns null. If a valid licensee exists with a version + * greater than or equal to the policy version, it returns that licensee. Otherwise, + * it creates a new licensee using the provided creator. + * + * @param policy the policy version + * @param key the key to identify the licensee + * @param predicate a predicate to test the policy + * @param creator a supplier to create a new licensee + * @return the licensee or null if not applicable + */ + protected V get(P policy, K key, Predicate

predicate, Supplier creator) { + if (policy == null || key == null || predicate != null && !predicate.test(policy)) { + return null; + } + return AtomicUtils.getOrUpdate(licensees, key, creator, policy::isOlderThan, this::onSuccess); + } + + /** + * Handles the success case when a new value is successfully set. + * + * @param oldValue the old value that is being replaced + * @param newValue the new value that has been set + */ + protected void onSuccess(V oldValue, V newValue) { + Close.instance().close(oldValue); + if (recycled.compareAndSet(false, true)) { + addRecycler(getTaskName()); + } + } + + /** + * Generates a name for the recycler task based on the recycler configuration class name. + * + * @return the name of the recycler task + */ + protected String getTaskName() { + String name = getConfig().getClass().getSimpleName(); + name = name.replace("Config", ""); + name = "Recycle-" + name; + return name; + } + + /** + * Schedules a recurring task to recycle licensees based on their expiration time. + * This method retrieves the clean interval from the configuration and sets up a delayed task + * that calls the {@link #recycle()} method and reschedules itself. + */ + protected void addRecycler(String name) { + timer.delay(name, getConfig().getCleanInterval(), () -> { + recycle(); + addRecycler(name); + }); + } + + /** + * Recycles expired licensee. This method checks each licensee to see if it has + * expired based on the current time and the configured expiration time. If a rate limiter + * has exceeded its expiration time, it is removed from the collection. + */ + protected void recycle() { + long expireTime = getConfig().getExpireTime(); + Close closer = Close.instance(); + licensees.entrySet().removeIf(entry -> { + V licensee = entry.getValue().get(); + if (licensee != null && licensee.isExpired(expireTime)) { + // close first to avoid concurrent modification exception + closer.close(licensee); + return true; + } + return false; + }); + } + +} + diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/permission/Licensee.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/permission/Licensee.java new file mode 100644 index 000000000..22f4e4298 --- /dev/null +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/permission/Licensee.java @@ -0,0 +1,69 @@ +/* + * Copyright © ${year} ${owner} (${email}) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.jd.live.agent.governance.invoke.permission; + +import com.jd.live.agent.governance.policy.PolicyVersion; + +/** + * Represents an entity that holds a license. License holders typically need to close + * + * @since 1.6.0 + */ +public interface Licensee

extends PolicyVersion, AutoCloseable { + + /** + * Attempts to acquire a permit and returns the result. + * + * @return true if the permit is acquired successfully, false otherwise. + */ + default boolean acquire() { + return true; + } + + /** + * Retrieves the timestamp of the last successful acquisition. + * + * @return the timestamp of the last acquisition in milliseconds. + */ + long getLastAccessTime(); + + /** + * Checks if the current time has exceeded the specified timeout period since the last acquire time. + * + * @param timeout the timeout period in milliseconds + * @return true if the resource has expired, false otherwise + */ + default boolean isExpired(long timeout) { + return System.currentTimeMillis() - getLastAccessTime() > timeout; + } + + /** + * Retrieves the policy that governs the behavior of the circuit breaker. + * + * @return the circuit breaker policy. + */ + P getPolicy(); + + @Override + default long getVersion() { + return getPolicy().getVersion(); + } + + @Override + default void close() { + + } +} diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/ratelimit/AbstractRateLimiter.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/ratelimit/AbstractRateLimiter.java index 9e7bca7d0..b11861c55 100644 --- a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/ratelimit/AbstractRateLimiter.java +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/ratelimit/AbstractRateLimiter.java @@ -21,6 +21,7 @@ import lombok.Getter; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; /** * AbstractRateLimiter is an abstract implementation of the RateLimiter interface, @@ -56,7 +57,9 @@ public abstract class AbstractRateLimiter implements RateLimiter { protected final Option option; @Getter - protected long lastAcquireTime; + protected long lastAccessTime; + + protected final AtomicBoolean started = new AtomicBoolean(false); /** * Constructs an instance of the AbstractRateLimiter class with the given rate limit policy and time unit. @@ -72,27 +75,38 @@ public AbstractRateLimiter(RateLimitPolicy policy, TimeUnit timeUnit) { @Override public boolean acquire() { - this.lastAcquireTime = System.currentTimeMillis(); + if (!started.get()) { + return true; + } + this.lastAccessTime = System.currentTimeMillis(); return doAcquire(1, timeout, timeUnit); } @Override public boolean acquire(int permits) { - if (permits <= 0) { - return false; + if (!started.get()) { + return true; } - this.lastAcquireTime = System.currentTimeMillis(); - return doAcquire(permits, timeout, timeUnit); + this.lastAccessTime = System.currentTimeMillis(); + return permits <= 0 || doAcquire(permits, timeout, timeUnit); } @Override public boolean acquire(int permits, long timeout, TimeUnit timeUnit) { - if (permits <= 0) { - return false; + if (!started.get()) { + return true; + } + this.lastAccessTime = System.currentTimeMillis(); + return permits <= 0 || doAcquire(permits, timeout, timeUnit); + } + + @Override + public void close() { + if (started.compareAndSet(false, true)) { + doClose(); } - this.lastAcquireTime = System.currentTimeMillis(); - return doAcquire(permits, timeout, timeUnit); } + /** * Try to get some permits within a duration and return the result * @@ -103,5 +117,12 @@ public boolean acquire(int permits, long timeout, TimeUnit timeUnit) { */ protected abstract boolean doAcquire(int permits, long timeout, TimeUnit timeUnit); + /** + * Closes the limiter. + */ + protected void doClose() { + + } + } diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/ratelimit/AbstractRateLimiterFactory.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/ratelimit/AbstractRateLimiterFactory.java index ac71f3372..d46bed781 100644 --- a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/ratelimit/AbstractRateLimiterFactory.java +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/ratelimit/AbstractRateLimiterFactory.java @@ -15,18 +15,9 @@ */ package com.jd.live.agent.governance.invoke.ratelimit; -import com.jd.live.agent.core.inject.annotation.Inject; -import com.jd.live.agent.core.util.time.Timer; -import com.jd.live.agent.governance.config.GovernanceConfig; +import com.jd.live.agent.governance.config.RecyclerConfig; +import com.jd.live.agent.governance.invoke.permission.AbstractLicenseeFactory; import com.jd.live.agent.governance.policy.service.limit.RateLimitPolicy; -import com.jd.live.agent.governance.policy.service.limit.SlidingWindow; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; /** * AbstractLimiterFactory provides a base implementation for factories that create and manage rate limiters. @@ -36,84 +27,18 @@ * * @since 1.0.0 */ -public abstract class AbstractRateLimiterFactory implements RateLimiterFactory { - - private final Map> limiters = new ConcurrentHashMap<>(); - - @Inject(Timer.COMPONENT_TIMER) - private Timer timer; - - @Inject(GovernanceConfig.COMPONENT_GOVERNANCE_CONFIG) - private GovernanceConfig governanceConfig; - - private final AtomicBoolean recycled = new AtomicBoolean(false); +public abstract class AbstractRateLimiterFactory + extends AbstractLicenseeFactory + implements RateLimiterFactory { @Override public RateLimiter get(RateLimitPolicy policy) { - if (policy == null) { - return null; - } - List windows = policy.getSlidingWindows(); - if (windows == null || windows.isEmpty()) { - return null; - } - AtomicReference reference = limiters.computeIfAbsent(policy.getId(), n -> new AtomicReference<>()); - RateLimiter rateLimiter = reference.get(); - if (rateLimiter != null && rateLimiter.getPolicy().getVersion() == policy.getVersion()) { - return rateLimiter; - } - RateLimiter newLimiter = create(policy); - while (true) { - rateLimiter = reference.get(); - if (rateLimiter == null || rateLimiter.getPolicy().getVersion() < policy.getVersion()) { - if (reference.compareAndSet(rateLimiter, newLimiter)) { - rateLimiter = newLimiter; - if (recycled.compareAndSet(false, true)) { - addRecycler(); - } - break; - } - } else { - break; - } - } - return rateLimiter; - } - - /** - * Schedules a recurring task to recycle rate limiters based on their expiration time. - * This method retrieves the clean interval from the configuration and sets up a delayed task - * that calls the {@link #recycle()} method and reschedules itself. - */ - private void addRecycler() { - long cleanInterval = governanceConfig.getServiceConfig().getRateLimiter().getCleanInterval(); - timer.delay("recycle-rate-limiter", cleanInterval, () -> { - recycle(); - addRecycler(); - }); + return get(policy, policy == null ? null : policy.getId(), p -> p.getSlidingWindowSize() > 0, () -> create(policy)); } - /** - * Recycles expired rate limiters. This method checks each concurrency limiter to see if it has - * expired based on the current time and the configured expiration time. If a rate limiter - * has exceeded its expiration time, it is removed from the collection. - */ - private void recycle() { - long expireTime = governanceConfig.getServiceConfig().getRateLimiter().getExpireTime(); - List recycles = new ArrayList<>(); - limiters.forEach((key, reference) -> { - limiters.compute(key, (k, ref) -> { - if (ref != null) { - RateLimiter limiter = ref.get(); - if (limiter != null && limiter.isExpired(expireTime)) { - recycles.add(limiter); - return null; - } - } - return ref; - }); - }); - recycles.forEach(RateLimiter::recycle); + @Override + protected RecyclerConfig getConfig() { + return governanceConfig.getServiceConfig().getRateLimiter(); } /** diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/ratelimit/RateLimiter.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/ratelimit/RateLimiter.java index a87bb495b..f5110b9fa 100644 --- a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/ratelimit/RateLimiter.java +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/ratelimit/RateLimiter.java @@ -15,6 +15,7 @@ */ package com.jd.live.agent.governance.invoke.ratelimit; +import com.jd.live.agent.governance.invoke.permission.Licensee; import com.jd.live.agent.governance.policy.service.limit.RateLimitPolicy; import java.util.concurrent.TimeUnit; @@ -24,24 +25,13 @@ * * @since 1.0.0 */ -public interface RateLimiter { +public interface RateLimiter extends Licensee { - /** - * Try to get a permit return the result - * - * @return result - */ + @Override default boolean acquire() { - return acquire(1); + return acquire(1, 0, TimeUnit.MILLISECONDS); } - /** - * Retrieves the timestamp of the last successful acquisition. - * - * @return the timestamp of the last acquisition in milliseconds. - */ - long getLastAcquireTime(); - /** * Try to get a permit within a duration and return the result * @@ -59,7 +49,9 @@ default boolean acquire(long timeout, TimeUnit timeUnit) { * @param permits Permits * @return result */ - boolean acquire(int permits); + default boolean acquire(int permits) { + return acquire(permits, 0, TimeUnit.MILLISECONDS); + } /** * Try to get some permits within a duration and return the result @@ -71,18 +63,4 @@ default boolean acquire(long timeout, TimeUnit timeUnit) { */ boolean acquire(int permits, long timeout, TimeUnit timeUnit); - /** - * Get rate-limit policy - * - * @return policy - */ - RateLimitPolicy getPolicy(); - - default boolean isExpired(long expireTime) { - return System.currentTimeMillis() - getLastAcquireTime() > expireTime; - } - - default void recycle() { - - } } diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/policy/PolicyVersion.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/policy/PolicyVersion.java new file mode 100644 index 000000000..d2aa88793 --- /dev/null +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/policy/PolicyVersion.java @@ -0,0 +1,44 @@ +/* + * Copyright © ${year} ${owner} (${email}) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.jd.live.agent.governance.policy; + +/** + * Represents a version of a policy. This interface provides a method to retrieve + * the version number of the policy. + * + * @since 1.6.0 + */ +public interface PolicyVersion { + + /** + * Returns the version number of the policy. + * + * @return the version number as a long + */ + long getVersion(); + + /** + * Checks if the current policy version is older than the specified policy version. + * + * @param other the policy version to compare against + * @return {@code true} if the current policy version is older than the specified policy version, + * or if the specified policy version is not null and the current version is less than or equal to it; + * {@code false} otherwise + */ + default boolean isOlderThan(PolicyVersion other) { + return other != null && getVersion() <= other.getVersion(); + } +} diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/policy/service/circuitbreak/CircuitBreakEndpoint.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/policy/service/circuitbreak/CircuitBreakEndpoint.java new file mode 100644 index 000000000..e95eaa88c --- /dev/null +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/policy/service/circuitbreak/CircuitBreakEndpoint.java @@ -0,0 +1,112 @@ +/* + * Copyright © ${year} ${owner} (${email}) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.jd.live.agent.governance.policy.service.circuitbreak; + +import lombok.Getter; +import lombok.Setter; + +/** + * Represents an endpoint managed by a circuit breaker. + */ +@Getter +public class CircuitBreakEndpoint implements Cloneable { + + /** + * The unique identifier of the endpoint. + */ + private final String id; + + /** + * The current state of the circuit breaker for this endpoint. + */ + @Setter + private CircuitBreakEndpointState state; + + private long endTime; + + /** + * The timestamp of the last update to the circuit breaker state. + */ + @Setter + private long lastUpdateTime; + + /** + * Constructs a new CircuitBreakEndpoint with the specified parameters. + * + * @param id the unique identifier of the endpoint + * @param state the initial state of the circuit breaker + * @param endTime the end time in milliseconds for the circuit breaker state + * @param lastUpdateTime the last update time in milliseconds of the endpoint + */ + private CircuitBreakEndpoint(String id, CircuitBreakEndpointState state, long endTime, long lastUpdateTime) { + this.id = id; + this.state = state; + this.endTime = endTime; + this.lastUpdateTime = lastUpdateTime; + } + + /** + * Creates a new CircuitBreakEndpoint in the OPEN state with the specified parameters. + * + * @param id the unique identifier of the endpoint + * @param endTime the end time in milliseconds for the circuit breaker state + * @return a new CircuitBreakEndpoint in the OPEN state + */ + public static CircuitBreakEndpoint open(String id, long endTime) { + return new CircuitBreakEndpoint(id, CircuitBreakEndpointState.OPEN, endTime, System.currentTimeMillis()); + } + + /** + * Checks if the circuit breaker is in the open state and the current time is within the end time. + * + * @return {@code true} if the circuit breaker is in the open state and + * the current time is less than or equal to the end time, {@code false} otherwise. + */ + public boolean isOpen() { + // The transition from Open state to half_open state will not be automatically triggered. + // A request is needed to trigger it. + // Therefore, it is necessary to add an expiration time check. + return state == CircuitBreakEndpointState.OPEN && System.currentTimeMillis() <= endTime; + } + + /** + * Checks if the circuit breaker is in the half-open state. + * + * @return {@code true} if the circuit breaker is in the half-open state, {@code false} otherwise. + */ + public boolean isHalfOpen() { + return state == CircuitBreakEndpointState.HALF_OPEN || state == CircuitBreakEndpointState.OPEN && System.currentTimeMillis() > endTime; + } + + /** + * Checks if the circuit breaker is in the closed state and has not recovered within the specified duration. + * + * @param duration The duration in milliseconds to check for recovery. + * @return {@code true} if the circuit breaker is in the closed state and has not recovered within the specified duration, {@code false} otherwise. + */ + public boolean isRecover(long duration) { + return state == CircuitBreakEndpointState.CLOSED && lastUpdateTime + duration >= System.currentTimeMillis(); + } + + @Override + public CircuitBreakEndpoint clone() { + try { + return (CircuitBreakEndpoint) super.clone(); + } catch (Throwable e) { + return new CircuitBreakEndpoint(id, state, endTime, lastUpdateTime); + } + } +} diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/policy/service/circuitbreak/CircuitBreakEndpointState.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/policy/service/circuitbreak/CircuitBreakEndpointState.java new file mode 100644 index 000000000..877d837a0 --- /dev/null +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/policy/service/circuitbreak/CircuitBreakEndpointState.java @@ -0,0 +1,38 @@ +/* + * Copyright © ${year} ${owner} (${email}) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.jd.live.agent.governance.policy.service.circuitbreak; + +/** + * Represents the different states of a circuit breaker endpoint. + */ +public enum CircuitBreakEndpointState { + + /** + * The circuit breaker endpoint is in the closed state, allowing requests to pass through. + */ + CLOSED, + + /** + * The circuit breaker endpoint is in the open state, blocking requests and returning a failure response. + */ + OPEN, + + /** + * The circuit breaker endpoint is in the half-open state, allowing a limited number of test requests to determine if the system has recovered. + */ + HALF_OPEN +} + diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/policy/service/circuitbreak/CircuitLevel.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/policy/service/circuitbreak/CircuitBreakLevel.java similarity index 71% rename from joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/policy/service/circuitbreak/CircuitLevel.java rename to joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/policy/service/circuitbreak/CircuitBreakLevel.java index 6c4969a7c..0f1c352f7 100644 --- a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/policy/service/circuitbreak/CircuitLevel.java +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/policy/service/circuitbreak/CircuitBreakLevel.java @@ -16,12 +16,24 @@ package com.jd.live.agent.governance.policy.service.circuitbreak; /** - * CircuitLevel + * Represents different levels at which circuit breaking can be applied. * * @since 1.1.0 */ -public enum CircuitLevel { +public enum CircuitBreakLevel { + + /** + * Circuit breaking at the service level. + */ SERVICE, + + /** + * Circuit breaking at the API level. + */ API, + + /** + * Circuit breaking at the instance level. + */ INSTANCE; } \ No newline at end of file diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/policy/service/circuitbreak/CircuitBreakPolicy.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/policy/service/circuitbreak/CircuitBreakPolicy.java index 72fa3a876..a51e562c1 100644 --- a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/policy/service/circuitbreak/CircuitBreakPolicy.java +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/policy/service/circuitbreak/CircuitBreakPolicy.java @@ -18,6 +18,7 @@ import com.jd.live.agent.governance.exception.ErrorPolicy; import com.jd.live.agent.governance.policy.PolicyId; import com.jd.live.agent.governance.policy.PolicyInherit; +import com.jd.live.agent.governance.policy.PolicyVersion; import com.jd.live.agent.governance.policy.service.exception.ErrorParserPolicy; import lombok.Getter; import lombok.Setter; @@ -34,7 +35,8 @@ */ @Setter @Getter -public class CircuitBreakPolicy extends PolicyId implements PolicyInherit.PolicyInheritWithIdGen, ErrorPolicy { +public class CircuitBreakPolicy extends PolicyId + implements PolicyInherit.PolicyInheritWithIdGen, ErrorPolicy, PolicyVersion { public static final String SLIDING_WINDOW_TIME = "time"; public static final String SLIDING_WINDOW_COUNT = "count"; @@ -45,6 +47,8 @@ public class CircuitBreakPolicy extends PolicyId implements PolicyInherit.Policy public static final int DEFAULT_ALLOWED_CALLS_IN_HALF_OPEN_STATE = 10; public static final int DEFAULT_SLIDING_WINDOW_SIZE = 100; public static final int DEFAULT_MIN_CALLS_THRESHOLD = 10; + public static final int DEFAULT_INSTANCE_RECOVER_DURATION = 1000 * 15; + public static final int DEFAULT_MAX_WAIT_DURATION_IN_HALF_OPEN_STATE = 0; /** * Name of this policy @@ -59,7 +63,7 @@ public class CircuitBreakPolicy extends PolicyId implements PolicyInherit.Policy /** * Level of circuit breaker policy */ - private CircuitLevel level = CircuitLevel.INSTANCE; + private CircuitBreakLevel level = CircuitBreakLevel.INSTANCE; /** * Sliding window type (statistical window type): count, time @@ -126,11 +130,18 @@ public class CircuitBreakPolicy extends PolicyId implements PolicyInherit.Policy */ private int allowedCallsInHalfOpenState = DEFAULT_ALLOWED_CALLS_IN_HALF_OPEN_STATE; + private int maxWaitDurationInHalfOpenState = DEFAULT_MAX_WAIT_DURATION_IN_HALF_OPEN_STATE; + /** * Whether to force the circuit breaker to be turned on */ private boolean forceOpen = false; + /** + * The gradual recovery period after the instance-level circuit breaker is opened. + */ + private int recoveryDuration = DEFAULT_INSTANCE_RECOVER_DURATION; + /** * Downgrade configuration */ @@ -142,9 +153,9 @@ public class CircuitBreakPolicy extends PolicyId implements PolicyInherit.Policy private long version; /** - * Map of temporarily blocked endpoints, key is endpoint id and value is the end time of block + * Map of temporarily blocked endpoints */ - private Map broken = new ConcurrentHashMap<>(); + private transient Map endpoints = new ConcurrentHashMap<>(); @Override public void supplement(CircuitBreakPolicy source) { @@ -183,7 +194,7 @@ public void supplement(CircuitBreakPolicy source) { uri = source.getUri(); } if (source.getVersion() == version) { - broken = source.broken; + endpoints = source.endpoints; } } @@ -208,44 +219,61 @@ public boolean containsException(Set classNames) { } /** - * Checks if the circuit for the given ID is currently broken. + * Retrieves the circuit break endpoint by its ID. * - * @param id the identifier of the circuit. - * @param now the current time in milliseconds. - * @return {@code true} if the circuit is broken, {@code false} otherwise. + * @param id the identifier of the circuit break endpoint + * @return the circuit break endpoint, or null if not found or ID is null */ - public boolean isBroken(String id, long now) { - Long endTime = id == null ? null : broken.get(id); - if (endTime == null) { - return false; + public CircuitBreakEndpoint getEndpoint(String id) { + return id == null ? null : endpoints.get(id); + } + + /** + * Adds an endpoint to the circuit breaker's list of broken endpoints. + * + * @param endpoint The endpoint to add. If the endpoint is null, it will not be added. + */ + public void addEndpoint(CircuitBreakEndpoint endpoint) { + if (endpoint != null) { + endpoints.put(endpoint.getId(), endpoint); } - if (endTime <= now) { - broken.remove(id); - return false; + } + + /** + * Updates the state of the specified endpoint. + * + * @param id The unique identifier of the endpoint to update. + * @param state The new state to set for the endpoint. + */ + public void updateEndpoint(String id, CircuitBreakEndpointState state) { + CircuitBreakEndpoint endpoint = getEndpoint(id); + if (endpoint != null) { + endpoint = endpoint.clone(); + endpoint.setState(state); + endpoint.setLastUpdateTime(System.currentTimeMillis()); + endpoints.put(id, endpoint); } - return true; } /** - * Adds an entry to the broken circuits with the specified ID and timestamp. + * Removes the endpoint with the specified ID from the collection if it exists. * - * @param id the identifier of the circuit. - * @param now the current time in milliseconds when the circuit was broken. + * @param id The ID of the endpoint to be removed. */ - public void addBroken(String id, long now) { + public void removeEndpoint(String id) { if (id != null) { - broken.put(id, now); + endpoints.remove(id); } } /** - * Removes the entry of the broken circuit with the specified ID. + * Removes the specified endpoint if it exists. * - * @param id the identifier of the circuit to remove. + * @param endpoint The endpoint to be removed. */ - public void removeBroken(String id) { - if (id != null) { - broken.remove(id); + public void removeEndpoint(CircuitBreakEndpoint endpoint) { + if (endpoint != null) { + endpoints.computeIfPresent(endpoint.getId(), (k, v) -> v == endpoint ? null : v); } } diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/policy/service/limit/ConcurrencyLimitPolicy.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/policy/service/limit/ConcurrencyLimitPolicy.java index 52110210e..e0c1d3f53 100644 --- a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/policy/service/limit/ConcurrencyLimitPolicy.java +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/policy/service/limit/ConcurrencyLimitPolicy.java @@ -16,6 +16,7 @@ package com.jd.live.agent.governance.policy.service.limit; import com.jd.live.agent.governance.policy.PolicyInherit.PolicyInheritWithIdGen; +import com.jd.live.agent.governance.policy.PolicyVersion; import com.jd.live.agent.governance.policy.service.annotation.Provider; import com.jd.live.agent.governance.rule.tag.TagCondition; import lombok.Getter; @@ -39,7 +40,8 @@ @Getter @Setter @Provider -public class ConcurrencyLimitPolicy extends AbstractLimitPolicy implements PolicyInheritWithIdGen { +public class ConcurrencyLimitPolicy extends AbstractLimitPolicy + implements PolicyInheritWithIdGen, PolicyVersion { /** * The maximum number of concurrent requests allowed. diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/policy/service/limit/RateLimitPolicy.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/policy/service/limit/RateLimitPolicy.java index f2b9464fd..1bc0de48f 100644 --- a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/policy/service/limit/RateLimitPolicy.java +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/policy/service/limit/RateLimitPolicy.java @@ -17,6 +17,7 @@ import com.jd.live.agent.core.parser.json.JsonAlias; import com.jd.live.agent.governance.policy.PolicyInherit.PolicyInheritWithIdGen; +import com.jd.live.agent.governance.policy.PolicyVersion; import com.jd.live.agent.governance.policy.service.annotation.Provider; import com.jd.live.agent.governance.rule.tag.TagCondition; import lombok.Getter; @@ -47,7 +48,8 @@ @Getter @Setter @Provider -public class RateLimitPolicy extends AbstractLimitPolicy implements PolicyInheritWithIdGen { +public class RateLimitPolicy extends AbstractLimitPolicy + implements PolicyInheritWithIdGen, PolicyVersion { /** * A list of sliding windows that define the rate limits. Each sliding window specifies @@ -104,6 +106,15 @@ public RateLimitPolicy(String name, String realizeType, List condi this.slidingWindows = slidingWindows; } + /** + * Returns the number of sliding windows. + * + * @return the number of sliding windows, or 0 if there are no sliding windows + */ + public int getSlidingWindowSize() { + return slidingWindows == null ? 0 : slidingWindows.size(); + } + /** * Supplements the current rate limit policy with another policy's details. If the current * policy does not have sliding windows defined, it inherits them from the specified source policy. diff --git a/joylive-implement/joylive-flowcontrol/joylive-flowcontrol-redisson/src/main/java/com/jd/live/agent/implement/flowcontrol/ratelimit/redisson/RedissonRateLimiter.java b/joylive-implement/joylive-flowcontrol/joylive-flowcontrol-redisson/src/main/java/com/jd/live/agent/implement/flowcontrol/ratelimit/redisson/RedissonRateLimiter.java index 9176362c4..2f69cfb42 100644 --- a/joylive-implement/joylive-flowcontrol/joylive-flowcontrol-redisson/src/main/java/com/jd/live/agent/implement/flowcontrol/ratelimit/redisson/RedissonRateLimiter.java +++ b/joylive-implement/joylive-flowcontrol/joylive-flowcontrol-redisson/src/main/java/com/jd/live/agent/implement/flowcontrol/ratelimit/redisson/RedissonRateLimiter.java @@ -69,9 +69,9 @@ protected boolean doAcquire(int permits, long timeout, TimeUnit timeUnit) { } @Override - public void recycle() { + protected void doClose() { if (client != null) { - client.decReference(); + client.stop(); } } } diff --git a/joylive-implement/joylive-flowcontrol/joylive-flowcontrol-redisson/src/main/java/com/jd/live/agent/implement/flowcontrol/ratelimit/redisson/client/RedisClient.java b/joylive-implement/joylive-flowcontrol/joylive-flowcontrol-redisson/src/main/java/com/jd/live/agent/implement/flowcontrol/ratelimit/redisson/client/RedisClient.java index 22bd38f55..2dcd4391b 100644 --- a/joylive-implement/joylive-flowcontrol/joylive-flowcontrol-redisson/src/main/java/com/jd/live/agent/implement/flowcontrol/ratelimit/redisson/client/RedisClient.java +++ b/joylive-implement/joylive-flowcontrol/joylive-flowcontrol-redisson/src/main/java/com/jd/live/agent/implement/flowcontrol/ratelimit/redisson/client/RedisClient.java @@ -37,7 +37,7 @@ public class RedisClient { private final Consumer consumer; - private final RedissonClient delegate; + private volatile RedissonClient delegate; private long lastAccessTime; @@ -46,7 +46,6 @@ public class RedisClient { public RedisClient(RedisConfig config, Consumer consumer) { this.config = config; this.consumer = consumer; - this.delegate = createClient(); } public long getId() { @@ -82,17 +81,24 @@ public boolean isExpired(long timeout) { } /** - * Increments the reference count of the Redis client. + * Start the redis client. */ - public void incReference() { + public void start() { + if (delegate == null) { + synchronized (this) { + if (delegate == null) { + delegate = createClient(); + } + } + } counter.incrementAndGet(); } /** - * Decrements the reference count of the Redis client. + * Stop the redis client. * If the reference count reaches zero, the client is removed by the consumer. */ - public void decReference() { + public void stop() { if (counter.decrementAndGet() == 0) { consumer.accept(this); } diff --git a/joylive-implement/joylive-flowcontrol/joylive-flowcontrol-redisson/src/main/java/com/jd/live/agent/implement/flowcontrol/ratelimit/redisson/client/RedisClientManager.java b/joylive-implement/joylive-flowcontrol/joylive-flowcontrol-redisson/src/main/java/com/jd/live/agent/implement/flowcontrol/ratelimit/redisson/client/RedisClientManager.java index b1a1f1a64..7a83fb5d9 100644 --- a/joylive-implement/joylive-flowcontrol/joylive-flowcontrol-redisson/src/main/java/com/jd/live/agent/implement/flowcontrol/ratelimit/redisson/client/RedisClientManager.java +++ b/joylive-implement/joylive-flowcontrol/joylive-flowcontrol-redisson/src/main/java/com/jd/live/agent/implement/flowcontrol/ratelimit/redisson/client/RedisClientManager.java @@ -42,7 +42,7 @@ public RedisClientManager(Timer timer) { */ public RedisClient getOrCreateClient(RedisConfig config) { RedisClient client = clients.computeIfAbsent(config, c -> new RedisClient(c, this::removeClient)); - client.incReference(); + client.start(); client.setLastAccessTime(System.currentTimeMillis()); return client; } @@ -52,18 +52,14 @@ public RedisClient getOrCreateClient(RedisConfig config) { * * @param client the Redis client to be removed */ - private void removeClient(RedisClient client) { - RedisClient newClient = clients.remove(client.getConfig()); - if (newClient != null) { - if (newClient == client && newClient.getReference() == 0) { - addTask(newClient); - } else { - client = clients.putIfAbsent(newClient.getConfig(), newClient); - if (client != null) { - addTask(newClient); - } + private void removeClient(final RedisClient client) { + clients.computeIfPresent(client.getConfig(), (c, v) -> { + if (v == client && v.getReference() == 0) { + addTask(v); + return null; } - } + return v; + }); } /** @@ -73,7 +69,7 @@ private void removeClient(RedisClient client) { */ private void addTask(RedisClient client) { timer.add("recycle-redis-client-" + client.getId(), 5000, () -> { - if (client.getReference() == 0 && client.isExpired(10000)) { + if (client.getReference() == 0 || client.isExpired(10000)) { client.shutdown(); } else { addTask(client); diff --git a/joylive-implement/joylive-flowcontrol/joylive-flowcontrol-resilience4j/src/main/java/com/jd/live/agent/implement/flowcontrol/circuitbreak/resilience4j/InstanceCircuitBreakerStateListener.java b/joylive-implement/joylive-flowcontrol/joylive-flowcontrol-resilience4j/src/main/java/com/jd/live/agent/implement/flowcontrol/circuitbreak/resilience4j/InstanceCircuitBreakerStateListener.java deleted file mode 100644 index 3f47ba152..000000000 --- a/joylive-implement/joylive-flowcontrol/joylive-flowcontrol-resilience4j/src/main/java/com/jd/live/agent/implement/flowcontrol/circuitbreak/resilience4j/InstanceCircuitBreakerStateListener.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright © ${year} ${owner} (${email}) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.jd.live.agent.implement.flowcontrol.circuitbreak.resilience4j; - -import com.jd.live.agent.bootstrap.logger.Logger; -import com.jd.live.agent.bootstrap.logger.LoggerFactory; -import com.jd.live.agent.governance.invoke.circuitbreak.CircuitBreakerState; -import com.jd.live.agent.governance.invoke.circuitbreak.CircuitBreakerStateEvent; -import com.jd.live.agent.governance.invoke.circuitbreak.CircuitBreakerStateListener; -import com.jd.live.agent.governance.policy.service.circuitbreak.CircuitBreakPolicy; - -/** - * InstanceCircuitBreakerStateListener - * - * @since 1.1.0 - */ -public class InstanceCircuitBreakerStateListener implements CircuitBreakerStateListener { - - private static final Logger logger = LoggerFactory.getLogger(InstanceCircuitBreakerStateListener.class); - - private final CircuitBreakPolicy policy; - - private final String instanceId; - - public InstanceCircuitBreakerStateListener(CircuitBreakPolicy policy, String instanceId) { - this.policy = policy; - this.instanceId = instanceId; - } - - @Override - public void onStateChange(CircuitBreakerStateEvent event) { - if (logger.isDebugEnabled()) { - logger.debug("[CircuitBreak]Instance state is transitioned from " + event.getFrom() + " to " + event.getTo() + ", uri=" + event.getUri()); - } - if (event.getTo() == CircuitBreakerState.OPEN) { - policy.addBroken(instanceId, System.currentTimeMillis() + policy.getWaitDurationInOpenState() * 1000L); - } else if (event.getFrom() == CircuitBreakerState.OPEN) { - policy.removeBroken(instanceId); - } - } -} diff --git a/joylive-implement/joylive-flowcontrol/joylive-flowcontrol-resilience4j/src/main/java/com/jd/live/agent/implement/flowcontrol/circuitbreak/resilience4j/Resilience4jCircuitBreaker.java b/joylive-implement/joylive-flowcontrol/joylive-flowcontrol-resilience4j/src/main/java/com/jd/live/agent/implement/flowcontrol/circuitbreak/resilience4j/Resilience4jCircuitBreaker.java index 0cd440a68..423a6a8b9 100644 --- a/joylive-implement/joylive-flowcontrol/joylive-flowcontrol-resilience4j/src/main/java/com/jd/live/agent/implement/flowcontrol/circuitbreak/resilience4j/Resilience4jCircuitBreaker.java +++ b/joylive-implement/joylive-flowcontrol/joylive-flowcontrol-resilience4j/src/main/java/com/jd/live/agent/implement/flowcontrol/circuitbreak/resilience4j/Resilience4jCircuitBreaker.java @@ -15,13 +15,32 @@ */ package com.jd.live.agent.implement.flowcontrol.circuitbreak.resilience4j; +import com.jd.live.agent.bootstrap.logger.Logger; +import com.jd.live.agent.bootstrap.logger.LoggerFactory; +import com.jd.live.agent.core.util.Close; import com.jd.live.agent.core.util.URI; import com.jd.live.agent.governance.invoke.circuitbreak.AbstractCircuitBreaker; +import com.jd.live.agent.governance.invoke.circuitbreak.CircuitBreakerState; +import com.jd.live.agent.governance.invoke.circuitbreak.CircuitBreakerStateEvent; import com.jd.live.agent.governance.invoke.circuitbreak.CircuitBreakerStateListener; +import com.jd.live.agent.governance.policy.PolicyId; +import com.jd.live.agent.governance.policy.service.circuitbreak.CircuitBreakEndpoint; +import com.jd.live.agent.governance.policy.service.circuitbreak.CircuitBreakEndpointState; +import com.jd.live.agent.governance.policy.service.circuitbreak.CircuitBreakLevel; import com.jd.live.agent.governance.policy.service.circuitbreak.CircuitBreakPolicy; import io.github.resilience4j.circuitbreaker.CircuitBreaker; +import io.github.resilience4j.circuitbreaker.CircuitBreaker.State; +import io.github.resilience4j.circuitbreaker.event.CircuitBreakerOnStateTransitionEvent; +import io.github.resilience4j.core.EventConsumer; +import io.github.resilience4j.core.lang.NonNull; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static com.jd.live.agent.governance.policy.service.circuitbreak.CircuitBreakPolicy.DEFAULT_WAIT_DURATION_IN_OPEN_STATE; /** * Resilience4jCircuitBreaker @@ -32,12 +51,15 @@ public class Resilience4jCircuitBreaker extends AbstractCircuitBreaker { private final io.github.resilience4j.circuitbreaker.CircuitBreaker delegate; - private final Resilience4jCircuitBreakerEventConsumer eventConsumer; + private final LiveEventConsumer eventConsumer; public Resilience4jCircuitBreaker(CircuitBreakPolicy policy, URI uri, CircuitBreaker delegate) { super(policy, uri); this.delegate = delegate; - this.eventConsumer = new Resilience4jCircuitBreakerEventConsumer(); + this.eventConsumer = policy.getLevel() != CircuitBreakLevel.INSTANCE + ? new LiveEventConsumer(this.started) + : new LiveEventConsumer(this.started, new LiveStateListener( + policy, uri.getParameter(PolicyId.KEY_SERVICE_ENDPOINT), this.started)); this.delegate.getEventPublisher().onStateTransition(eventConsumer); } @@ -47,30 +69,23 @@ protected boolean doAcquire() { } @Override - public void release() { + protected void doRelease() { delegate.releasePermission(); } @Override - public boolean isOpen() { - switch (delegate.getState()) { - case OPEN: - case FORCED_OPEN: - case HALF_OPEN: - return true; - default: - return false; - } + public boolean isExpired(long timeout) { + return delegate.getState() == State.CLOSED && super.isExpired(timeout); } @Override - public void onSuccess(long durationInMs) { - delegate.onSuccess(durationInMs, TimeUnit.MILLISECONDS); + protected void doOnError(long durationInMs, Throwable throwable) { + delegate.onError(durationInMs, TimeUnit.MILLISECONDS, throwable); } @Override - public void onError(long durationInMs, Throwable throwable) { - delegate.onError(durationInMs, TimeUnit.MILLISECONDS, throwable); + protected void doOnSuccess(long durationInMs) { + delegate.onSuccess(durationInMs, TimeUnit.MILLISECONDS); } @Override @@ -78,4 +93,135 @@ public void addListener(CircuitBreakerStateListener listener) { eventConsumer.addListener(listener); } + @Override + protected void doClose() { + eventConsumer.close(); + } + + /** + * A consumer that listens for state transition events from a Resilience4j circuit breaker + * and notifies registered listeners of state changes. + */ + private static class LiveEventConsumer implements EventConsumer, AutoCloseable { + + private final AtomicBoolean started; + + private final List listeners = new CopyOnWriteArrayList<>(); + + LiveEventConsumer(AtomicBoolean started, CircuitBreakerStateListener... listeners) { + this.started = started; + if (listeners != null) { + this.listeners.addAll(Arrays.asList(listeners)); + } + } + + @Override + public void consumeEvent(@NonNull CircuitBreakerOnStateTransitionEvent event) { + if (!started.get()) { + return; + } + CircuitBreaker.StateTransition transition = event.getStateTransition(); + CircuitBreakerStateEvent e = new CircuitBreakerStateEvent(); + e.setUri(event.getCircuitBreakerName()); + e.setFrom(convertState(transition.getFromState())); + e.setTo(convertState(transition.getToState())); + for (CircuitBreakerStateListener listener : listeners) { + listener.onStateChange(e); + } + } + + /** + * Adds a listener to be notified of circuit breaker state changes. + * + * @param listener the listener to add. If the listener is null, it will not be added. + */ + public void addListener(CircuitBreakerStateListener listener) { + if (listener != null) { + listeners.add(listener); + } + } + + /** + * Converts a Resilience4j circuit breaker state to a custom circuit breaker state. + * + * @param state the Resilience4j circuit breaker state to convert. + * @return the corresponding custom circuit breaker state. + */ + private CircuitBreakerState convertState(State state) { + switch (state) { + case OPEN: + case FORCED_OPEN: + return CircuitBreakerState.OPEN; + case HALF_OPEN: + return CircuitBreakerState.HALF_OPEN; + case CLOSED: + case METRICS_ONLY: + case DISABLED: + default: + return CircuitBreakerState.CLOSED; + } + } + + @Override + public void close() { + Close closer = Close.instance(); + for (CircuitBreakerStateListener listener : listeners) { + if (listener instanceof AutoCloseable) { + closer.close((AutoCloseable) listener); + } + } + } + } + + /** + * LiveStateListener + * + * @since 1.1.0 + */ + private static class LiveStateListener implements CircuitBreakerStateListener, AutoCloseable { + + private static final Logger logger = LoggerFactory.getLogger(LiveStateListener.class); + + private final CircuitBreakPolicy policy; + + private final String instanceId; + + private final AtomicBoolean started; + + LiveStateListener(CircuitBreakPolicy policy, String instanceId, AtomicBoolean started) { + this.policy = policy; + this.instanceId = instanceId; + this.started = started; + } + + @Override + public void onStateChange(CircuitBreakerStateEvent event) { + if (!started.get()) { + // avoid another breaker conflict. + return; + } + if (logger.isDebugEnabled()) { + logger.debug("[CircuitBreak]Instance state is transitioned from " + event.getFrom() + " to " + event.getTo() + ", uri=" + event.getUri()); + } + switch (event.getTo()) { + case CLOSED: + policy.updateEndpoint(instanceId, CircuitBreakEndpointState.CLOSED); + break; + case HALF_OPEN: + policy.updateEndpoint(instanceId, CircuitBreakEndpointState.HALF_OPEN); + break; + case OPEN: + int waitDurationInOpenState = policy.getWaitDurationInOpenState() <= 0 ? DEFAULT_WAIT_DURATION_IN_OPEN_STATE : policy.getWaitDurationInOpenState(); + policy.addEndpoint(CircuitBreakEndpoint.open(instanceId, System.currentTimeMillis() + waitDurationInOpenState)); + break; + case DISABLED: + policy.removeEndpoint(instanceId); + } + } + + @Override + public void close() { + policy.removeEndpoint(instanceId); + } + } } diff --git a/joylive-implement/joylive-flowcontrol/joylive-flowcontrol-resilience4j/src/main/java/com/jd/live/agent/implement/flowcontrol/circuitbreak/resilience4j/Resilience4jCircuitBreakerEventConsumer.java b/joylive-implement/joylive-flowcontrol/joylive-flowcontrol-resilience4j/src/main/java/com/jd/live/agent/implement/flowcontrol/circuitbreak/resilience4j/Resilience4jCircuitBreakerEventConsumer.java deleted file mode 100644 index 4a036a20a..000000000 --- a/joylive-implement/joylive-flowcontrol/joylive-flowcontrol-resilience4j/src/main/java/com/jd/live/agent/implement/flowcontrol/circuitbreak/resilience4j/Resilience4jCircuitBreakerEventConsumer.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Copyright © ${year} ${owner} (${email}) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.jd.live.agent.implement.flowcontrol.circuitbreak.resilience4j; - -import com.jd.live.agent.governance.invoke.circuitbreak.CircuitBreakerState; -import com.jd.live.agent.governance.invoke.circuitbreak.CircuitBreakerStateEvent; -import com.jd.live.agent.governance.invoke.circuitbreak.CircuitBreakerStateListener; -import io.github.resilience4j.circuitbreaker.CircuitBreaker; -import io.github.resilience4j.circuitbreaker.event.CircuitBreakerOnStateTransitionEvent; -import io.github.resilience4j.core.EventConsumer; - -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; - -/** - * A consumer that listens for state transition events from a Resilience4j circuit breaker - * and notifies registered listeners of state changes. - */ -public class Resilience4jCircuitBreakerEventConsumer implements EventConsumer { - - private final List listeners = new CopyOnWriteArrayList<>(); - - @Override - public void consumeEvent(CircuitBreakerOnStateTransitionEvent event) { - CircuitBreaker.StateTransition transition = event.getStateTransition(); - CircuitBreakerStateEvent e = new CircuitBreakerStateEvent(); - e.setUri(event.getCircuitBreakerName()); - e.setFrom(convertState(transition.getFromState())); - e.setTo(convertState(transition.getToState())); - for (CircuitBreakerStateListener listener : listeners) { - listener.onStateChange(e); - } - } - - /** - * Adds a listener to be notified of circuit breaker state changes. - * - * @param listener the listener to add. If the listener is null, it will not be added. - */ - public void addListener(CircuitBreakerStateListener listener) { - if (listener != null) { - listeners.add(listener); - } - } - - /** - * Converts a Resilience4j circuit breaker state to a custom circuit breaker state. - * - * @param state the Resilience4j circuit breaker state to convert. - * @return the corresponding custom circuit breaker state. - */ - private CircuitBreakerState convertState(CircuitBreaker.State state) { - switch (state) { - case OPEN: - case FORCED_OPEN: - return CircuitBreakerState.OPEN; - case HALF_OPEN: - return CircuitBreakerState.HALF_OPEN; - case CLOSED: - case METRICS_ONLY: - case DISABLED: - default: - return CircuitBreakerState.CLOSED; - } - } - -} diff --git a/joylive-implement/joylive-flowcontrol/joylive-flowcontrol-resilience4j/src/main/java/com/jd/live/agent/implement/flowcontrol/circuitbreak/resilience4j/Resilience4jCircuitBreakerFactory.java b/joylive-implement/joylive-flowcontrol/joylive-flowcontrol-resilience4j/src/main/java/com/jd/live/agent/implement/flowcontrol/circuitbreak/resilience4j/Resilience4jCircuitBreakerFactory.java index ffb096b9b..f0656df2f 100644 --- a/joylive-implement/joylive-flowcontrol/joylive-flowcontrol-resilience4j/src/main/java/com/jd/live/agent/implement/flowcontrol/circuitbreak/resilience4j/Resilience4jCircuitBreakerFactory.java +++ b/joylive-implement/joylive-flowcontrol/joylive-flowcontrol-resilience4j/src/main/java/com/jd/live/agent/implement/flowcontrol/circuitbreak/resilience4j/Resilience4jCircuitBreakerFactory.java @@ -20,9 +20,7 @@ import com.jd.live.agent.core.util.URI; import com.jd.live.agent.governance.invoke.circuitbreak.AbstractCircuitBreakerFactory; import com.jd.live.agent.governance.invoke.circuitbreak.CircuitBreaker; -import com.jd.live.agent.governance.policy.PolicyId; import com.jd.live.agent.governance.policy.service.circuitbreak.CircuitBreakPolicy; -import com.jd.live.agent.governance.policy.service.circuitbreak.CircuitLevel; import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig; import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig.SlidingWindowType; import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry; @@ -40,12 +38,27 @@ @Extension(value = "Resilience4j") public class Resilience4jCircuitBreakerFactory extends AbstractCircuitBreakerFactory { - /** - * {@inheritDoc} - */ + private static final CircuitBreakerRegistry REGISTRY = CircuitBreakerRegistry.ofDefaults(); + @Override public CircuitBreaker create(CircuitBreakPolicy policy, URI uri) { - CircuitBreakerConfig circuitBreakerConfig = CircuitBreakerConfig.custom() + CircuitBreakerConfig config = getBuilder(policy).build(); + io.github.resilience4j.circuitbreaker.CircuitBreaker cb = REGISTRY.circuitBreaker(uri.toString(), config); + if (policy.isForceOpen()) { + cb.transitionToForcedOpenState(); + } + return new Resilience4jCircuitBreaker(policy, uri, cb); + } + + /** + * Creates and configures a {@link CircuitBreakerConfig.Builder} based on the provided {@link CircuitBreakPolicy}. + * + * @param policy The {@link CircuitBreakPolicy} containing the configuration parameters. + * @return A configured {@link CircuitBreakerConfig.Builder}. + */ + private CircuitBreakerConfig.Builder getBuilder(CircuitBreakPolicy policy) { + // TODO Uniform time unit. waitDurationInOpenState + CircuitBreakerConfig.Builder result = CircuitBreakerConfig.custom() .slidingWindowType(SLIDING_WINDOW_COUNT.equals(policy.getSlidingWindowType()) ? SlidingWindowType.COUNT_BASED : SlidingWindowType.TIME_BASED) .slidingWindowSize(policy.getSlidingWindowSize() <= 0 ? DEFAULT_SLIDING_WINDOW_SIZE : policy.getSlidingWindowSize()) .minimumNumberOfCalls(policy.getMinCallsThreshold() <= 0 ? DEFAULT_MIN_CALLS_THRESHOLD : policy.getMinCallsThreshold()) @@ -54,17 +67,11 @@ public CircuitBreaker create(CircuitBreakPolicy policy, URI uri) { .slowCallDurationThreshold(Duration.ofMillis(policy.getSlowCallDurationThreshold() <= 0 ? DEFAULT_SLOW_CALL_DURATION_THRESHOLD : policy.getSlowCallDurationThreshold())) .waitDurationInOpenState(Duration.ofSeconds(policy.getWaitDurationInOpenState() <= 0 ? DEFAULT_WAIT_DURATION_IN_OPEN_STATE : policy.getWaitDurationInOpenState())) .permittedNumberOfCallsInHalfOpenState(policy.getAllowedCallsInHalfOpenState() <= 0 ? DEFAULT_ALLOWED_CALLS_IN_HALF_OPEN_STATE : policy.getAllowedCallsInHalfOpenState()) - .recordException(e -> true) - .build(); - io.github.resilience4j.circuitbreaker.CircuitBreaker cb = CircuitBreakerRegistry.of(circuitBreakerConfig).circuitBreaker(uri.toString()); - if (policy.isForceOpen()) { - cb.transitionToForcedOpenState(); - } - CircuitBreaker circuitBreaker = new Resilience4jCircuitBreaker(policy, uri, cb); - if (policy.getLevel() == CircuitLevel.INSTANCE) { - circuitBreaker.addListener(new InstanceCircuitBreakerStateListener(policy, uri.getParameter(PolicyId.KEY_SERVICE_ENDPOINT))); + .recordException(e -> true); + if (policy.getMaxWaitDurationInHalfOpenState() > 0) { + result.maxWaitDurationInHalfOpenState(Duration.ofMillis(policy.getMaxWaitDurationInHalfOpenState())); } - return circuitBreaker; + return result; } } diff --git a/joylive-plugin/joylive-router/joylive-router-dubbo2.6/src/main/java/com/jd/live/agent/plugin/router/dubbo/v2_6/instance/DubboEndpoint.java b/joylive-plugin/joylive-router/joylive-router-dubbo2.6/src/main/java/com/jd/live/agent/plugin/router/dubbo/v2_6/instance/DubboEndpoint.java index 551d23a7c..4789f2539 100644 --- a/joylive-plugin/joylive-router/joylive-router-dubbo2.6/src/main/java/com/jd/live/agent/plugin/router/dubbo/v2_6/instance/DubboEndpoint.java +++ b/joylive-plugin/joylive-router/joylive-router-dubbo2.6/src/main/java/com/jd/live/agent/plugin/router/dubbo/v2_6/instance/DubboEndpoint.java @@ -71,7 +71,7 @@ public Long getTimestamp() { } @Override - public Integer getOriginWeight(ServiceRequest request) { + public Integer getWeight(ServiceRequest request) { String weight = url.getMethodParameter(request.getMethod(), Constants.LABEL_WEIGHT, null); if (weight == null || weight.isEmpty()) { weight = url.getParameter(Constants.LABEL_WEIGHT); diff --git a/joylive-plugin/joylive-router/joylive-router-dubbo2.7/src/main/java/com/jd/live/agent/plugin/router/dubbo/v2_7/instance/DubboEndpoint.java b/joylive-plugin/joylive-router/joylive-router-dubbo2.7/src/main/java/com/jd/live/agent/plugin/router/dubbo/v2_7/instance/DubboEndpoint.java index 77adf6033..901e97f03 100644 --- a/joylive-plugin/joylive-router/joylive-router-dubbo2.7/src/main/java/com/jd/live/agent/plugin/router/dubbo/v2_7/instance/DubboEndpoint.java +++ b/joylive-plugin/joylive-router/joylive-router-dubbo2.7/src/main/java/com/jd/live/agent/plugin/router/dubbo/v2_7/instance/DubboEndpoint.java @@ -61,7 +61,7 @@ public int getPort() { } @Override - public Integer getOriginWeight(ServiceRequest request) { + public Integer getWeight(ServiceRequest request) { URL target = invoker instanceof ClusterInvoker ? ((ClusterInvoker) invoker).getRegistryUrl() : url; String weight = target.getMethodParameter(request.getMethod(), Constants.LABEL_WEIGHT, null); if (weight == null || weight.isEmpty()) { diff --git a/joylive-plugin/joylive-router/joylive-router-dubbo3/src/main/java/com/jd/live/agent/plugin/router/dubbo/v3/instance/DubboEndpoint.java b/joylive-plugin/joylive-router/joylive-router-dubbo3/src/main/java/com/jd/live/agent/plugin/router/dubbo/v3/instance/DubboEndpoint.java index eaab8ddb6..3b4715fac 100644 --- a/joylive-plugin/joylive-router/joylive-router-dubbo3/src/main/java/com/jd/live/agent/plugin/router/dubbo/v3/instance/DubboEndpoint.java +++ b/joylive-plugin/joylive-router/joylive-router-dubbo3/src/main/java/com/jd/live/agent/plugin/router/dubbo/v3/instance/DubboEndpoint.java @@ -61,7 +61,7 @@ public int getPort() { } @Override - public Integer getOriginWeight(ServiceRequest request) { + public Integer getWeight(ServiceRequest request) { URL target = invoker instanceof ClusterInvoker ? ((ClusterInvoker) invoker).getRegistryUrl() : url; String weight = target.getMethodParameter(request.getMethod(), Constants.LABEL_WEIGHT, null); if (weight == null || weight.isEmpty()) { diff --git a/joylive-plugin/joylive-router/joylive-router-sofarpc/src/main/java/com/jd/live/agent/plugin/router/sofarpc/instance/SofaRpcEndpoint.java b/joylive-plugin/joylive-router/joylive-router-sofarpc/src/main/java/com/jd/live/agent/plugin/router/sofarpc/instance/SofaRpcEndpoint.java index b9a6319c6..e1fd6194c 100644 --- a/joylive-plugin/joylive-router/joylive-router-sofarpc/src/main/java/com/jd/live/agent/plugin/router/sofarpc/instance/SofaRpcEndpoint.java +++ b/joylive-plugin/joylive-router/joylive-router-sofarpc/src/main/java/com/jd/live/agent/plugin/router/sofarpc/instance/SofaRpcEndpoint.java @@ -64,7 +64,7 @@ public int getPort() { } @Override - public Integer getOriginWeight(ServiceRequest request) { + public Integer getWeight(ServiceRequest request) { return provider.getWeight(); } diff --git a/joylive-plugin/joylive-router/joylive-router-springweb5/src/main/java/com/jd/live/agent/plugin/router/springweb/v5/interceptor/DispatcherHandlerInterceptor.java b/joylive-plugin/joylive-router/joylive-router-springweb5/src/main/java/com/jd/live/agent/plugin/router/springweb/v5/interceptor/DispatcherHandlerInterceptor.java index dae8d9465..2ef71a684 100644 --- a/joylive-plugin/joylive-router/joylive-router-springweb5/src/main/java/com/jd/live/agent/plugin/router/springweb/v5/interceptor/DispatcherHandlerInterceptor.java +++ b/joylive-plugin/joylive-router/joylive-router-springweb5/src/main/java/com/jd/live/agent/plugin/router/springweb/v5/interceptor/DispatcherHandlerInterceptor.java @@ -63,13 +63,13 @@ public void onEnter(ExecutableContext ctx) { Mono mono = context.inbound(invocation, () -> ((Mono) mc.invokeOrigin()).toFuture(), request::convert); if (config.isResponseException()) { mono = mono.doOnError(ex -> { - HttpHeaders headers = exchange.getResponse().getHeaders(); + HttpHeaders headers = HttpHeaders.writableHttpHeaders(exchange.getResponse().getHeaders()); labelHeaders(ex, headers::set); }).doOnSuccess(result -> { if (result != null) { Function> exceptionHandler = getValue(result, FIELD_EXCEPTION_HANDLER); result.setExceptionHandler(ex -> { - HttpHeaders headers = exchange.getResponse().getHeaders(); + HttpHeaders headers = HttpHeaders.writableHttpHeaders(exchange.getResponse().getHeaders()); labelHeaders(ex, headers::set); return exceptionHandler != null ? exceptionHandler.apply(ex) : Mono.error(ex); }); diff --git a/joylive-plugin/joylive-router/joylive-router-springweb6/src/main/java/com/jd/live/agent/plugin/router/springweb/v6/interceptor/DispatcherHandlerInterceptor.java b/joylive-plugin/joylive-router/joylive-router-springweb6/src/main/java/com/jd/live/agent/plugin/router/springweb/v6/interceptor/DispatcherHandlerInterceptor.java index 4afbdd674..d33049b2d 100644 --- a/joylive-plugin/joylive-router/joylive-router-springweb6/src/main/java/com/jd/live/agent/plugin/router/springweb/v6/interceptor/DispatcherHandlerInterceptor.java +++ b/joylive-plugin/joylive-router/joylive-router-springweb6/src/main/java/com/jd/live/agent/plugin/router/springweb/v6/interceptor/DispatcherHandlerInterceptor.java @@ -62,7 +62,7 @@ public void onEnter(ExecutableContext ctx) { mono = mono.doOnError(ex -> { Boolean handled = (Boolean) exchange.getAttributes().remove(KEY_LIVE_EXCEPTION_HANDLED); if (handled == null || !handled) { - HttpHeaders headers = exchange.getResponse().getHeaders(); + HttpHeaders headers = HttpHeaders.writableHttpHeaders(exchange.getResponse().getHeaders()); labelHeaders(ex, headers::set); } }); diff --git a/joylive-plugin/joylive-router/joylive-router-springweb6/src/main/java/com/jd/live/agent/plugin/router/springweb/v6/interceptor/HandleResultInterceptor.java b/joylive-plugin/joylive-router/joylive-router-springweb6/src/main/java/com/jd/live/agent/plugin/router/springweb/v6/interceptor/HandleResultInterceptor.java index cf9ce155e..0c0b2248a 100644 --- a/joylive-plugin/joylive-router/joylive-router-springweb6/src/main/java/com/jd/live/agent/plugin/router/springweb/v6/interceptor/HandleResultInterceptor.java +++ b/joylive-plugin/joylive-router/joylive-router-springweb6/src/main/java/com/jd/live/agent/plugin/router/springweb/v6/interceptor/HandleResultInterceptor.java @@ -48,7 +48,7 @@ public void onSuccess(ExecutableContext ctx) { Mono mono = mc.getResult(); mono = mono.onErrorResume(ex -> { exchange.getAttributes().put(KEY_LIVE_EXCEPTION_HANDLED, Boolean.TRUE); - HttpHeaders headers = exchange.getResponse().getHeaders(); + HttpHeaders headers = HttpHeaders.writableHttpHeaders(exchange.getResponse().getHeaders()); labelHeaders(ex, headers::set); return Mono.error(ex); });