From 25930da6e81e8b67074ebefc8eadc32d9ff3b8af Mon Sep 17 00:00:00 2001 From: Adam Gabrys Date: Wed, 20 Apr 2022 13:42:42 +0200 Subject: [PATCH] Prevent sending interrupt signals by lost killer tasks by activity When many activity timeouts are run at the same time, sometimes the "Sending interrupt signal to process" message appears and the build is aborted (JENKINS-58752). The "Cancelling nested steps due to timeout" message is never printed. The code has been refactored to prevent such issues: - the implementation of the activity and absolute timeouts have been separated to improve the code readability - the tasks executed after a delay are always created in synchronized sections, to prevent losing tasks which should be canceled - the timer logic of the activity timeout is changed from always stopping to verifying if the logic should be stopped or continued, so the number of timers is always under control (less instances) - the `Tick` class is replaced by a listener which notifies the step less frequently about the changes. The behavior could be controlled by setting the `org.jenkinsci.plugins.workflow.steps.TimeoutStepExecution.activityNotifyWaitRatio` property. It informs when the earliest the information about new activities should be sent to the timeout (`time * ratio`). When there were no activities in that time, then the next activity will be announced right after it has been reported There are additional changes introduced in this commit: - the timeout id is always printed in the log messages which improves debugging (e.g. when timeouts are nested) - the logic is more precise. The previous implementation allowed exceeding the timeout by 1/10 of the time (JENKINS-63696). The new stops the process right after the timeout is reached. There is a property to allow exceeding it a little bit - `org.jenkinsci.plugins.workflow.steps.TimeoutStepExecution.activityPrecision`. It is necessary to not abort the logic due to delay in the notification process. --- .../plugins/workflow/steps/TimeoutStep.java | 4 + .../steps/TimeoutStepExecutionThreadSafe.java | 459 ++++++++++++++++++ 2 files changed, 463 insertions(+) create mode 100644 src/main/java/org/jenkinsci/plugins/workflow/steps/TimeoutStepExecutionThreadSafe.java diff --git a/src/main/java/org/jenkinsci/plugins/workflow/steps/TimeoutStep.java b/src/main/java/org/jenkinsci/plugins/workflow/steps/TimeoutStep.java index 3f2bea5e..487b9536 100644 --- a/src/main/java/org/jenkinsci/plugins/workflow/steps/TimeoutStep.java +++ b/src/main/java/org/jenkinsci/plugins/workflow/steps/TimeoutStep.java @@ -4,6 +4,7 @@ import hudson.Extension; import hudson.model.TaskListener; import hudson.util.ListBoxModel; +import jenkins.util.SystemProperties; import org.kohsuke.stapler.DataBoundConstructor; import org.kohsuke.stapler.DataBoundSetter; @@ -59,6 +60,9 @@ public DescriptorImpl getDescriptor() { @Override public StepExecution start(StepContext context) throws Exception { + if (SystemProperties.getBoolean(TimeoutStep.class.getName() + ".threadsafe")) { + return new TimeoutStepExecutionThreadSafe(this, context); + } return new TimeoutStepExecution(this, context); } diff --git a/src/main/java/org/jenkinsci/plugins/workflow/steps/TimeoutStepExecutionThreadSafe.java b/src/main/java/org/jenkinsci/plugins/workflow/steps/TimeoutStepExecutionThreadSafe.java new file mode 100644 index 00000000..2ad370e9 --- /dev/null +++ b/src/main/java/org/jenkinsci/plugins/workflow/steps/TimeoutStepExecutionThreadSafe.java @@ -0,0 +1,459 @@ +package org.jenkinsci.plugins.workflow.steps; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import hudson.Main; +import hudson.Util; +import hudson.console.ConsoleLogFilter; +import hudson.console.LineTransformationOutputStream; +import hudson.model.Result; +import hudson.model.Run; +import hudson.model.TaskListener; +import java.io.IOException; +import java.io.OutputStream; +import java.io.Serializable; +import java.util.List; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAccumulator; +import java.util.function.LongBinaryOperator; +import java.util.logging.Level; +import java.util.logging.Logger; +import jenkins.model.CauseOfInterruption; +import jenkins.util.SystemProperties; +import jenkins.util.Timer; +import org.jenkinsci.plugins.workflow.flow.FlowExecution; +import org.jenkinsci.plugins.workflow.graph.FlowNode; +import org.jenkinsci.plugins.workflow.graphanalysis.LinearBlockHoppingScanner; +import org.jenkinsci.plugins.workflow.steps.TimeoutStepExecution.ExceededTimeout; + +public class TimeoutStepExecutionThreadSafe extends AbstractStepExecutionImpl { + + @SuppressFBWarnings(value = "MS_SHOULD_BE_FINAL") + public static /* not final */ boolean forceInterruption = SystemProperties.getBoolean(TimeoutStepExecution.class.getName() + ".forceInterruption"); + + private static final long serialVersionUID = 2L; + private static final Logger LOGGER = Logger.getLogger(TimeoutStepExecutionThreadSafe.class.getName()); + private static final String ACTIVITY_PRECISION_PROPERTY_NAME = TimeoutStepExecution.class.getName() + ".activityPrecision"; + private static final long ACTIVITY_PRECISION_IN_MILLISECONDS = 1000L; + private static final String ACTIVITY_NOTIFY_WAIT_RATIO_PROPERTY_NAME = TimeoutStepExecution.class.getName() + ".activityNotifyWaitRatio"; + private static final String ACTIVITY_NOTIFY_WAIT_RATIO = String.valueOf(0.8); + + private final String id; + private final Timeout timeout; + + TimeoutStepExecutionThreadSafe(TimeoutStep step, StepContext context) { + super(context); + id = UUID.randomUUID().toString().replace("-", ""); + timeout = createTimeout(id, step, context); + } + + private static Timeout createTimeout(String id, TimeoutStep step, StepContext context) { + long time = step.getUnit().toMillis(step.getTime()); + if (step.isActivity()) { + long precision = SystemProperties.getLong(ACTIVITY_PRECISION_PROPERTY_NAME, ACTIVITY_PRECISION_IN_MILLISECONDS); + double waitRatio = Double.parseDouble(SystemProperties.getString(ACTIVITY_NOTIFY_WAIT_RATIO_PROPERTY_NAME, ACTIVITY_NOTIFY_WAIT_RATIO)); + return new ActivityTimeout(id, context, time, precision, waitRatio); + } + return new AbsoluteTimeout(id, context, time); + } + + @Override + public boolean start() throws Exception { + timeout.startStepExecution(); + return false; + } + + @Override + public void onResume() { + timeout.resume(); + } + + @Override + public String getStatus() { + return timeout.getStatus(); + } + + private abstract static class Timeout implements Serializable { + + private static final long serialVersionUID = 1L; + private static final long GRACE_PERIOD_MILLISECONDS = Main.isUnitTest ? 5_000 : 60_000; + + protected final String id; + protected final StepContext context; + private BodyExecution body; + private long forceKillTimestamp; + + public Timeout(String id, StepContext context) { + this.id = id; + this.context = context; + } + + public void startStepExecution() throws IOException, InterruptedException { + BodyInvoker bodyInvoker = setup(context.newBodyInvoker()); + bodyInvoker = bodyInvoker.withCallback(new Callback(this)); + body = bodyInvoker.start(); + start(); + } + + protected BodyInvoker setup(BodyInvoker bodyInvoker) throws IOException, InterruptedException { + return bodyInvoker; + } + + protected abstract void start(); + + public abstract void resume(); + + public abstract void stop(); + + public String getStatus() { + long now = System.currentTimeMillis(); + long delay = getEndTimestamp() - now; + if (delay > 0) { + return "body has another " + Util.getTimeSpanString(delay) + " to run"; + } + String message = "body did not yet respond to the signal, overshot by " + Util.getTimeSpanString(-delay); + if (forceKillTimestamp == 0) { + return message; + } + message += " (forcibly killing "; + long forceDelay = forceKillTimestamp - now; + if (forceDelay > 0) { + message += "in " + Util.getTimeSpanString(forceDelay); + } else { + message += "is in progress for " + Util.getTimeSpanString(-forceDelay); + } + message += ')'; + return message; + } + + protected abstract long getEndTimestamp(); + + protected void reachTimeout() { + log("Cancelling nested steps due to timeout"); + body.cancel(new ExceededTimeout(getFromContext(FlowNode.class).map(FlowNode::getId).orElse(null))); + forceKillTimestamp = System.currentTimeMillis() + GRACE_PERIOD_MILLISECONDS; + Timer.get().schedule(this::killForcefully, GRACE_PERIOD_MILLISECONDS, TimeUnit.MILLISECONDS); + } + + private void killForcefully() { + if (body.isDone()) { + return; + } + log("Body did not finish within grace period; terminating with extreme prejudice"); + getFromContext(FlowExecution.class).ifPresent(exec -> { + Throwable death = new FlowInterruptedException(Result.ABORTED, new ExceededTimeout(getFromContext(FlowNode.class).map(FlowNode::getId).orElse(null))); + // Due to JENKINS-25504, this does not accomplish anything beyond what the original body.cancel would have: getContext().onFailure(death); + ListenableFuture> currentExecutions = exec.getCurrentExecutions(true); + currentExecutions.addListener(() -> { + try { + FlowNode timeoutNode = context.get(FlowNode.class); + for (StepExecution stepExecution : currentExecutions.get()) { + FlowNode innerStepNode = stepExecution.getContext().get(FlowNode.class); + LinearBlockHoppingScanner scanner = new LinearBlockHoppingScanner(); + scanner.setup(innerStepNode); + for (FlowNode enclosing : scanner) { + if (enclosing.equals(timeoutNode)) { + stepExecution.getContext().onFailure(death); + break; + } + } + } + } catch (IOException | InterruptedException | ExecutionException e) { + LOGGER.log(Level.WARNING, null, e); + } + }, MoreExecutors.newDirectExecutorService()); + }); + } + + protected void log(String message) { + getFromContext(TaskListener.class).orElse(TaskListener.NULL).getLogger().println(message + " (id: " + id + ')'); + LOGGER.log(Level.FINE, "[{0}] {1}", new Object[]{id, message}); + } + + protected Optional getFromContext(Class clazz) { + try { + return Optional.ofNullable(context.get(clazz)); + } catch (IOException | InterruptedException e) { + LOGGER.log(Level.WARNING, null, e); + return Optional.empty(); + } + } + + private static class Callback extends BodyExecutionCallback.TailCall { + + private static final long serialVersionUID = 1L; + + private final Timeout timeout; + + public Callback(Timeout timeout) { + this.timeout = timeout; + } + + @Override + protected void finished(StepContext context) { + timeout.stop(); + } + + @Override + public void onFailure(StepContext context, Throwable t) { + if (t instanceof FlowInterruptedException && !forceInterruption) { + // check whether the FlowInterruptedException has propagated past the corresponding timeout step + timeout.getFromContext(FlowNode.class).ifPresent(flowNode -> { + FlowInterruptedException fie = (FlowInterruptedException) t; + for (CauseOfInterruption cause : fie.getCauses()) { + if (cause instanceof ExceededTimeout) { + ExceededTimeout exceededTimeout = (ExceededTimeout) cause; + if (flowNode.getId().equals(exceededTimeout.getNodeId())) { + fie.setActualInterruption(false); + } + } + } + }); + } + super.onFailure(context, t); + } + } + } + + private static class AbsoluteTimeout extends Timeout { + + private static final long serialVersionUID = 1L; + + private long time; + private long end; + private transient ScheduledFuture killer; + + public AbsoluteTimeout(String id, StepContext context, long time) { + super(id, context); + this.time = time; + } + + @Override + protected void start() { + log("Timeout set to expire in " + Util.getTimeSpanString(time)); + end = System.currentTimeMillis() + time; + scheduleKiller(time); + } + + private synchronized void scheduleKiller(long delay) { + killer = Timer.get().schedule(() -> reachTimeout(), delay, TimeUnit.MILLISECONDS); + } + + @Override + public void resume() { + time = end - System.currentTimeMillis(); + if (time <= 0) { + reachTimeout(); + } else { + scheduleKiller(time); + } + } + + @Override + public synchronized void stop() { + killer.cancel(false); + } + + @Override + protected long getEndTimestamp() { + return end; + } + } + + private static class ActivityTimeout extends Timeout { + + private static final long serialVersionUID = 1L; + + private final long time; + private final long precision; + private final double waitRatio; + private final LongAccumulator lastActivity = new LongAccumulator(new MaxFunction(), 0); + private transient ScheduledFuture timer; + private boolean stopped; + + public ActivityTimeout(String id, StepContext context, long time, long precision, double waitRatio) { + super(id, context); + this.time = time; + this.precision = precision; + this.waitRatio = waitRatio; + } + + @Override + protected BodyInvoker setup(BodyInvoker bodyInvoker) throws IOException, InterruptedException { + return bodyInvoker.withContext( + BodyInvoker.mergeConsoleLogFilters( + context.get(ConsoleLogFilter.class), + new NotifierConsoleLogFilter(this) + ) + ); + } + + @Override + public void start() { + log("Timeout set to expire after " + Util.getTimeSpanString(time) + " without activity"); + lastActivity.accumulate(System.currentTimeMillis()); + scheduleTimer(time); + } + + private synchronized void scheduleTimer(long delay) { + if (stopped) { + return; + } + timer = Timer.get().schedule(() -> checkTimer(System.currentTimeMillis()), delay, TimeUnit.MILLISECONDS); + } + + private void checkTimer(long now) { + long timestamp = lastActivity.get(); + long delay = time - (now - timestamp) + precision; + if (LOGGER.isLoggable(Level.FINE)) { + LOGGER.log(Level.FINE, "[" + id + "] checkTimer: now = " + now + ", timestamp = " + timestamp + ", delay = " + delay); + } + if (delay <= 0) { + reachTimeout(); + } else { + scheduleTimer(delay); + } + } + + @Override + public void resume() { + checkTimer(System.currentTimeMillis()); + } + + @Override + public synchronized void stop() { + stopped = true; + timer.cancel(false); + } + + @Override + protected long getEndTimestamp() { + return lastActivity.get() + time; + } + + public void onActivity(long timestamp) { + if (LOGGER.isLoggable(Level.FINE)) { + LOGGER.log(Level.FINE, "[" + id + "] onActivity: now = " + System.currentTimeMillis() + ", timestamp = " + timestamp); + } + lastActivity.accumulate(timestamp); + } + + private static class MaxFunction implements LongBinaryOperator, Serializable { + + private static final long serialVersionUID = 1L; + + @Override + public long applyAsLong(long left, long right) { + return Math.max(left, right); + } + } + + private static class NotifierConsoleLogFilter extends ConsoleLogFilter implements Serializable { + + private static final long serialVersionUID = 1L; + + private final String id; + private final long time; + + public NotifierConsoleLogFilter(ActivityTimeout timeout) { + this.id = timeout.id; + this.time = (long) (timeout.time * timeout.waitRatio); + } + + @Override + public OutputStream decorateLogger(Run build, OutputStream logger) { + ActivityListener listener = new ActivityListener(id, time); + return new LineTransformationOutputStream.Delegating(logger) { + @Override + protected void eol(byte[] b, int len) throws IOException { + out.write(b, 0, len); + listener.onNewLine(System.currentTimeMillis()); + } + }; + } + + private static class ActivityListener implements Runnable { + + private final String id; + private final long time; + private final AtomicLong lastReceivedTimestamp = new AtomicLong(); + private long lastSentTimestamp; + + public ActivityListener(String id, long time) { + this.id = id; + this.time = time; + } + + private void onNewLine(long when) { + if (lastReceivedTimestamp.getAndSet(when) == 0) { + tryNotify(); + } + } + + @Override + public synchronized void run() { + if (!lastReceivedTimestamp.compareAndSet(lastSentTimestamp, 0)) { + tryNotify(); + } + } + + private synchronized void tryNotify() { + long timestamp = lastReceivedTimestamp.get(); + if (timestamp == 0) { + log("timestamp is 0"); + return; + } + + if (timestamp == lastSentTimestamp) { + log("timestamp has been sent earlier", timestamp); + return; + } + + notify(timestamp); + lastSentTimestamp = timestamp; + long delay = time - (System.currentTimeMillis() - lastSentTimestamp); + if (delay > 0) { + schedule(delay); + } else { + run(); + } + } + + private void notify(long timestamp) { + log("notify step execution", timestamp); + StepExecution.applyAll(TimeoutStepExecutionThreadSafe.class, stepExecution -> { + if (id.equals(stepExecution.id)) { + ((ActivityTimeout) stepExecution.timeout).onActivity(timestamp); + } + return null; + }); + } + + private void schedule(long delay) { + log("schedule timer"); + Timer.get().schedule(this, delay, TimeUnit.MILLISECONDS); + } + + private void log(String message) { + log(message, -1); + } + + private void log(String message, long timestamp) { + if (LOGGER.isLoggable(Level.FINE)) { + String fullMessage = "[" + id + "][" + this.hashCode() + "] " + message + ", now = " + System.currentTimeMillis(); + if (timestamp > -1) { + fullMessage += ", timestamp = " + timestamp; + } + LOGGER.log(Level.FINE, fullMessage); + } + } + } + } + } +}