From 5e0fe4c2c790c363ed3d8a13508dddf2b2cc4480 Mon Sep 17 00:00:00 2001 From: hexiaofeng Date: Thu, 30 May 2024 08:32:25 +0800 Subject: [PATCH] Optimize the initialization logic to wait for service policies to be ready --- .../agent/core/service/ServiceSupervisor.java | 39 ++++++++++++++ .../live/agent/core/bootstrap/Bootstrap.java | 3 ++ .../agent/core/service/ServiceManager.java | 7 ++- .../governance/policy/PolicyManager.java | 54 ++++++++++++++++--- .../governance/policy/PolicySubscriber.java | 48 +++++++++++------ .../governance/service/PolicyService.java | 38 +++++++++++++ .../policy/file/LaneSpaceFileSyncer.java | 11 +++- .../policy/file/LiveSpaceFileSyncer.java | 11 +++- .../policy/file/ServiceFileSyncer.java | 16 ++++-- .../policy/multilive/LiveServiceSyncer.java | 18 ++++--- .../policy/multilive/LiveSpaceSyncer.java | 11 +++- 11 files changed, 216 insertions(+), 40 deletions(-) create mode 100644 joylive-core/joylive-core-api/src/main/java/com/jd/live/agent/core/service/ServiceSupervisor.java create mode 100644 joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/service/PolicyService.java diff --git a/joylive-core/joylive-core-api/src/main/java/com/jd/live/agent/core/service/ServiceSupervisor.java b/joylive-core/joylive-core-api/src/main/java/com/jd/live/agent/core/service/ServiceSupervisor.java new file mode 100644 index 000000000..f6a96560a --- /dev/null +++ b/joylive-core/joylive-core-api/src/main/java/com/jd/live/agent/core/service/ServiceSupervisor.java @@ -0,0 +1,39 @@ +/* + * Copyright © ${year} ${owner} (${email}) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.jd.live.agent.core.service; + +import java.util.List; + +/** + * Defines an interface for supervising services within a system. + * The {@code ServiceSupervisor} is responsible for managing and providing information + * about agent services that are currently active or available within the system. + */ +public interface ServiceSupervisor { + + String COMPONENT_SERVICE_SUPERVISOR = "ServiceSupervisor"; + + /** + * Retrieves a list of {@code AgentService} instances that are currently managed + * or supervised by this supervisor. + * + * @return A list of {@code AgentService} instances representing the services + * currently under supervision. This list may be empty if no services + * are currently being supervised. + */ + List getServices(); +} + diff --git a/joylive-core/joylive-core-framework/src/main/java/com/jd/live/agent/core/bootstrap/Bootstrap.java b/joylive-core/joylive-core-framework/src/main/java/com/jd/live/agent/core/bootstrap/Bootstrap.java index 71fb4d9d5..cfe75a393 100644 --- a/joylive-core/joylive-core-framework/src/main/java/com/jd/live/agent/core/bootstrap/Bootstrap.java +++ b/joylive-core/joylive-core-framework/src/main/java/com/jd/live/agent/core/bootstrap/Bootstrap.java @@ -54,6 +54,7 @@ import com.jd.live.agent.core.plugin.PluginManager; import com.jd.live.agent.core.plugin.PluginSupervisor; import com.jd.live.agent.core.service.ServiceManager; +import com.jd.live.agent.core.service.ServiceSupervisor; import com.jd.live.agent.core.util.Close; import com.jd.live.agent.core.util.network.Ipv4; import com.jd.live.agent.core.util.option.CascadeOption; @@ -481,6 +482,8 @@ private Injector createInjector() { ctx.add(AgentPath.COMPONENT_AGENT_PATH, agentPath); ctx.add(Application.COMPONENT_APPLICATION, application); ctx.add(ExtensionManager.COMPONENT_EXTENSION_MANAGER, extensionManager); + // + ctx.add(ServiceSupervisor.COMPONENT_SERVICE_SUPERVISOR, (ServiceSupervisor) () -> serviceManager.getServices()); ctx.add(Timer.COMPONENT_TIMER, timer); ctx.add(EventBus.COMPONENT_EVENT_BUS, eventBus); ctx.add(Resourcer.COMPONENT_RESOURCER, classLoaderManager == null ? null : classLoaderManager.getPluginLoaders()); diff --git a/joylive-core/joylive-core-framework/src/main/java/com/jd/live/agent/core/service/ServiceManager.java b/joylive-core/joylive-core-framework/src/main/java/com/jd/live/agent/core/service/ServiceManager.java index f8acefe4d..9c138efc2 100644 --- a/joylive-core/joylive-core-framework/src/main/java/com/jd/live/agent/core/service/ServiceManager.java +++ b/joylive-core/joylive-core-framework/src/main/java/com/jd/live/agent/core/service/ServiceManager.java @@ -38,7 +38,7 @@ * @since 1.0.0 */ @Injectable -public class ServiceManager implements AgentService { +public class ServiceManager implements AgentService, ServiceSupervisor { private static final Logger logger = LoggerFactory.getLogger(ServiceManager.class); @@ -55,6 +55,11 @@ public class ServiceManager implements AgentService { @InjectLoader(ResourcerType.CORE_IMPL) private List services; + @Override + public List getServices() { + return services; + } + @Override public CompletableFuture start() { return execute(AgentService::start, s -> "Service " + s.getClass().getSimpleName() + " is started.").whenComplete((v, t) -> { diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/policy/PolicyManager.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/policy/PolicyManager.java index 2fa67f713..677668f1a 100644 --- a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/policy/PolicyManager.java +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/policy/PolicyManager.java @@ -16,8 +16,6 @@ package com.jd.live.agent.governance.policy; import com.jd.live.agent.bootstrap.classloader.ResourcerType; -import com.jd.live.agent.bootstrap.logger.Logger; -import com.jd.live.agent.bootstrap.logger.LoggerFactory; import com.jd.live.agent.core.event.AgentEvent; import com.jd.live.agent.core.event.AgentEvent.EventType; import com.jd.live.agent.core.event.Event; @@ -32,6 +30,8 @@ import com.jd.live.agent.core.inject.annotation.Injectable; import com.jd.live.agent.core.instance.AppService; import com.jd.live.agent.core.instance.Application; +import com.jd.live.agent.core.service.AgentService; +import com.jd.live.agent.core.service.ServiceSupervisor; import com.jd.live.agent.governance.config.GovernanceConfig; import com.jd.live.agent.governance.config.LaneConfig; import com.jd.live.agent.governance.config.LiveConfig; @@ -46,6 +46,7 @@ import com.jd.live.agent.governance.policy.variable.UnitFunction; import com.jd.live.agent.governance.policy.variable.VariableFunction; import com.jd.live.agent.governance.policy.variable.VariableParser; +import com.jd.live.agent.governance.service.PolicyService; import lombok.Getter; import java.util.*; @@ -63,8 +64,9 @@ @Injectable @Extension(value = "PolicyManager", order = InjectSourceSupplier.ORDER_POLICY_MANAGER) public class PolicyManager implements PolicySupervisor, InjectSourceSupplier, ExtensionInitializer, InvocationContext { - private static final Logger logger = LoggerFactory.getLogger(PolicyManager.class); + private final AtomicReference policy = new AtomicReference<>(); + private final Map subscribers = new ConcurrentHashMap<>(); @Getter @@ -138,7 +140,12 @@ public class PolicyManager implements PolicySupervisor, InjectSourceSupplier, Ex @InjectLoader(ResourcerType.CORE_IMPL) private List routeFilters; - private final AtomicBoolean warmuped = new AtomicBoolean(false); + @Inject(ServiceSupervisor.COMPONENT_SERVICE_SUPERVISOR) + private ServiceSupervisor serviceSupervisor; + + private List policyServices; + + private final AtomicBoolean warmup = new AtomicBoolean(false); @Override public PolicySupplier getPolicySupplier() { @@ -202,7 +209,7 @@ public CompletableFuture subscribe(String name, PolicyType type) { return CompletableFuture.completedFuture(null); } String namespace = application.getService() == null ? null : application.getService().getNamespace(); - PolicySubscriber subscriber = new PolicySubscriber(name, namespace, type); + PolicySubscriber subscriber = new PolicySubscriber(name, namespace, type, policyServices); subscribe(subscriber); return subscriber.getFuture(); } @@ -218,14 +225,42 @@ public void initialize() { for (Event event : events) { if (event.getData().getType() == EventType.AGENT_SERVICE_READY) { // subscribe after all services are started. + policyServices = computePolicyServices(); warmup(); } } }); } + /** + * Computes a list of policy service names by inspecting the available services from the service supervisor. + * Only services of type {@link PolicyService} with a policy type of {@link PolicyType#SERVICE_POLICY} are included. + * + * @return A list of policy service names that match the criteria. + */ + private List computePolicyServices() { + List result = new ArrayList<>(); + if (serviceSupervisor != null) { + List services = serviceSupervisor.getServices(); + if (services != null) { + for (AgentService service : services) { + if (service instanceof PolicyService) { + PolicyService policyService = (PolicyService) service; + if (policyService.getPolicyType() == PolicyType.SERVICE_POLICY) { + result.add(policyService.getName()); + } + } + } + } + } + return result; + } + + /** + * Initiates the warmup process. + */ private void warmup() { - if (warmuped.compareAndSet(false, true)) { + if (warmup.compareAndSet(false, true)) { ServiceConfig serviceConfig = governanceConfig == null ? null : governanceConfig.getServiceConfig(); Set warmups = serviceConfig == null ? null : serviceConfig.getWarmups(); warmups = warmups == null ? new HashSet<>() : warmups; @@ -236,11 +271,16 @@ private void warmup() { warmups.add(name); } if (!warmups.isEmpty()) { - warmups.forEach(o -> subscribe(new PolicySubscriber(o, namespace, PolicyType.SERVICE_POLICY))); + warmups.forEach(o -> subscribe(new PolicySubscriber(o, namespace, PolicyType.SERVICE_POLICY, policyServices))); } } } + /** + * Subscribes a {@link PolicySubscriber} to the policy publisher. + * + * @param subscriber The {@link PolicySubscriber} to be subscribed. + */ protected void subscribe(PolicySubscriber subscriber) { PolicySubscriber exist = subscribers.putIfAbsent(subscriber.getName(), subscriber); if (exist == null) { diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/policy/PolicySubscriber.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/policy/PolicySubscriber.java index 4fd814ee0..e27a72510 100644 --- a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/policy/PolicySubscriber.java +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/policy/PolicySubscriber.java @@ -17,8 +17,13 @@ import lombok.Getter; +import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; +import java.util.stream.Collectors; /** * Represents a subscriber to a policy, encapsulating the subscription details and providing mechanisms to track @@ -34,6 +39,10 @@ public class PolicySubscriber { private final PolicyType type; + private final Map states; + + private final AtomicInteger counter; + private final CompletableFuture future = new CompletableFuture<>(); /** @@ -41,44 +50,53 @@ public class PolicySubscriber { * * @param name The name of the subscriber. * @param namespace The namespace of the subscriber. - * @param type The type of policy the subscriber is interested in. + * @param type The type of the subscriber. + * @param owners The owner of the subscriber. */ - public PolicySubscriber(String name, String namespace, PolicyType type) { + public PolicySubscriber(String name, String namespace, PolicyType type, List owners) { this.name = name; this.namespace = namespace; this.type = type; + this.states = owners == null || owners.isEmpty() ? null + : owners.stream().collect(Collectors.toMap(o -> o, o -> new AtomicBoolean(false))); + this.counter = states == null ? null : new AtomicInteger(states.size()); } /** * Marks the subscription process as complete successfully. This method completes the associated future * normally, indicating that the subscription process has finished without errors. + * + * @param owner The owner whose subscription process is marked as complete. + * @return {@code true} if the completion was successful, {@code false} otherwise. */ - public boolean complete() { - if (!isDone()) { + public boolean complete(String owner) { + if (states == null) { return future.complete(null); } + AtomicBoolean done = owner == null ? null : states.get(owner); + if (done != null && done.compareAndSet(false, true)) { + if (counter.decrementAndGet() == 0) { + future.complete(null); + } + return true; + } return false; } - /** - * Checks if the subscription process is complete. - * - * @return {@code true} if the subscription process has completed (either normally or exceptionally), - * {@code false} otherwise. - */ - public boolean isDone() { - return future.isDone(); - } - /** * Marks the subscription process as complete with an exception. This method completes the associated future * exceptionally, indicating that the subscription process has finished due to an error. * * @param ex The exception to complete the future with, representing the error that occurred during the * subscription process. + * @return {@code true} if the future was completed exceptionally, {@code false} otherwise. */ public boolean completeExceptionally(Throwable ex) { - return future.completeExceptionally(ex); + boolean result = future.completeExceptionally(ex); + if (result && states != null) { + states.forEach((key, value) -> value.compareAndSet(false, true)); + } + return result; } /** diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/service/PolicyService.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/service/PolicyService.java new file mode 100644 index 000000000..1f4013e35 --- /dev/null +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/service/PolicyService.java @@ -0,0 +1,38 @@ +/* + * Copyright © ${year} ${owner} (${email}) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.jd.live.agent.governance.service; + +import com.jd.live.agent.core.service.AgentService; +import com.jd.live.agent.governance.policy.PolicyType; + +/** + * Represents a service that deals with policies, extending the functionalities + * of an {@link AgentService}. This interface provides additional capabilities + * specifically related to policy management. + */ +public interface PolicyService extends AgentService { + + /** + * Retrieves the type of service this policy service represents. + * + * @return the {@link PolicyType} representing the type of this policy service. + */ + PolicyType getPolicyType(); + + String getName(); +} + + diff --git a/joylive-implement/joylive-service/joylive-service-file/src/main/java/com/jd/live/agent/implement/service/policy/file/LaneSpaceFileSyncer.java b/joylive-implement/joylive-service/joylive-service-file/src/main/java/com/jd/live/agent/implement/service/policy/file/LaneSpaceFileSyncer.java index 4e55e7dde..2ed83f3a1 100644 --- a/joylive-implement/joylive-service/joylive-service-file/src/main/java/com/jd/live/agent/implement/service/policy/file/LaneSpaceFileSyncer.java +++ b/joylive-implement/joylive-service/joylive-service-file/src/main/java/com/jd/live/agent/implement/service/policy/file/LaneSpaceFileSyncer.java @@ -32,7 +32,9 @@ import com.jd.live.agent.governance.config.GovernanceConfig; import com.jd.live.agent.governance.policy.GovernancePolicy; import com.jd.live.agent.governance.policy.PolicySupervisor; +import com.jd.live.agent.governance.policy.PolicyType; import com.jd.live.agent.governance.policy.lane.LaneSpace; +import com.jd.live.agent.governance.service.PolicyService; import java.io.InputStreamReader; import java.util.List; @@ -46,7 +48,7 @@ @Extension("LaneSpaceFileSyncer") @ConditionalOnProperty(name = SyncConfig.SYNC_LANE_SPACE_TYPE, value = "file") @ConditionalOnProperty(value = GovernanceConfig.CONFIG_LIVE_ENABLED, matchIfMissing = true) -public class LaneSpaceFileSyncer extends AbstractFileSyncer> { +public class LaneSpaceFileSyncer extends AbstractFileSyncer> implements PolicyService { private static final Logger logger = LoggerFactory.getLogger(LaneSpaceFileSyncer.class); @@ -69,7 +71,12 @@ public LaneSpaceFileSyncer(PolicySupervisor policySupervisor, ObjectParser jsonP } @Override - protected String getName() { + public PolicyType getPolicyType() { + return PolicyType.LANE_SPACE; + } + + @Override + public String getName() { return "lane-syncer"; } diff --git a/joylive-implement/joylive-service/joylive-service-file/src/main/java/com/jd/live/agent/implement/service/policy/file/LiveSpaceFileSyncer.java b/joylive-implement/joylive-service/joylive-service-file/src/main/java/com/jd/live/agent/implement/service/policy/file/LiveSpaceFileSyncer.java index 6e48dec3f..47a63c6a0 100644 --- a/joylive-implement/joylive-service/joylive-service-file/src/main/java/com/jd/live/agent/implement/service/policy/file/LiveSpaceFileSyncer.java +++ b/joylive-implement/joylive-service/joylive-service-file/src/main/java/com/jd/live/agent/implement/service/policy/file/LiveSpaceFileSyncer.java @@ -32,7 +32,9 @@ import com.jd.live.agent.governance.config.GovernanceConfig; import com.jd.live.agent.governance.policy.GovernancePolicy; import com.jd.live.agent.governance.policy.PolicySupervisor; +import com.jd.live.agent.governance.policy.PolicyType; import com.jd.live.agent.governance.policy.live.LiveSpace; +import com.jd.live.agent.governance.service.PolicyService; import java.io.InputStreamReader; import java.util.List; @@ -46,7 +48,7 @@ @Extension("LiveSpaceFileSyncer") @ConditionalOnProperty(name = SyncConfig.SYNC_LIVE_SPACE_TYPE, value = "file") @ConditionalOnProperty(name = GovernanceConfig.CONFIG_LIVE_ENABLED, matchIfMissing = true) -public class LiveSpaceFileSyncer extends AbstractFileSyncer> { +public class LiveSpaceFileSyncer extends AbstractFileSyncer> implements PolicyService { private static final Logger logger = LoggerFactory.getLogger(LiveSpaceFileSyncer.class); @@ -69,7 +71,12 @@ public LiveSpaceFileSyncer(PolicySupervisor policySupervisor, ObjectParser jsonP } @Override - protected String getName() { + public PolicyType getPolicyType() { + return PolicyType.LIVE_SPACE; + } + + @Override + public String getName() { return "live-syncer"; } diff --git a/joylive-implement/joylive-service/joylive-service-file/src/main/java/com/jd/live/agent/implement/service/policy/file/ServiceFileSyncer.java b/joylive-implement/joylive-service/joylive-service-file/src/main/java/com/jd/live/agent/implement/service/policy/file/ServiceFileSyncer.java index ee21fa599..721e30188 100644 --- a/joylive-implement/joylive-service/joylive-service-file/src/main/java/com/jd/live/agent/implement/service/policy/file/ServiceFileSyncer.java +++ b/joylive-implement/joylive-service/joylive-service-file/src/main/java/com/jd/live/agent/implement/service/policy/file/ServiceFileSyncer.java @@ -34,6 +34,7 @@ import com.jd.live.agent.governance.policy.PolicySupervisor; import com.jd.live.agent.governance.policy.PolicyType; import com.jd.live.agent.governance.policy.service.Service; +import com.jd.live.agent.governance.service.PolicyService; import com.jd.live.agent.implement.service.policy.file.config.ServiceSyncConfig; import java.io.InputStreamReader; @@ -48,7 +49,7 @@ @Injectable @Extension("ServiceFileSyncer") @ConditionalOnProperty(name = SyncConfig.SYNC_MICROSERVICE_TYPE, value = "file") -public class ServiceFileSyncer extends AbstractFileSyncer> { +public class ServiceFileSyncer extends AbstractFileSyncer> implements PolicyService { private static final Logger logger = LoggerFactory.getLogger(ServiceFileSyncer.class); @@ -72,7 +73,12 @@ public class ServiceFileSyncer extends AbstractFileSyncer> { private Map versions = new HashMap<>(); @Override - protected String getName() { + public PolicyType getPolicyType() { + return PolicyType.SERVICE_POLICY; + } + + @Override + public String getName() { return "service-syncer"; } @@ -146,7 +152,7 @@ private GovernancePolicy newPolicy(List updates, Set deletes, G private void onLoaded() { if (loaded.compareAndSet(false, true)) { for (PolicySubscriber subscriber : subscribers.values()) { - subscriber.complete(); + subscriber.complete(getName()); } } } @@ -162,11 +168,11 @@ private void subscribe(PolicySubscriber subscriber) { } else { PolicySubscriber old = subscribers.putIfAbsent(subscriber.getName(), subscriber); if (loaded.get()) { - subscriber.complete(); + subscriber.complete(getName()); } else if (old != null && old != subscriber) { old.trigger((v, t) -> { if (t == null) { - subscriber.complete(); + subscriber.complete(getName()); } else { subscriber.completeExceptionally(t); } diff --git a/joylive-implement/joylive-service/joylive-service-multilive/src/main/java/com/jd/live/agent/implement/service/policy/multilive/LiveServiceSyncer.java b/joylive-implement/joylive-service/joylive-service-multilive/src/main/java/com/jd/live/agent/implement/service/policy/multilive/LiveServiceSyncer.java index 9fc295f95..4ee730d42 100644 --- a/joylive-implement/joylive-service/joylive-service-multilive/src/main/java/com/jd/live/agent/implement/service/policy/multilive/LiveServiceSyncer.java +++ b/joylive-implement/joylive-service/joylive-service-multilive/src/main/java/com/jd/live/agent/implement/service/policy/multilive/LiveServiceSyncer.java @@ -45,6 +45,7 @@ import com.jd.live.agent.governance.policy.PolicySupervisor; import com.jd.live.agent.governance.policy.PolicyType; import com.jd.live.agent.governance.policy.service.Service; +import com.jd.live.agent.governance.service.PolicyService; import com.jd.live.agent.implement.service.policy.multilive.config.LiveSyncConfig; import com.jd.live.agent.implement.service.policy.multilive.reponse.Error; import com.jd.live.agent.implement.service.policy.multilive.reponse.Response; @@ -69,7 +70,7 @@ @ConditionalOnProperty(name = SyncConfig.SYNC_LIVE_SPACE_TYPE, value = "multilive") @ConditionalOnProperty(name = SyncConfig.SYNC_LIVE_SPACE_SERVICE, matchIfMissing = true) @ConditionalOnProperty(name = GovernanceConfig.CONFIG_LIVE_ENABLED, matchIfMissing = true) -public class LiveServiceSyncer extends AbstractService implements ExtensionInitializer { +public class LiveServiceSyncer extends AbstractService implements PolicyService, ExtensionInitializer { private static final Logger logger = LoggerFactory.getLogger(LiveServiceSyncer.class); @@ -108,13 +109,18 @@ public class LiveServiceSyncer extends AbstractService implements ExtensionIniti private Template template; + @Override + public PolicyType getPolicyType() { + return PolicyType.SERVICE_POLICY; + } + @Override public void initialize() { template = new Template(syncConfig.getServiceUrl()); } @Override - protected String getName() { + public String getName() { return "live-service-syncer"; } @@ -195,7 +201,7 @@ private void syncAndUpdate(PolicySubscriber subscriber) { private void onOk(PolicySubscriber subscriber, Service service, ServiceSyncMeta meta) { if (update(subscriber.getName(), service)) { meta.version = service.getVersion(); - subscriber.complete(); + subscriber.complete(getName()); logger.info(meta.getSuccessMessage(HttpStatus.OK)); } } @@ -207,7 +213,7 @@ private void onOk(PolicySubscriber subscriber, Service service, ServiceSyncMeta * @param meta the service synchronization metadata. */ private void onNotModified(PolicySubscriber subscriber, ServiceSyncMeta meta) { - subscriber.complete(); + subscriber.complete(getName()); if (meta.shouldPrint()) { logger.info(meta.getSuccessMessage(HttpStatus.NOT_MODIFIED)); } @@ -224,10 +230,10 @@ private void onNotFound(PolicySubscriber subscriber, ServiceSyncMeta meta) { if (update(subscriber.getName(), null)) { // Retry from version 0 after data is recovered. meta.version = 0; - subscriber.complete(); + subscriber.complete(getName()); logger.info(meta.getSuccessMessage(HttpStatus.NOT_FOUND)); } - } else if (subscriber.complete()) { + } else if (subscriber.complete(getName())) { logger.info(meta.getSuccessMessage(HttpStatus.NOT_FOUND)); } else if (meta.shouldPrint()) { logger.info(meta.getSuccessMessage(HttpStatus.NOT_FOUND)); diff --git a/joylive-implement/joylive-service/joylive-service-multilive/src/main/java/com/jd/live/agent/implement/service/policy/multilive/LiveSpaceSyncer.java b/joylive-implement/joylive-service/joylive-service-multilive/src/main/java/com/jd/live/agent/implement/service/policy/multilive/LiveSpaceSyncer.java index 7ea4d8cb7..7f482dae6 100644 --- a/joylive-implement/joylive-service/joylive-service-multilive/src/main/java/com/jd/live/agent/implement/service/policy/multilive/LiveSpaceSyncer.java +++ b/joylive-implement/joylive-service/joylive-service-multilive/src/main/java/com/jd/live/agent/implement/service/policy/multilive/LiveSpaceSyncer.java @@ -38,7 +38,9 @@ import com.jd.live.agent.governance.config.GovernanceConfig; import com.jd.live.agent.governance.policy.GovernancePolicy; import com.jd.live.agent.governance.policy.PolicySupervisor; +import com.jd.live.agent.governance.policy.PolicyType; import com.jd.live.agent.governance.policy.live.LiveSpace; +import com.jd.live.agent.governance.service.PolicyService; import com.jd.live.agent.implement.service.policy.multilive.config.LiveSyncConfig; import com.jd.live.agent.implement.service.policy.multilive.reponse.Error; import com.jd.live.agent.implement.service.policy.multilive.reponse.Response; @@ -59,7 +61,7 @@ @Extension("LiveSpaceSyncer") @ConditionalOnProperty(name = SyncConfig.SYNC_LIVE_SPACE_TYPE, value = "multilive") @ConditionalOnProperty(value = GovernanceConfig.CONFIG_LIVE_ENABLED, matchIfMissing = true) -public class LiveSpaceSyncer extends AbstractSyncer, Map> implements ExtensionInitializer { +public class LiveSpaceSyncer extends AbstractSyncer, Map> implements PolicyService, ExtensionInitializer { private static final Logger logger = LoggerFactory.getLogger(LiveSpaceSyncer.class); @@ -81,13 +83,18 @@ public class LiveSpaceSyncer extends AbstractSyncer, Map