Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Per host channels #1333

Draft
wants to merge 5 commits into
base: jakubk/stick-queue
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
* workloads (where n requests must all land on the same server) or scenarios where cache warming is very important.
* {@link PinUntilErrorNodeSelectionStrategyChannel} remains the best choice for these.
*/
final class BalancedNodeSelectionStrategyChannel implements LimitedChannel {
final class BalancedNodeSelectionStrategyChannel implements NodeSelectingChannel {
private static final Logger log = LoggerFactory.getLogger(BalancedNodeSelectionStrategyChannel.class);

private static final int INFLIGHT_COMPARISON_THRESHOLD = 5;
Expand All @@ -56,7 +56,7 @@ final class BalancedNodeSelectionStrategyChannel implements LimitedChannel {
private static final int UNHEALTHY_SCORE_MULTIPLIER = 2;

private final BalancedScoreTracker tracker;
private final ImmutableList<BalancedChannel> channels;
private final ImmutableList<LimitedChannel> channels;

BalancedNodeSelectionStrategyChannel(
ImmutableList<LimitedChannel> channels,
Expand Down Expand Up @@ -116,7 +116,7 @@ public Optional<ListenableFuture<Response>> maybeExecute(
giveUpThreshold = newThreshold;
}

BalancedChannel channel = channels.get(snapshot.getDelegate().channelIndex());
LimitedChannel channel = channels.get(snapshot.getDelegate().channelIndex());
Optional<ListenableFuture<Response>> maybe =
StickyAttachments.maybeExecute(channel, endpoint, request, limitEnforcement);
if (maybe.isPresent()) {
Expand All @@ -127,6 +127,11 @@ public Optional<ListenableFuture<Response>> maybeExecute(
return Optional.empty();
}

@Override
public ImmutableList<LimitedChannel> nodeChannels() {
return channels;
}

private static final class BalancedChannel implements LimitedChannel {
private final LimitedChannel delegate;
private final ChannelScoreInfo channelInfo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,17 @@ void onSuccess(Response result, PermitControl control) {
}
}

@Override
void onFailure(Throwable _throwable, PermitControl control) {
control.ignore();
}
},
STICKY() {
@Override
void onSuccess(Response _result, PermitControl control) {
control.success();
}

@Override
void onFailure(Throwable _throwable, PermitControl control) {
control.ignore();
Expand Down Expand Up @@ -157,6 +168,10 @@ final class Permit implements PermitControl, FutureCallback<Response> {
this.inFlightSnapshot = inFlightSnapshot;
}

boolean isOnlyInFlight() {
return inFlightSnapshot == 1;
}

@VisibleForTesting
int inFlightSnapshot() {
return inFlightSnapshot;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
/*
* (c) Copyright 2021 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.dialogue.core;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.palantir.dialogue.Channel;
import com.palantir.dialogue.Endpoint;
import com.palantir.dialogue.EndpointChannel;
import com.palantir.dialogue.EndpointChannelFactory;
import com.palantir.dialogue.Request;
import com.palantir.dialogue.Response;
import com.palantir.dialogue.core.StickyAttachments.StickyTarget;
import com.palantir.dialogue.futures.DialogueFutures;
import com.palantir.logsafe.Preconditions;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.immutables.value.Value;

@Value.Enclosing
public final class DefaultStickySessionFactory implements StickySessionFactory {

private final Supplier<EndpointChannelFactory> delegate;

private DefaultStickySessionFactory(Supplier<EndpointChannelFactory> endpointChannelFactory) {
this.delegate = Preconditions.checkNotNull(endpointChannelFactory, "endpointChannelFactory");
}

@Override
public StickySession get() {
return new DefaultStickySession(delegate);
}

@Override
public String toString() {
return "StickyEndpointChannels2{" + delegate + "}";
}

private static final class DefaultStickySession implements StickySession {

private final Supplier<EndpointChannelFactory> channelFactory;
private final StickyRouter router = new StickyRouter();

private DefaultStickySession(Supplier<EndpointChannelFactory> channelFactory) {
this.channelFactory = channelFactory;
}

@Override
public Channel get() {
return new Sticky(channelFactory.get(), router);
}
}

private static final class Sticky implements EndpointChannelFactory, Channel {

private final EndpointChannelFactory channelFactory;
private final StickyRouter router;

private Sticky(EndpointChannelFactory channelFactory, StickyRouter router) {
this.router = router;
this.channelFactory = channelFactory;
}

@Override
public EndpointChannel endpoint(Endpoint endpoint) {
return new StickyEndpointChannel(router, channelFactory.endpoint(endpoint));
}

/**
* .
* @deprecated prefer {@link #endpoint}, as it allows binding work upfront
*/
@Deprecated
@Override
public ListenableFuture<Response> execute(Endpoint endpoint, Request request) {
return endpoint(endpoint).execute(request);
}

@Override
public String toString() {
return "Sticky{" + channelFactory + '}';
}
}

public static DefaultStickySessionFactory create(Supplier<EndpointChannelFactory> endpointChannelFactory) {
return new DefaultStickySessionFactory(endpointChannelFactory);
}

private static final class StickyRouter {

private final InFlightCallCallback callback = new InFlightCallCallback();

@Nullable
private volatile StickyTarget stickyTarget;

@Nullable
@GuardedBy("this")
private volatile ListenableFuture<Response> callInFlight;

public ListenableFuture<Response> execute(Request request, EndpointChannel endpointChannel) {
if (stickyTarget != null) {
return executeWithStickyTarget(stickyTarget, request, endpointChannel);
}

synchronized (this) {
if (stickyTarget != null) {
return executeWithStickyTarget(stickyTarget, request, endpointChannel);
}

ListenableFuture<Response> callInFlightSnapshot = callInFlight;
if (callInFlightSnapshot == null) {
ListenableFuture<Response> result = DialogueFutures.addDirectCallback(
executeWithStickyToken(request, endpointChannel), callback);
callInFlight = Futures.nonCancellationPropagating(result);
return result;
} else {
ListenableFuture<Response> result = callInFlightSnapshot;
result = DialogueFutures.transformAsync(result, _input -> execute(request, endpointChannel));
result = DialogueFutures.catchingAllAsync(result, _throwable -> execute(request, endpointChannel));
return result;
}
}
}

private final class InFlightCallCallback implements FutureCallback<Response> {

@Override
public void onSuccess(Response result) {
successfulCall(result);
}

@Override
public void onFailure(Throwable _throwable) {
failed();
}
}

private synchronized void successfulCall(Response response) {
callInFlight = null;
if (stickyTarget == null) {
StickyTarget newStickyTarget =
response.attachments().getOrDefault(StickyAttachments.STICKY_TOKEN, null);
if (newStickyTarget != null) {
stickyTarget = newStickyTarget;
}
}
}

private synchronized void failed() {
callInFlight = null;
}

private static ListenableFuture<Response> executeWithStickyToken(
Request request, EndpointChannel endpointChannel) {
request.attachments().put(StickyAttachments.REQUEST_STICKY_TOKEN, Boolean.TRUE);
return endpointChannel.execute(request);
}

private static ListenableFuture<Response> executeWithStickyTarget(
StickyTarget stickyTarget, Request request, EndpointChannel endpointChannel) {
request.attachments().put(StickyAttachments.STICKY, stickyTarget);
return endpointChannel.execute(request);
}
}

private static final class StickyEndpointChannel implements EndpointChannel {
private final StickyRouter stickyRouter;
private final EndpointChannel delegate;

StickyEndpointChannel(StickyRouter stickyRouter, EndpointChannel delegate) {
this.stickyRouter = stickyRouter;
this.delegate = delegate;
}

@Override
public ListenableFuture<Response> execute(Request request) {
return stickyRouter.execute(request, delegate);
}

@Override
public String toString() {
return "StickyEndpointChannel{delegate=" + delegate + '}';
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,23 @@
import java.util.OptionalInt;
import java.util.Random;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.IntStream;

public final class DialogueChannel implements Channel, EndpointChannelFactory {
private final EndpointChannelFactory delegate;
private final Config cf;
private final StickySessionFactory stickySessionFactory;
private final ImmutableList<Channel> perHostChannels;

private DialogueChannel(Config cf, EndpointChannelFactory delegate) {
private DialogueChannel(
Config cf,
EndpointChannelFactory delegate,
StickySessionFactory stickySessionFactory,
ImmutableList<Channel> perHostChannels) {
this.cf = cf;
this.delegate = delegate;
this.stickySessionFactory = stickySessionFactory;
this.perHostChannels = perHostChannels;
}

@Override
Expand All @@ -53,6 +62,14 @@ public EndpointChannel endpoint(Endpoint endpoint) {
return delegate.endpoint(endpoint);
}

public StickySessionFactory stickySessionFactory() {
return stickySessionFactory;
}

public ImmutableList<Channel> perHost() {
return perHostChannels;
}

public static Builder builder() {
return new Builder();
}
Expand Down Expand Up @@ -162,8 +179,10 @@ public DialogueChannel build() {
}
ImmutableList<LimitedChannel> channels = perUriChannels.build();

LimitedChannel nodeSelectionChannel = NodeSelectionStrategyChannel.create(cf, channels);
Channel queuedChannel = QueuedChannel.create(cf, nodeSelectionChannel);
NodeSelectingChannel nodeSelectionChannel = NodeSelectionStrategyChannel.create(cf, channels);

Channel defaultQueuedChannel = QueuedChannel.create(cf, nodeSelectionChannel);
Channel queuedChannel = new QueueOverrideChannel(defaultQueuedChannel);

EndpointChannelFactory channelFactory = endpoint -> {
EndpointChannel channel = new EndpointChannelAdapter(endpoint, queuedChannel);
Expand All @@ -179,14 +198,55 @@ public DialogueChannel build() {
return new NeverThrowEndpointChannel(channel); // this must come last as a defensive backstop
};

StickySessionFactory stickySessionFactory = DefaultStickySessionFactory.create(() -> {
LimitedChannel stickyLimitedChannel =
StickyConcurrencyLimitedChannel.createForQueueKey(nodeSelectionChannel, cf.channelName());
Channel queueOverride = QueuedChannel.createForSticky(cf, stickyLimitedChannel);
return endpoint -> {
EndpointChannel endpointChannel = channelFactory.endpoint(endpoint);
return (EndpointChannel) request -> {
request.attachments().put(QueueAttachments.QUEUE_OVERRIDE, queueOverride);
return endpointChannel.execute(request);
};
};
});

Meter createMeter = DialogueClientMetrics.of(cf.clientConf().taggedMetricRegistry())
.create()
.clientName(cf.channelName())
.clientType("dialogue-channel-non-reloading")
.build();
createMeter.mark();

return new DialogueChannel(cf, channelFactory);
ImmutableList<Channel> perHostChannels = IntStream.range(0, channels.size())
.mapToObj(index -> new ChannelAndFactory() {

private final LimitedChannel stickyLimitedChannel =
StickyConcurrencyLimitedChannel.createForQueueKey(
nodeSelectionChannel, cf.channelName());
private final Channel queueOverride =
QueuedChannel.createPerHost(cf, stickyLimitedChannel, index);

@Override
public EndpointChannel endpoint(Endpoint endpoint) {
EndpointChannel endpointChannel = channelFactory.endpoint(endpoint);
return request -> {
request.attachments().put(QueueAttachments.QUEUE_OVERRIDE, queueOverride);
StickyAttachments.routeToChannel(
request,
nodeSelectionChannel.nodeChannels().get(index));
return endpointChannel.execute(request);
};
}

@Override
public ListenableFuture<Response> execute(Endpoint endpoint, Request request) {
return endpoint(endpoint).execute(request);
}
})
.collect(ImmutableList.toImmutableList());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this piece is more or less the alternative described here, except that we don't leverage it from the StickySessionFactory.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah: I think it's valuable to provide wholesale fairness implementation. If we're not going to ship the whole thing (i.e. you can support internal usecase for per-transaction fairness, with ability to create a new sticky channel from an existing sticky channel with new queue), I'd be happy to switch to per-host queues even for sticky channels. The reason I added it in #1334 is because I wanted to ship the whole thing.


return new DialogueChannel(cf, channelFactory, stickySessionFactory, perHostChannels);
}

/** Does *not* do any clever live-reloading. */
Expand All @@ -195,4 +255,6 @@ public Channel buildNonLiveReloading() {
return build();
}
}

interface ChannelAndFactory extends Channel, EndpointChannelFactory {}
}
Loading