Skip to content

Commit

Permalink
Optimize the initialization logic to wait for service policies to be …
Browse files Browse the repository at this point in the history
…ready
  • Loading branch information
hexiaofeng committed May 30, 2024
1 parent e53aba4 commit 5e0fe4c
Show file tree
Hide file tree
Showing 11 changed files with 216 additions and 40 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright © ${year} ${owner} (${email})
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jd.live.agent.core.service;

import java.util.List;

/**
* Defines an interface for supervising services within a system.
* The {@code ServiceSupervisor} is responsible for managing and providing information
* about agent services that are currently active or available within the system.
*/
public interface ServiceSupervisor {

String COMPONENT_SERVICE_SUPERVISOR = "ServiceSupervisor";

/**
* Retrieves a list of {@code AgentService} instances that are currently managed
* or supervised by this supervisor.
*
* @return A list of {@code AgentService} instances representing the services
* currently under supervision. This list may be empty if no services
* are currently being supervised.
*/
List<AgentService> getServices();
}

Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import com.jd.live.agent.core.plugin.PluginManager;
import com.jd.live.agent.core.plugin.PluginSupervisor;
import com.jd.live.agent.core.service.ServiceManager;
import com.jd.live.agent.core.service.ServiceSupervisor;
import com.jd.live.agent.core.util.Close;
import com.jd.live.agent.core.util.network.Ipv4;
import com.jd.live.agent.core.util.option.CascadeOption;
Expand Down Expand Up @@ -481,6 +482,8 @@ private Injector createInjector() {
ctx.add(AgentPath.COMPONENT_AGENT_PATH, agentPath);
ctx.add(Application.COMPONENT_APPLICATION, application);
ctx.add(ExtensionManager.COMPONENT_EXTENSION_MANAGER, extensionManager);
//
ctx.add(ServiceSupervisor.COMPONENT_SERVICE_SUPERVISOR, (ServiceSupervisor) () -> serviceManager.getServices());
ctx.add(Timer.COMPONENT_TIMER, timer);
ctx.add(EventBus.COMPONENT_EVENT_BUS, eventBus);
ctx.add(Resourcer.COMPONENT_RESOURCER, classLoaderManager == null ? null : classLoaderManager.getPluginLoaders());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
* @since 1.0.0
*/
@Injectable
public class ServiceManager implements AgentService {
public class ServiceManager implements AgentService, ServiceSupervisor {

private static final Logger logger = LoggerFactory.getLogger(ServiceManager.class);

Expand All @@ -55,6 +55,11 @@ public class ServiceManager implements AgentService {
@InjectLoader(ResourcerType.CORE_IMPL)
private List<AgentService> services;

@Override
public List<AgentService> getServices() {
return services;
}

@Override
public CompletableFuture<Void> start() {
return execute(AgentService::start, s -> "Service " + s.getClass().getSimpleName() + " is started.").whenComplete((v, t) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
package com.jd.live.agent.governance.policy;

import com.jd.live.agent.bootstrap.classloader.ResourcerType;
import com.jd.live.agent.bootstrap.logger.Logger;
import com.jd.live.agent.bootstrap.logger.LoggerFactory;
import com.jd.live.agent.core.event.AgentEvent;
import com.jd.live.agent.core.event.AgentEvent.EventType;
import com.jd.live.agent.core.event.Event;
Expand All @@ -32,6 +30,8 @@
import com.jd.live.agent.core.inject.annotation.Injectable;
import com.jd.live.agent.core.instance.AppService;
import com.jd.live.agent.core.instance.Application;
import com.jd.live.agent.core.service.AgentService;
import com.jd.live.agent.core.service.ServiceSupervisor;
import com.jd.live.agent.governance.config.GovernanceConfig;
import com.jd.live.agent.governance.config.LaneConfig;
import com.jd.live.agent.governance.config.LiveConfig;
Expand All @@ -46,6 +46,7 @@
import com.jd.live.agent.governance.policy.variable.UnitFunction;
import com.jd.live.agent.governance.policy.variable.VariableFunction;
import com.jd.live.agent.governance.policy.variable.VariableParser;
import com.jd.live.agent.governance.service.PolicyService;
import lombok.Getter;

import java.util.*;
Expand All @@ -63,8 +64,9 @@
@Injectable
@Extension(value = "PolicyManager", order = InjectSourceSupplier.ORDER_POLICY_MANAGER)
public class PolicyManager implements PolicySupervisor, InjectSourceSupplier, ExtensionInitializer, InvocationContext {
private static final Logger logger = LoggerFactory.getLogger(PolicyManager.class);

private final AtomicReference<GovernancePolicy> policy = new AtomicReference<>();

private final Map<String, PolicySubscriber> subscribers = new ConcurrentHashMap<>();

@Getter
Expand Down Expand Up @@ -138,7 +140,12 @@ public class PolicyManager implements PolicySupervisor, InjectSourceSupplier, Ex
@InjectLoader(ResourcerType.CORE_IMPL)
private List<RouteFilter> routeFilters;

private final AtomicBoolean warmuped = new AtomicBoolean(false);
@Inject(ServiceSupervisor.COMPONENT_SERVICE_SUPERVISOR)
private ServiceSupervisor serviceSupervisor;

private List<String> policyServices;

private final AtomicBoolean warmup = new AtomicBoolean(false);

@Override
public PolicySupplier getPolicySupplier() {
Expand Down Expand Up @@ -202,7 +209,7 @@ public CompletableFuture<Void> subscribe(String name, PolicyType type) {
return CompletableFuture.completedFuture(null);
}
String namespace = application.getService() == null ? null : application.getService().getNamespace();
PolicySubscriber subscriber = new PolicySubscriber(name, namespace, type);
PolicySubscriber subscriber = new PolicySubscriber(name, namespace, type, policyServices);
subscribe(subscriber);
return subscriber.getFuture();
}
Expand All @@ -218,14 +225,42 @@ public void initialize() {
for (Event<AgentEvent> event : events) {
if (event.getData().getType() == EventType.AGENT_SERVICE_READY) {
// subscribe after all services are started.
policyServices = computePolicyServices();
warmup();
}
}
});
}

/**
* Computes a list of policy service names by inspecting the available services from the service supervisor.
* Only services of type {@link PolicyService} with a policy type of {@link PolicyType#SERVICE_POLICY} are included.
*
* @return A list of policy service names that match the criteria.
*/
private List<String> computePolicyServices() {
List<String> result = new ArrayList<>();
if (serviceSupervisor != null) {
List<AgentService> services = serviceSupervisor.getServices();
if (services != null) {
for (AgentService service : services) {
if (service instanceof PolicyService) {
PolicyService policyService = (PolicyService) service;
if (policyService.getPolicyType() == PolicyType.SERVICE_POLICY) {
result.add(policyService.getName());
}
}
}
}
}
return result;
}

/**
* Initiates the warmup process.
*/
private void warmup() {
if (warmuped.compareAndSet(false, true)) {
if (warmup.compareAndSet(false, true)) {
ServiceConfig serviceConfig = governanceConfig == null ? null : governanceConfig.getServiceConfig();
Set<String> warmups = serviceConfig == null ? null : serviceConfig.getWarmups();
warmups = warmups == null ? new HashSet<>() : warmups;
Expand All @@ -236,11 +271,16 @@ private void warmup() {
warmups.add(name);
}
if (!warmups.isEmpty()) {
warmups.forEach(o -> subscribe(new PolicySubscriber(o, namespace, PolicyType.SERVICE_POLICY)));
warmups.forEach(o -> subscribe(new PolicySubscriber(o, namespace, PolicyType.SERVICE_POLICY, policyServices)));
}
}
}

/**
* Subscribes a {@link PolicySubscriber} to the policy publisher.
*
* @param subscriber The {@link PolicySubscriber} to be subscribed.
*/
protected void subscribe(PolicySubscriber subscriber) {
PolicySubscriber exist = subscribers.putIfAbsent(subscriber.getName(), subscriber);
if (exist == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,13 @@

import lombok.Getter;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

/**
* Represents a subscriber to a policy, encapsulating the subscription details and providing mechanisms to track
Expand All @@ -34,51 +39,64 @@ public class PolicySubscriber {

private final PolicyType type;

private final Map<String, AtomicBoolean> states;

private final AtomicInteger counter;

private final CompletableFuture<Void> future = new CompletableFuture<>();

/**
* Constructs a new instance of a policy subscriber with the specified name, namespace, and policy type.
*
* @param name The name of the subscriber.
* @param namespace The namespace of the subscriber.
* @param type The type of policy the subscriber is interested in.
* @param type The type of the subscriber.
* @param owners The owner of the subscriber.
*/
public PolicySubscriber(String name, String namespace, PolicyType type) {
public PolicySubscriber(String name, String namespace, PolicyType type, List<String> owners) {
this.name = name;
this.namespace = namespace;
this.type = type;
this.states = owners == null || owners.isEmpty() ? null
: owners.stream().collect(Collectors.toMap(o -> o, o -> new AtomicBoolean(false)));
this.counter = states == null ? null : new AtomicInteger(states.size());
}

/**
* Marks the subscription process as complete successfully. This method completes the associated future
* normally, indicating that the subscription process has finished without errors.
*
* @param owner The owner whose subscription process is marked as complete.
* @return {@code true} if the completion was successful, {@code false} otherwise.
*/
public boolean complete() {
if (!isDone()) {
public boolean complete(String owner) {
if (states == null) {
return future.complete(null);
}
AtomicBoolean done = owner == null ? null : states.get(owner);
if (done != null && done.compareAndSet(false, true)) {
if (counter.decrementAndGet() == 0) {
future.complete(null);
}
return true;
}
return false;
}

/**
* Checks if the subscription process is complete.
*
* @return {@code true} if the subscription process has completed (either normally or exceptionally),
* {@code false} otherwise.
*/
public boolean isDone() {
return future.isDone();
}

/**
* Marks the subscription process as complete with an exception. This method completes the associated future
* exceptionally, indicating that the subscription process has finished due to an error.
*
* @param ex The exception to complete the future with, representing the error that occurred during the
* subscription process.
* @return {@code true} if the future was completed exceptionally, {@code false} otherwise.
*/
public boolean completeExceptionally(Throwable ex) {
return future.completeExceptionally(ex);
boolean result = future.completeExceptionally(ex);
if (result && states != null) {
states.forEach((key, value) -> value.compareAndSet(false, true));
}
return result;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright © ${year} ${owner} (${email})
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jd.live.agent.governance.service;

import com.jd.live.agent.core.service.AgentService;
import com.jd.live.agent.governance.policy.PolicyType;

/**
* Represents a service that deals with policies, extending the functionalities
* of an {@link AgentService}. This interface provides additional capabilities
* specifically related to policy management.
*/
public interface PolicyService extends AgentService {

/**
* Retrieves the type of service this policy service represents.
*
* @return the {@link PolicyType} representing the type of this policy service.
*/
PolicyType getPolicyType();

String getName();
}


Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@
import com.jd.live.agent.governance.config.GovernanceConfig;
import com.jd.live.agent.governance.policy.GovernancePolicy;
import com.jd.live.agent.governance.policy.PolicySupervisor;
import com.jd.live.agent.governance.policy.PolicyType;
import com.jd.live.agent.governance.policy.lane.LaneSpace;
import com.jd.live.agent.governance.service.PolicyService;

import java.io.InputStreamReader;
import java.util.List;
Expand All @@ -46,7 +48,7 @@
@Extension("LaneSpaceFileSyncer")
@ConditionalOnProperty(name = SyncConfig.SYNC_LANE_SPACE_TYPE, value = "file")
@ConditionalOnProperty(value = GovernanceConfig.CONFIG_LIVE_ENABLED, matchIfMissing = true)
public class LaneSpaceFileSyncer extends AbstractFileSyncer<List<LaneSpace>> {
public class LaneSpaceFileSyncer extends AbstractFileSyncer<List<LaneSpace>> implements PolicyService {

private static final Logger logger = LoggerFactory.getLogger(LaneSpaceFileSyncer.class);

Expand All @@ -69,7 +71,12 @@ public LaneSpaceFileSyncer(PolicySupervisor policySupervisor, ObjectParser jsonP
}

@Override
protected String getName() {
public PolicyType getPolicyType() {
return PolicyType.LANE_SPACE;
}

@Override
public String getName() {
return "lane-syncer";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@
import com.jd.live.agent.governance.config.GovernanceConfig;
import com.jd.live.agent.governance.policy.GovernancePolicy;
import com.jd.live.agent.governance.policy.PolicySupervisor;
import com.jd.live.agent.governance.policy.PolicyType;
import com.jd.live.agent.governance.policy.live.LiveSpace;
import com.jd.live.agent.governance.service.PolicyService;

import java.io.InputStreamReader;
import java.util.List;
Expand All @@ -46,7 +48,7 @@
@Extension("LiveSpaceFileSyncer")
@ConditionalOnProperty(name = SyncConfig.SYNC_LIVE_SPACE_TYPE, value = "file")
@ConditionalOnProperty(name = GovernanceConfig.CONFIG_LIVE_ENABLED, matchIfMissing = true)
public class LiveSpaceFileSyncer extends AbstractFileSyncer<List<LiveSpace>> {
public class LiveSpaceFileSyncer extends AbstractFileSyncer<List<LiveSpace>> implements PolicyService {

private static final Logger logger = LoggerFactory.getLogger(LiveSpaceFileSyncer.class);

Expand All @@ -69,7 +71,12 @@ public LiveSpaceFileSyncer(PolicySupervisor policySupervisor, ObjectParser jsonP
}

@Override
protected String getName() {
public PolicyType getPolicyType() {
return PolicyType.LIVE_SPACE;
}

@Override
public String getName() {
return "live-syncer";
}

Expand Down
Loading

0 comments on commit 5e0fe4c

Please sign in to comment.