Skip to content

Commit

Permalink
DPL: fix lossy sending / forwarding policies
Browse files: browse the repository at this point in the history
  • Loading branch information
ktf committed Feb 13, 2024
1 parent 0a5ace5 commit aff59d1
Showing 1 changed file with 38 additions and 15 deletions.
53 changes: 38 additions & 15 deletions Framework/Core/src/SendingPolicy.cxx
Original file line number | Diff line number | Diff line change
Expand Up @@ -56,13 +56,16 @@ std::vector<SendingPolicy> SendingPolicy::createDefaultPolicies()
// non-blocking approach.
int64_t timeout = 10;
if (state.droppedMessages == 10 + 1) {
LOG(warning) << "Failed to send 10 messages with 10ms timeout in a row, switching to completely non-blocking mode";
LOG(warning) << "Failed to send 10 messages with 10ms timeout in a row, switching to completely non-blocking mode.";
}
if (state.droppedMessages == 0) {
timeout = 10;
}
if (state.droppedMessages > 10) {
timeout = 0;
}
size_t result = info.channel.Send(parts, timeout);
if (result > 0) {
int64_t result = info.channel.Send(parts, timeout);
if (result >= 0) {
state.droppedMessages = 0;
} else if (state.droppedMessages < std::numeric_limits<decltype(state.droppedMessages)>::max()) {
state.droppedMessages++;
Expand Down Expand Up @@ -97,10 +100,10 @@ std::vector<SendingPolicy> SendingPolicy::createDefaultPolicies()
LOGP(info, "Sent {} parts for a total of {} bytes", parts.Size(), count);
auto res = channel->Send(parts, timeout);
if (res == (size_t)fair::mq::TransferCode::timeout) {
LOGP(warning, "Timed out sending after {}s. Downstream backpressure detected on {}.", timeout/1000, channel->GetName());
LOGP(warning, "Timed out sending after {}s. Downstream backpressure detected on {}.", timeout / 1000, channel->GetName());
channel->Send(parts);
LOGP(info, "Downstream backpressure on {} recovered.", channel->GetName());
} else if (res == (size_t) fair::mq::TransferCode::error) {
} else if (res == (size_t)fair::mq::TransferCode::error) {
LOGP(fatal, "Error while sending on channel {}", channel->GetName());
} }},
SendingPolicy{
Expand All @@ -113,12 +116,22 @@ std::vector<SendingPolicy> SendingPolicy::createDefaultPolicies()
.send = [](fair::mq::Parts& parts, ChannelIndex channelIndex, ServiceRegistryRef registry) {
auto &proxy = registry.get<FairMQDeviceProxy>();
auto *channel = proxy.getOutputChannel(channelIndex);
OutputChannelState& state = proxy.getOutputChannelState(channelIndex);
auto timeout = 1000;
auto res = channel->Send(parts, timeout);
if (res == (size_t)fair::mq::TransferCode::timeout) {
LOGP(warning, "Timed out sending after {}s. Downstream backpressure detected on expendable channel {}.", timeout/1000, channel->GetName());
} else if (res == (size_t) fair::mq::TransferCode::error) {
LOGP(info, "Error while sending on channel {}", channel->GetName());
if (state.droppedMessages > 0) {
timeout = 0;
}
if (state.droppedMessages == 1) {
LOGP(warning, "Timed out sending after {}s. Downstream backpressure detected on expendable channel {}. Switching to dropping mode.", timeout / 1000, channel->GetName());
}
if (state.droppedMessages == 0) {
timeout = 1000;
}
int64_t res = channel->Send(parts, timeout);
if (res >= 0) {
state.droppedMessages = 0;
} else {
state.droppedMessages++;
} }},
SendingPolicy{
.name = "default",
Expand Down Expand Up @@ -204,12 +217,22 @@ std::vector<ForwardingPolicy> ForwardingPolicy::createDefaultPolicies()
.forward = [](fair::mq::Parts& parts, ChannelIndex channelIndex, ServiceRegistryRef registry) {
auto &proxy = registry.get<FairMQDeviceProxy>();
auto *channel = proxy.getForwardChannel(channelIndex);
OutputChannelState& state = proxy.getOutputChannelState(channelIndex);
auto timeout = 1000;
auto res = channel->Send(parts, timeout);
if (res == (size_t)fair::mq::TransferCode::timeout) {
LOGP(warning, "Timed out sending after {}s. Downstream backpressure detected on expendable channel {}.", timeout/1000, channel->GetName());
} else if (res == (size_t) fair::mq::TransferCode::error) {
LOGP(info, "Error while sending on channel {}", channel->GetName());
if (state.droppedMessages > 0) {
timeout = 0;
}
if (state.droppedMessages == 1) {
LOGP(warning, "Timed out sending after {}s. Downstream backpressure detected on expendable channel {}. Switching to dropping mode.", timeout / 1000, channel->GetName());
}
if (state.droppedMessages == 0) {
timeout = 1000;
}
int64_t res = channel->Send(parts, timeout);
if (res >= 0) {
state.droppedMessages = 0;
} else {
state.droppedMessages++;
} }},
createDefaultForwardingPolicy()};
}
Expand Down

0 comments on commit aff59d1

Please sign in to comment.