Skip to content

Commit

Permalink
fixed bug with parallel tasks changing signal light
Browse files Browse the repository at this point in the history
  • Loading branch information
Florian Rampp authored and Florian Rampp committed Aug 20, 2012
1 parent 26f5341 commit 6040b9a
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ public int getNumberForRed() {
return green == SlotPosition.top ? 1 : 3;
}

public SlotPosition getGreen() {
return green;
}

public HostAndPort getHostAndPort() {
return HostAndPort.fromParts(host, port);
}
Expand Down
7 changes: 6 additions & 1 deletion hub/src/main/java/com/jambit/jambel/hub/HubModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.AbstractModule;
import com.google.inject.name.Names;
import com.jambit.jambel.hub.poller.JobStatePoller;
import com.jambit.jambel.hub.retrieval.JobRetriever;
import com.jambit.jambel.hub.retrieval.JobStateRetriever;
Expand All @@ -24,7 +27,9 @@ protected void configure() {

// polling
bind(JobStatePoller.class);
bind(ScheduledExecutorService.class).toInstance(Executors.newScheduledThreadPool(POLLING_THREADS));
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("poller-%d").build();
bind(ScheduledExecutorService.class).annotatedWith(Names.named("poller")).toInstance(
Executors.newScheduledThreadPool(POLLING_THREADS, namedThreadFactory));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.concurrent.TimeUnit;

import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;

import com.google.common.base.Optional;
Expand All @@ -22,7 +23,8 @@ public class JobStatePoller {
private final JobStatusReceiver receiver;

@Inject
public JobStatePoller(ScheduledExecutorService executor, JobStateRetriever retriever, JobStatusReceiver receiver) {
public JobStatePoller(@Named("poller") ScheduledExecutorService executor, JobStateRetriever retriever,
JobStatusReceiver receiver) {
this.executor = executor;
this.retriever = retriever;
this.receiver = receiver;
Expand Down
24 changes: 17 additions & 7 deletions server/src/main/java/com/jambit/jambel/JambelDestroyer.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package com.jambit.jambel;

import javax.inject.Inject;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.net.HostAndPort;
import com.jambit.jambel.config.SignalLightConfiguration;
import com.jambit.jambel.config.SignalLightConfiguration.SlotPosition;
import com.jambit.jambel.light.LightMode;
import com.jambit.jambel.light.SignalLight;
import com.jambit.jambel.light.SignalLightStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;

public class JambelDestroyer {

Expand All @@ -34,11 +36,19 @@ private void resetSignalLight() {

SignalLightStatus allOn = SignalLightStatus.all(LightMode.ON);
sendAndWait(allOn, 2000);
sendAndWait(allOn.butGreen(LightMode.OFF), 500);
sendAndWait(allOn.butGreen(LightMode.OFF).butYellow(LightMode.OFF), 500);
// top to bottom sequence
if (signalLight.getConfiguration().getGreen() == SlotPosition.top) {
sendAndWait(allOn.butGreen(LightMode.OFF), 500);
sendAndWait(allOn.butGreen(LightMode.OFF).butYellow(LightMode.OFF), 500);
}
else {
sendAndWait(allOn.butRed(LightMode.OFF), 500);
sendAndWait(allOn.butRed(LightMode.OFF).butYellow(LightMode.OFF), 500);
}

signalLight.reset();
} else {
}
else {
logger.warn("cannot reset signal light at {}, it is not available", hostAndPort);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import com.google.inject.Singleton;
Expand All @@ -20,8 +22,9 @@ public class SignalLightModule extends AbstractModule {
protected void configure() {
bind(SignalLight.class).to(CommandControlledSignalLight.class);
bind(SignalLightCommandSender.class).to(LanCommandSender.class).in(Singleton.class);
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("status-updater-%d").build();
bind(ScheduledExecutorService.class).annotatedWith(Names.named("signalLight")).toInstance(
Executors.newSingleThreadScheduledExecutor());
Executors.newSingleThreadScheduledExecutor(namedThreadFactory));
}

@Provides
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package com.jambit.jambel.light.cmdctrl;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import javax.inject.Named;
import javax.inject.Singleton;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -26,6 +26,7 @@
* @author "Florian Rampp ([email protected])"
*
*/
@Singleton
public final class CommandControlledSignalLight implements SignalLight {

private static final Logger logger = LoggerFactory.getLogger(CommandControlledSignalLight.class.getName());
Expand Down Expand Up @@ -83,36 +84,51 @@ public SignalLightStatus getCurrentStatus() {
}
}

private Runnable sendNewStatus(SignalLightStatus newStatus) {
final Integer[] lightValues = toIntValues(newStatus);
return new Runnable() {
@Override
public void run() {
try {
sendCommand("set_all=" + Joiner.on(',').join(lightValues));
}
catch (SignalLightNotAvailableException e) {
logger.warn("could not update signal light", e);
}
private final class UpdateLightStatusTask implements Runnable {
private boolean failOnNextExecution = false;

private final SignalLightStatus newStatus;

public UpdateLightStatusTask(SignalLightStatus newStatus) {
this.newStatus = newStatus;
}

@Override
public void run() {
if (failOnNextExecution) {
throw new RuntimeException();
}
};
}

final Integer[] lightValues = toIntValues(newStatus);

try {
sendCommand("set_all=" + Joiner.on(',').join(lightValues));
}
catch (SignalLightNotAvailableException e) {
logger.warn("could not update signal light", e);
}
}

public void failOnNextExecution() {
failOnNextExecution = true;
}
}

private ScheduledFuture<?> future;
private UpdateLightStatusTask scheduledTask;

@Override
public void setNewStatus(SignalLightStatus newStatus) {
Optional<Integer> keepAliveInterval = configuration.getKeepAliveInterval();
if (keepAliveInterval.isPresent()) {
if (future != null) {
future.cancel(false);
if (scheduledTask != null) {
// get rid of the old task (this is the only way to do it, that sucks!)
scheduledTask.failOnNextExecution();
}
future = executor.scheduleWithFixedDelay(sendNewStatus(newStatus), 0, keepAliveInterval.get(),
TimeUnit.MILLISECONDS);
scheduledTask = new UpdateLightStatusTask(newStatus);
executor.scheduleWithFixedDelay(scheduledTask, 0, keepAliveInterval.get(), TimeUnit.MILLISECONDS);
}
else {
executor.execute(sendNewStatus(newStatus));
executor.execute(new UpdateLightStatusTask(newStatus));
}
}

Expand Down

0 comments on commit 6040b9a

Please sign in to comment.