diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/BalancedNodeSelectionStrategyChannel.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/BalancedNodeSelectionStrategyChannel.java index 769f09795..6da500352 100644 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/BalancedNodeSelectionStrategyChannel.java +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/BalancedNodeSelectionStrategyChannel.java @@ -47,7 +47,7 @@ * workloads (where n requests must all land on the same server) or scenarios where cache warming is very important. * {@link PinUntilErrorNodeSelectionStrategyChannel} remains the best choice for these. */ -final class BalancedNodeSelectionStrategyChannel implements LimitedChannel { +final class BalancedNodeSelectionStrategyChannel implements NodeSelectingChannel { private static final Logger log = LoggerFactory.getLogger(BalancedNodeSelectionStrategyChannel.class); private static final int INFLIGHT_COMPARISON_THRESHOLD = 5; @@ -56,7 +56,7 @@ final class BalancedNodeSelectionStrategyChannel implements LimitedChannel { private static final int UNHEALTHY_SCORE_MULTIPLIER = 2; private final BalancedScoreTracker tracker; - private final ImmutableList channels; + private final ImmutableList channels; BalancedNodeSelectionStrategyChannel( ImmutableList channels, @@ -116,7 +116,7 @@ public Optional> maybeExecute( giveUpThreshold = newThreshold; } - BalancedChannel channel = channels.get(snapshot.getDelegate().channelIndex()); + LimitedChannel channel = channels.get(snapshot.getDelegate().channelIndex()); Optional> maybe = StickyAttachments.maybeExecute(channel, endpoint, request, limitEnforcement); if (maybe.isPresent()) { @@ -127,6 +127,11 @@ public Optional> maybeExecute( return Optional.empty(); } + @Override + public ImmutableList nodeChannels() { + return channels; + } + private static final class BalancedChannel implements LimitedChannel { private final LimitedChannel delegate; private final ChannelScoreInfo channelInfo; diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/CautiousIncreaseAggressiveDecreaseConcurrencyLimiter.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/CautiousIncreaseAggressiveDecreaseConcurrencyLimiter.java index ccbcca07f..b25f1e095 100644 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/CautiousIncreaseAggressiveDecreaseConcurrencyLimiter.java +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/CautiousIncreaseAggressiveDecreaseConcurrencyLimiter.java @@ -130,6 +130,17 @@ void onSuccess(Response result, PermitControl control) { } } + @Override + void onFailure(Throwable _throwable, PermitControl control) { + control.ignore(); + } + }, + STICKY() { + @Override + void onSuccess(Response _result, PermitControl control) { + control.success(); + } + @Override void onFailure(Throwable _throwable, PermitControl control) { control.ignore(); @@ -157,6 +168,10 @@ final class Permit implements PermitControl, FutureCallback { this.inFlightSnapshot = inFlightSnapshot; } + boolean isOnlyInFlight() { + return inFlightSnapshot == 1; + } + @VisibleForTesting int inFlightSnapshot() { return inFlightSnapshot; diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/DefaultStickySessionFactory.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/DefaultStickySessionFactory.java new file mode 100644 index 000000000..1cc8a3350 --- /dev/null +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/DefaultStickySessionFactory.java @@ -0,0 +1,201 @@ +/* + * (c) Copyright 2021 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.dialogue.core; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.palantir.dialogue.Channel; +import com.palantir.dialogue.Endpoint; +import com.palantir.dialogue.EndpointChannel; +import com.palantir.dialogue.EndpointChannelFactory; +import com.palantir.dialogue.Request; +import com.palantir.dialogue.Response; +import com.palantir.dialogue.core.StickyAttachments.StickyTarget; +import com.palantir.dialogue.futures.DialogueFutures; +import com.palantir.logsafe.Preconditions; +import java.util.function.Supplier; +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; +import org.immutables.value.Value; + +@Value.Enclosing +public final class DefaultStickySessionFactory implements StickySessionFactory { + + private final Supplier delegate; + + private DefaultStickySessionFactory(Supplier endpointChannelFactory) { + this.delegate = Preconditions.checkNotNull(endpointChannelFactory, "endpointChannelFactory"); + } + + @Override + public StickySession get() { + return new DefaultStickySession(delegate); + } + + @Override + public String toString() { + return "StickyEndpointChannels2{" + delegate + "}"; + } + + private static final class DefaultStickySession implements StickySession { + + private final Supplier channelFactory; + private final StickyRouter router = new StickyRouter(); + + private DefaultStickySession(Supplier channelFactory) { + this.channelFactory = channelFactory; + } + + @Override + public Channel get() { + return new Sticky(channelFactory.get(), router); + } + } + + private static final class Sticky implements EndpointChannelFactory, Channel { + + private final EndpointChannelFactory channelFactory; + private final StickyRouter router; + + private Sticky(EndpointChannelFactory channelFactory, StickyRouter router) { + this.router = router; + this.channelFactory = channelFactory; + } + + @Override + public EndpointChannel endpoint(Endpoint endpoint) { + return new StickyEndpointChannel(router, channelFactory.endpoint(endpoint)); + } + + /** + * . + * @deprecated prefer {@link #endpoint}, as it allows binding work upfront + */ + @Deprecated + @Override + public ListenableFuture execute(Endpoint endpoint, Request request) { + return endpoint(endpoint).execute(request); + } + + @Override + public String toString() { + return "Sticky{" + channelFactory + '}'; + } + } + + public static DefaultStickySessionFactory create(Supplier endpointChannelFactory) { + return new DefaultStickySessionFactory(endpointChannelFactory); + } + + private static final class StickyRouter { + + private final InFlightCallCallback callback = new InFlightCallCallback(); + + @Nullable + private volatile StickyTarget stickyTarget; + + @Nullable + @GuardedBy("this") + private volatile ListenableFuture callInFlight; + + public ListenableFuture execute(Request request, EndpointChannel endpointChannel) { + if (stickyTarget != null) { + return executeWithStickyTarget(stickyTarget, request, endpointChannel); + } + + synchronized (this) { + if (stickyTarget != null) { + return executeWithStickyTarget(stickyTarget, request, endpointChannel); + } + + ListenableFuture callInFlightSnapshot = callInFlight; + if (callInFlightSnapshot == null) { + ListenableFuture result = DialogueFutures.addDirectCallback( + executeWithStickyToken(request, endpointChannel), callback); + callInFlight = Futures.nonCancellationPropagating(result); + return result; + } else { + ListenableFuture result = callInFlightSnapshot; + result = DialogueFutures.transformAsync(result, _input -> execute(request, endpointChannel)); + result = DialogueFutures.catchingAllAsync(result, _throwable -> execute(request, endpointChannel)); + return result; + } + } + } + + private final class InFlightCallCallback implements FutureCallback { + + @Override + public void onSuccess(Response result) { + successfulCall(result); + } + + @Override + public void onFailure(Throwable _throwable) { + failed(); + } + } + + private synchronized void successfulCall(Response response) { + callInFlight = null; + if (stickyTarget == null) { + StickyTarget newStickyTarget = + response.attachments().getOrDefault(StickyAttachments.STICKY_TOKEN, null); + if (newStickyTarget != null) { + stickyTarget = newStickyTarget; + } + } + } + + private synchronized void failed() { + callInFlight = null; + } + + private static ListenableFuture executeWithStickyToken( + Request request, EndpointChannel endpointChannel) { + request.attachments().put(StickyAttachments.REQUEST_STICKY_TOKEN, Boolean.TRUE); + return endpointChannel.execute(request); + } + + private static ListenableFuture executeWithStickyTarget( + StickyTarget stickyTarget, Request request, EndpointChannel endpointChannel) { + request.attachments().put(StickyAttachments.STICKY, stickyTarget); + return endpointChannel.execute(request); + } + } + + private static final class StickyEndpointChannel implements EndpointChannel { + private final StickyRouter stickyRouter; + private final EndpointChannel delegate; + + StickyEndpointChannel(StickyRouter stickyRouter, EndpointChannel delegate) { + this.stickyRouter = stickyRouter; + this.delegate = delegate; + } + + @Override + public ListenableFuture execute(Request request) { + return stickyRouter.execute(request, delegate); + } + + @Override + public String toString() { + return "StickyEndpointChannel{delegate=" + delegate + '}'; + } + } +} diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueChannel.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueChannel.java index 409e92773..84f02dc47 100644 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueChannel.java +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueChannel.java @@ -33,14 +33,23 @@ import java.util.OptionalInt; import java.util.Random; import java.util.concurrent.ScheduledExecutorService; +import java.util.stream.IntStream; public final class DialogueChannel implements Channel, EndpointChannelFactory { private final EndpointChannelFactory delegate; private final Config cf; + private final StickySessionFactory stickySessionFactory; + private final ImmutableList perHostChannels; - private DialogueChannel(Config cf, EndpointChannelFactory delegate) { + private DialogueChannel( + Config cf, + EndpointChannelFactory delegate, + StickySessionFactory stickySessionFactory, + ImmutableList perHostChannels) { this.cf = cf; this.delegate = delegate; + this.stickySessionFactory = stickySessionFactory; + this.perHostChannels = perHostChannels; } @Override @@ -53,6 +62,14 @@ public EndpointChannel endpoint(Endpoint endpoint) { return delegate.endpoint(endpoint); } + public StickySessionFactory stickySessionFactory() { + return stickySessionFactory; + } + + public ImmutableList perHost() { + return perHostChannels; + } + public static Builder builder() { return new Builder(); } @@ -162,8 +179,10 @@ public DialogueChannel build() { } ImmutableList channels = perUriChannels.build(); - LimitedChannel nodeSelectionChannel = NodeSelectionStrategyChannel.create(cf, channels); - Channel queuedChannel = QueuedChannel.create(cf, nodeSelectionChannel); + NodeSelectingChannel nodeSelectionChannel = NodeSelectionStrategyChannel.create(cf, channels); + + Channel defaultQueuedChannel = QueuedChannel.create(cf, nodeSelectionChannel); + Channel queuedChannel = new QueueOverrideChannel(defaultQueuedChannel); EndpointChannelFactory channelFactory = endpoint -> { EndpointChannel channel = new EndpointChannelAdapter(endpoint, queuedChannel); @@ -179,6 +198,19 @@ public DialogueChannel build() { return new NeverThrowEndpointChannel(channel); // this must come last as a defensive backstop }; + StickySessionFactory stickySessionFactory = DefaultStickySessionFactory.create(() -> { + LimitedChannel stickyLimitedChannel = + StickyConcurrencyLimitedChannel.createForQueueKey(nodeSelectionChannel, cf.channelName()); + Channel queueOverride = QueuedChannel.createForSticky(cf, stickyLimitedChannel); + return endpoint -> { + EndpointChannel endpointChannel = channelFactory.endpoint(endpoint); + return (EndpointChannel) request -> { + request.attachments().put(QueueAttachments.QUEUE_OVERRIDE, queueOverride); + return endpointChannel.execute(request); + }; + }; + }); + Meter createMeter = DialogueClientMetrics.of(cf.clientConf().taggedMetricRegistry()) .create() .clientName(cf.channelName()) @@ -186,7 +218,35 @@ public DialogueChannel build() { .build(); createMeter.mark(); - return new DialogueChannel(cf, channelFactory); + ImmutableList perHostChannels = IntStream.range(0, channels.size()) + .mapToObj(index -> new ChannelAndFactory() { + + private final LimitedChannel stickyLimitedChannel = + StickyConcurrencyLimitedChannel.createForQueueKey( + nodeSelectionChannel, cf.channelName()); + private final Channel queueOverride = + QueuedChannel.createPerHost(cf, stickyLimitedChannel, index); + + @Override + public EndpointChannel endpoint(Endpoint endpoint) { + EndpointChannel endpointChannel = channelFactory.endpoint(endpoint); + return request -> { + request.attachments().put(QueueAttachments.QUEUE_OVERRIDE, queueOverride); + StickyAttachments.routeToChannel( + request, + nodeSelectionChannel.nodeChannels().get(index)); + return endpointChannel.execute(request); + }; + } + + @Override + public ListenableFuture execute(Endpoint endpoint, Request request) { + return endpoint(endpoint).execute(request); + } + }) + .collect(ImmutableList.toImmutableList()); + + return new DialogueChannel(cf, channelFactory, stickySessionFactory, perHostChannels); } /** Does *not* do any clever live-reloading. */ @@ -195,4 +255,6 @@ public Channel buildNonLiveReloading() { return build(); } } + + interface ChannelAndFactory extends Channel, EndpointChannelFactory {} } diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectingChannel.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectingChannel.java new file mode 100644 index 000000000..382409046 --- /dev/null +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectingChannel.java @@ -0,0 +1,24 @@ +/* + * (c) Copyright 2021 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.dialogue.core; + +import com.google.common.collect.ImmutableList; + +interface NodeSelectingChannel extends LimitedChannel { + /** Returns a stable list of all known delegate nodes. */ + ImmutableList nodeChannels(); +} diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategyChannel.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategyChannel.java index 38de591b9..6fca60f37 100644 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategyChannel.java +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategyChannel.java @@ -36,7 +36,7 @@ import javax.annotation.Nullable; import org.immutables.value.Value; -final class NodeSelectionStrategyChannel implements LimitedChannel { +final class NodeSelectionStrategyChannel implements NodeSelectingChannel { private static final String NODE_SELECTION_HEADER = "Node-Selection-Strategy"; private final FutureCallback callback = new NodeSelectionCallback(); @@ -51,7 +51,7 @@ final class NodeSelectionStrategyChannel implements LimitedChannel { private final ImmutableList channels; @SuppressWarnings("NullAway") - private final LimitedChannel delegate = + private final NodeSelectingChannel delegate = new SupplierChannel(() -> nodeSelectionStrategy.get().channel()); @VisibleForTesting @@ -73,13 +73,27 @@ final class NodeSelectionStrategyChannel implements LimitedChannel { this.nodeSelectionStrategy.set(createNodeSelectionChannel(null, initialStrategy)); } - static LimitedChannel create(Config cf, ImmutableList channels) { + static NodeSelectingChannel create(Config cf, ImmutableList channels) { if (channels.isEmpty()) { return new StickyChannelHandler(new ZeroUriNodeSelectionChannel(cf.channelName())); } if (channels.size() == 1) { - return new StickyChannelHandler(new StuckRequestHandler(channels.get(0))); + return new StickyChannelHandler(new StuckRequestHandler(new NodeSelectingChannel() { + + @Override + public ImmutableList nodeChannels() { + return channels; + } + + private LimitedChannel delegate = channels.get(0); + + @Override + public Optional> maybeExecute( + Endpoint endpoint, Request request, LimitEnforcement limitEnforcement) { + return delegate.maybeExecute(endpoint, request, limitEnforcement); + } + })); } return new NodeSelectionStrategyChannel( @@ -167,11 +181,16 @@ public String toString() { return "NodeSelectionStrategyChannel{" + nodeSelectionStrategy + '}'; } + @Override + public ImmutableList nodeChannels() { + return delegate.nodeChannels(); + } + @Value.Immutable interface NodeSelectionChannel { DialogueNodeSelectionStrategy strategy(); - LimitedChannel channel(); + NodeSelectingChannel channel(); class Builder extends ImmutableNodeSelectionChannel.Builder {} @@ -214,11 +233,11 @@ private void consumeStrategy(String strategy) { } } - private static final class StickyChannelHandler implements LimitedChannel { + private static final class StickyChannelHandler implements NodeSelectingChannel { - private final LimitedChannel delegate; + private final NodeSelectingChannel delegate; - StickyChannelHandler(LimitedChannel delegate) { + StickyChannelHandler(NodeSelectingChannel delegate) { this.delegate = delegate; } @@ -236,13 +255,18 @@ public Optional> maybeExecute( Endpoint endpoint, Request request, LimitEnforcement limitEnforcement) { return maybeExecute(delegate, endpoint, request, limitEnforcement); } + + @Override + public ImmutableList nodeChannels() { + return delegate.nodeChannels(); + } } - private static final class StuckRequestHandler implements LimitedChannel { + private static final class StuckRequestHandler implements NodeSelectingChannel { - private final LimitedChannel delegate; + private final NodeSelectingChannel delegate; - StuckRequestHandler(LimitedChannel delegate) { + StuckRequestHandler(NodeSelectingChannel delegate) { this.delegate = delegate; } @@ -251,5 +275,10 @@ public Optional> maybeExecute( Endpoint endpoint, Request request, LimitEnforcement limitEnforcement) { return StickyAttachments.maybeExecute(delegate, endpoint, request, limitEnforcement); } + + @Override + public ImmutableList nodeChannels() { + return delegate.nodeChannels(); + } } } diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/PinUntilErrorNodeSelectionStrategyChannel.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/PinUntilErrorNodeSelectionStrategyChannel.java index 253bce638..0befc3e98 100644 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/PinUntilErrorNodeSelectionStrategyChannel.java +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/PinUntilErrorNodeSelectionStrategyChannel.java @@ -58,19 +58,25 @@ * * To alleviate the second downside, we reshuffle all nodes every 10 minutes. */ -final class PinUntilErrorNodeSelectionStrategyChannel implements LimitedChannel { +final class PinUntilErrorNodeSelectionStrategyChannel implements NodeSelectingChannel { private static final Logger log = LoggerFactory.getLogger(PinUntilErrorNodeSelectionStrategyChannel.class); // we also add some jitter to ensure that there isn't a big spike of reshuffling every 10 minutes. private static final Duration RESHUFFLE_EVERY = Duration.ofMinutes(10); + private final ImmutableList originalChannels; private final AtomicInteger currentPin; private final NodeList nodeList; private final Instrumentation instrumentation; @VisibleForTesting PinUntilErrorNodeSelectionStrategyChannel( - NodeList nodeList, int initialPin, DialoguePinuntilerrorMetrics metrics, String channelName) { + ImmutableList originalChannels, + NodeList nodeList, + int initialPin, + DialoguePinuntilerrorMetrics metrics, + String channelName) { + this.originalChannels = originalChannels; this.nodeList = nodeList; this.currentPin = new AtomicInteger(initialPin); this.instrumentation = new Instrumentation(nodeList.size(), metrics, channelName); @@ -87,13 +93,13 @@ final class PinUntilErrorNodeSelectionStrategyChannel implements LimitedChannel static PinUntilErrorNodeSelectionStrategyChannel of( Optional initialChannel, DialogueNodeSelectionStrategy strategy, - List channels, + ImmutableList channels, DialoguePinuntilerrorMetrics metrics, Random random, Ticker ticker, String channelName) { // We preserve the 'stableIndex' so that calls can be attributed to one host even across reshuffles - List pinChannels = IntStream.range(0, channels.size()) + ImmutableList pinChannels = IntStream.range(0, channels.size()) .mapToObj(index -> ImmutablePinChannel.builder() .delegate(channels.get(index)) .stableIndex(index) @@ -114,10 +120,10 @@ static PinUntilErrorNodeSelectionStrategyChannel of( if (strategy == DialogueNodeSelectionStrategy.PIN_UNTIL_ERROR) { NodeList shuffling = ReshufflingNodeList.of(initialShuffle, random, ticker, metrics, channelName); - return new PinUntilErrorNodeSelectionStrategyChannel(shuffling, initialPin, metrics, channelName); + return new PinUntilErrorNodeSelectionStrategyChannel(channels, shuffling, initialPin, metrics, channelName); } else if (strategy == DialogueNodeSelectionStrategy.PIN_UNTIL_ERROR_WITHOUT_RESHUFFLE) { NodeList constant = new ConstantNodeList(initialShuffle); - return new PinUntilErrorNodeSelectionStrategyChannel(constant, initialPin, metrics, channelName); + return new PinUntilErrorNodeSelectionStrategyChannel(channels, constant, initialPin, metrics, channelName); } throw new SafeIllegalArgumentException("Unsupported NodeSelectionStrategy", SafeArg.of("strategy", strategy)); @@ -176,6 +182,11 @@ private OptionalInt incrementHostIfNecessary(int pin) { return saved ? OptionalInt.of(nextIndex) : OptionalInt.empty(); // we've moved on already } + @Override + public ImmutableList nodeChannels() { + return originalChannels; + } + interface NodeList { PinChannel get(int index); diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/QueueAttachments.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/QueueAttachments.java new file mode 100644 index 000000000..5d38630d9 --- /dev/null +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/QueueAttachments.java @@ -0,0 +1,24 @@ +/* + * (c) Copyright 2021 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.dialogue.core; + +import com.palantir.dialogue.Channel; +import com.palantir.dialogue.RequestAttachmentKey; + +interface QueueAttachments { + RequestAttachmentKey QUEUE_OVERRIDE = RequestAttachmentKey.create(Channel.class); +} diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/QueueOverrideChannel.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/QueueOverrideChannel.java new file mode 100644 index 000000000..cb441f1a4 --- /dev/null +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/QueueOverrideChannel.java @@ -0,0 +1,42 @@ +/* + * (c) Copyright 2021 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.dialogue.core; + +import com.google.common.util.concurrent.ListenableFuture; +import com.palantir.dialogue.Channel; +import com.palantir.dialogue.Endpoint; +import com.palantir.dialogue.Request; +import com.palantir.dialogue.Response; + +final class QueueOverrideChannel implements Channel { + + private final Channel defaultDelegate; + + QueueOverrideChannel(Channel defaultDelegate) { + this.defaultDelegate = defaultDelegate; + } + + @Override + public ListenableFuture execute(Endpoint endpoint, Request request) { + Channel override = request.attachments().getOrDefault(QueueAttachments.QUEUE_OVERRIDE, null); + if (override != null) { + return override.execute(endpoint, request); + } else { + return defaultDelegate.execute(endpoint, request); + } + } +} diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/QueuedChannel.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/QueuedChannel.java index b0ad3144b..7cd380389 100644 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/QueuedChannel.java +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/QueuedChannel.java @@ -92,6 +92,25 @@ final class QueuedChannel implements Channel { "Unable to make a request (queue is full)", SafeArg.of("maxQueueSize", maxQueueSize))); } + // TODO(12345): The metrics will be global, even though maxSize is per-queue. + static QueuedChannel createForSticky(Config cf, LimitedChannel delegate) { + return new QueuedChannel( + delegate, + cf.channelName(), + stickyInstrumentation( + DialogueClientMetrics.of(cf.clientConf().taggedMetricRegistry()), cf.channelName()), + cf.maxQueueSize()); + } + + static QueuedChannel createPerHost(Config cf, LimitedChannel delegate, int hostIndex) { + return new QueuedChannel( + delegate, + cf.channelName(), + perHostInstrumentation( + DialogueClientMetrics.of(cf.clientConf().taggedMetricRegistry()), cf.channelName(), hostIndex), + cf.maxQueueSize()); + } + static QueuedChannel create(Config cf, LimitedChannel delegate) { return new QueuedChannel( delegate, @@ -366,6 +385,42 @@ public Timer requestQueuedTime() { }; } + static QueuedChannelInstrumentation stickyInstrumentation(DialogueClientMetrics metrics, String channelName) { + return new QueuedChannelInstrumentation() { + @Override + public Counter requestsQueued() { + return metrics.requestsStickyQueued(channelName); + } + + @Override + public Timer requestQueuedTime() { + return metrics.requestStickyQueuedTime(channelName); + } + }; + } + + static QueuedChannelInstrumentation perHostInstrumentation( + DialogueClientMetrics metrics, String channelName, int hostIndex) { + String hostIndexString = Integer.toString(hostIndex); + return new QueuedChannelInstrumentation() { + @Override + public Counter requestsQueued() { + return metrics.requestsPerhostQueued() + .channelName(channelName) + .hostIndex(hostIndexString) + .build(); + } + + @Override + public Timer requestQueuedTime() { + return metrics.requestPerhostQueuedTime() + .channelName(channelName) + .hostIndex(hostIndexString) + .build(); + } + }; + } + static QueuedChannelInstrumentation endpointInstrumentation( DialogueClientMetrics metrics, String channelName, String service, String endpoint) { return new QueuedChannelInstrumentation() { diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/StickyAttachments.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/StickyAttachments.java index 96dbb0621..50225e3e3 100644 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/StickyAttachments.java +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/StickyAttachments.java @@ -54,6 +54,10 @@ Optional> maybeExecute( Endpoint endpoint, Request request, LimitEnforcement limitEnforcement); } + static void routeToChannel(Request request, LimitedChannel limitedChannel) { + request.attachments().put(STICKY, limitedChannel::maybeExecute); + } + @CheckReturnValue static Optional> maybeExecute( LimitedChannel channel, Endpoint endpoint, Request request, LimitEnforcement limitEnforcement) { diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/StickyConcurrencyLimitedChannel.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/StickyConcurrencyLimitedChannel.java new file mode 100644 index 000000000..756ce8893 --- /dev/null +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/StickyConcurrencyLimitedChannel.java @@ -0,0 +1,104 @@ +/* + * (c) Copyright 2021 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.dialogue.core; + +import com.google.common.util.concurrent.ListenableFuture; +import com.palantir.dialogue.Endpoint; +import com.palantir.dialogue.Request; +import com.palantir.dialogue.Response; +import com.palantir.dialogue.core.CautiousIncreaseAggressiveDecreaseConcurrencyLimiter.Behavior; +import com.palantir.dialogue.futures.DialogueFutures; +import com.palantir.logsafe.SafeArg; +import java.util.Optional; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class StickyConcurrencyLimitedChannel implements LimitedChannel { + + private static final Logger log = LoggerFactory.getLogger(StickyConcurrencyLimitedChannel.class); + + private final LimitedChannel delegate; + private final CautiousIncreaseAggressiveDecreaseConcurrencyLimiter limiter; + private final String channelNameForLogging; + + StickyConcurrencyLimitedChannel(LimitedChannel delegate, String channelNameForLogging) { + this.delegate = delegate; + this.limiter = new CautiousIncreaseAggressiveDecreaseConcurrencyLimiter(Behavior.STICKY); + this.channelNameForLogging = channelNameForLogging; + } + + @Override + public Optional> maybeExecute( + Endpoint endpoint, Request request, LimitEnforcement limitEnforcement) { + Optional maybePermit = + limiter.acquire(limitEnforcement); + if (maybePermit.isPresent()) { + CautiousIncreaseAggressiveDecreaseConcurrencyLimiter.Permit permit = maybePermit.get(); + logPermitAcquired(); + + // This is a trade-off to solve an edge case where the first request on this channel could get + // concurrency limited: we would then have to rely on another request coming in to try scheduling the + // first request again. If that request never came, we would lock up. + // To combat that we have a way to instruct downstream channel to ignore its capacity limits, + // and let the request through. + Optional> result = delegate.maybeExecute( + endpoint, + request, + permit.isOnlyInFlight() + ? LimitEnforcement.DANGEROUS_BYPASS_LIMITS + : LimitEnforcement.DEFAULT_ENABLED); + if (result.isPresent()) { + DialogueFutures.addDirectCallback(result.get(), permit); + return result; + } else { + maybePermit.get().dropped(); + return Optional.empty(); + } + } else { + logPermitRefused(); + return Optional.empty(); + } + } + + static LimitedChannel createForQueueKey(LimitedChannel channel, String channelName) { + return new StickyConcurrencyLimitedChannel(channel, channelName); + } + + private void logPermitAcquired() { + if (log.isDebugEnabled()) { + log.debug( + "Sending {}/{} on {}", + SafeArg.of("inflight", limiter.getInflight()), + SafeArg.of("max", limiter.getLimit()), + SafeArg.of("channel", channelNameForLogging)); + } + } + + private void logPermitRefused() { + if (log.isDebugEnabled()) { + log.debug( + "Limited {} on {}", + SafeArg.of("max", limiter.getLimit()), + SafeArg.of("channel", channelNameForLogging)); + } + } + + @Override + public String toString() { + return "StickyConcurrencyLimitedChannel{delegate=" + delegate + ", name=" + channelNameForLogging + '}'; + } +} diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/StickySessionFactory.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/StickySessionFactory.java new file mode 100644 index 000000000..7596960d8 --- /dev/null +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/StickySessionFactory.java @@ -0,0 +1,26 @@ +/* + * (c) Copyright 2021 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.dialogue.core; + +import com.palantir.dialogue.Channel; +import com.palantir.dialogue.core.StickySessionFactory.StickySession; +import java.util.function.Supplier; + +interface StickySessionFactory extends Supplier { + + interface StickySession extends Supplier {} +} diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/SupplierChannel.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/SupplierChannel.java index 0938390e7..615393c14 100644 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/SupplierChannel.java +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/SupplierChannel.java @@ -23,17 +23,23 @@ import java.util.Optional; import java.util.function.Supplier; -final class SupplierChannel implements LimitedChannel { - private final Supplier channelSupplier; +final class SupplierChannel implements NodeSelectingChannel { + private final Supplier channelSupplier; - SupplierChannel(Supplier channelSupplier) { + SupplierChannel(Supplier channelSupplier) { this.channelSupplier = channelSupplier; } @Override public Optional> maybeExecute( Endpoint endpoint, Request request, LimitEnforcement limitEnforcement) { - LimitedChannel delegate = channelSupplier.get(); + NodeSelectingChannel delegate = channelSupplier.get(); return delegate.maybeExecute(endpoint, request, limitEnforcement); } + + @Override + public void routeToHost(int index, Request request) { + NodeSelectingChannel delegate = channelSupplier.get(); + delegate.routeToHost(index, request); + } } diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/ZeroUriNodeSelectionChannel.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/ZeroUriNodeSelectionChannel.java index b785b42d7..04275cec4 100644 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/ZeroUriNodeSelectionChannel.java +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/ZeroUriNodeSelectionChannel.java @@ -27,7 +27,7 @@ import java.util.Optional; /** When we have zero URIs, no request can get out the door. */ -final class ZeroUriNodeSelectionChannel implements LimitedChannel { +final class ZeroUriNodeSelectionChannel implements NodeSelectingChannel { private final String channelName; ZeroUriNodeSelectionChannel(String channelName) { @@ -43,4 +43,10 @@ public Optional> maybeExecute( SafeArg.of("service", endpoint.serviceName()), SafeArg.of("endpoint", endpoint.endpointName())))); } + + @Override + public void routeToHost(int _index, Request _request) { + throw new SafeIllegalStateException( + "There are no URIs configured to handle requests", SafeArg.of("channel", channelName)); + } } diff --git a/dialogue-core/src/main/metrics/dialogue-core-metrics.yml b/dialogue-core/src/main/metrics/dialogue-core-metrics.yml index b7a05adf1..1415c81ee 100644 --- a/dialogue-core/src/main/metrics/dialogue-core-metrics.yml +++ b/dialogue-core/src/main/metrics/dialogue-core-metrics.yml @@ -40,6 +40,14 @@ namespaces: type: counter tags: [channel-name, service-name, endpoint] docs: Number of queued requests waiting to execute for a specific endpoint due to server QoS. + requests.sticky.queued: + type: counter + tags: [ channel-name ] + docs: Number of sticky queued requests waiting to try to be executed. + requests.perhost.queued: + type: counter + tags: [ channel-name, hostIndex ] + docs: Number of per-host queued requests waiting to try to be executed. request.queued.time: type: timer tags: [channel-name] @@ -48,6 +56,14 @@ namespaces: type: timer tags: [channel-name, service-name, endpoint] docs: Time spent waiting in the queue before execution on a specific endpoint due to server QoS. + request.sticky.queued.time: + type: timer + tags: [ channel-name ] + docs: Time spent waiting in the sticky queue before execution attempt. + request.perhost.queued.time: + type: timer + tags: [ channel-name, hostIndex ] + docs: Time spent waiting in the per-host queue before execution attempt. # Note: the 'dialogue.client.create' metric is also defined in the apache metrics. create: type: meter diff --git a/dialogue-core/src/test/java/com/palantir/dialogue/core/PinUntilErrorNodeSelectionStrategyChannelTest.java b/dialogue-core/src/test/java/com/palantir/dialogue/core/PinUntilErrorNodeSelectionStrategyChannelTest.java index c619ed120..9c3a70ab2 100644 --- a/dialogue-core/src/test/java/com/palantir/dialogue/core/PinUntilErrorNodeSelectionStrategyChannelTest.java +++ b/dialogue-core/src/test/java/com/palantir/dialogue/core/PinUntilErrorNodeSelectionStrategyChannelTest.java @@ -76,8 +76,8 @@ public void before() { channels, pseudo, clock, metrics, channelName); pinUntilErrorWithoutReshuffle = - new PinUntilErrorNodeSelectionStrategyChannel(constantList, 1, metrics, channelName); - pinUntilError = new PinUntilErrorNodeSelectionStrategyChannel(shufflingList, 1, metrics, channelName); + new PinUntilErrorNodeSelectionStrategyChannel(null, constantList, 1, metrics, channelName); + pinUntilError = new PinUntilErrorNodeSelectionStrategyChannel(null, shufflingList, 1, metrics, channelName); } @Test