Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle bmp errors #1101

Merged
merged 23 commits into from
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1574,6 +1574,9 @@ public static class TxrxProperties {
/** Whether to use a dummy transceiver. Useful for testing only. */
private boolean dummy;

/** The time a board has to be off before it can be powered on. */
private Duration offWaitTime;

/**
* @param period
* How long between when we send requests to the BMP control
Expand All @@ -1595,21 +1598,26 @@ public static class TxrxProperties {
* @param dummy
* Whether to use a dummy transceiver. Useful for testing
* only.
* @param offWaitTime
* How long to wait between powering off and powering on
* a board.
*/
public TxrxProperties(@DefaultValue("10s") Duration period,
@DefaultValue("15s") Duration probeInterval,
@DefaultValue("2") int powerAttempts,
@DefaultValue("3") int fpgaAttempts,
@DefaultValue("false") boolean fpgaReload,
@DefaultValue("5") int buildAttempts,
@DefaultValue("false") boolean dummy) {
@DefaultValue("false") boolean dummy,
@DefaultValue("30s") Duration offWaitTime) {
this.period = period;
this.probeInterval = probeInterval;
this.powerAttempts = powerAttempts;
this.fpgaAttempts = fpgaAttempts;
this.fpgaReload = fpgaReload;
this.buildAttempts = buildAttempts;
this.dummy = dummy;
this.offWaitTime = offWaitTime;
}

/**
Expand All @@ -1633,6 +1641,11 @@ public Duration getProbeInterval() {
return probeInterval;
}

/**
 * @return The time a board has to be off before it can be powered on.
 */
@NotNull
public Duration getOffWaitTime() {
    return this.offWaitTime;
}

/**
 * Replace the stored probe interval.
 *
 * @param probeInterval
 *            The value that {@link #getProbeInterval()} will return from
 *            now on.
 */
void setProbeInterval(Duration probeInterval) {
this.probeInterval = probeInterval;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ public class AllocatorTask extends DatabaseAwareBean
*/
private static final Integer NUMBER_OF_JOBS_TO_QUOTA_CHECK = 100000;

/**
 * Message handed to {@code destroyJob} when a job has to be destroyed
 * because its power changes ended in errors.
 */
private static final String DESTROY_ON_POWER_ERROR =
"Error changing power state! Please contact an administrator.";

@Autowired
private Epochs epochs;

Expand Down Expand Up @@ -182,32 +185,65 @@ private void updateJobNow(int jobId, JobState sourceState,

private boolean update(int jobId, JobState sourceState,
JobState targetState, Connection c) {
try (var getNTasks = c.query(COUNT_CHANGES_FOR_JOB);
try (var getChangeStatus = c.query(COUNT_CHANGES_FOR_JOB);
var setJobState = c.update(SET_STATE_PENDING);
var setJobDestroyed = c.update(SET_STATE_DESTROYED)) {
var setJobDestroyed = c.update(SET_STATE_DESTROYED);
var deleteChanges = c.update(DELETE_PENDING)) {
// Count pending changes for this state change
var n = getNTasks.call1(row -> row.getInteger("n_changes"),
var status = getChangeStatus.call1(ChangeStatus::new,
jobId, sourceState, targetState).orElseThrow(
() -> new RuntimeException(
"Error counting job tasks"));

log.debug("Job {} has {} changes remaining", jobId, n);

// If there are no more pending changes, set the job state to
// the target state
if (n == 0) {
log.debug("Job {} moving to state {}", jobId, targetState);
if (targetState == DESTROYED) {
int rows = setJobDestroyed.call(0, jobId);
if (rows != 1) {
log.warn("unexpected number of rows affected by "
+ "destroy in state update: {}", rows);
}
return rows > 0;
log.debug("Job {} has {} changes remaining", jobId,
status.nChanges);

// If the remaining things are errors, react (if there are errors,
// eventually non-errors will be deleted)
if (status.nErrors > 0 && (status.nErrors == status.nChanges)) {
log.info("Job {} changes resulted in errors.", jobId);

// We can delete the changes now as we know the issues
deleteChanges.call(jobId, sourceState, targetState);

// If we are going to destroyed, we can mostly ignore errors,
// and similarly if we are going to queue it again anyway
if (targetState == DESTROYED || targetState == QUEUED) {
return true;
}

// If the job was ready before we tried to do this, we have to
// destroy the job with an error!
if (sourceState == READY) {
destroyJob(c, jobId, DESTROY_ON_POWER_ERROR);
return true;
}
return setJobState.call(targetState, 0, jobId) > 0;

// If the job was not ready, we need to re-queue and reallocate
// boards
scheduler.schedule(() -> setPower(jobId, OFF, QUEUED),
Instant.now());
return false;
} else if (status.nChanges > 0) {
// There are still changes happening - let them finish first
// even if there are errors as safer to do once everything
// is done.
return false;
}

// If there are no more pending changes and no errors,
// set the job state to the target state
log.debug("Job {} moving to state {}", jobId, targetState);
if (targetState == DESTROYED) {
int rows = setJobDestroyed.call(jobId);
if (rows != 1) {
log.warn("unexpected number of rows affected by "
+ "destroy in state update: {}", rows);
}
return rows > 0;
}
return false;

return setJobState.call(targetState, jobId) > 0;
}
}

Expand Down Expand Up @@ -285,6 +321,17 @@ private class Perimeter {
}
}

/**
 * The two counters read from one result row of the query that counts the
 * changes of a job.
 */
private class ChangeStatus {
    /** Value of the {@code n_changes} column of the row. */
    private final int nChanges;

    /** Value of the {@code n_errors} column of the row. */
    private final int nErrors;

    /**
     * @param resultRow
     *            The row to read both counters from.
     */
    ChangeStatus(Row resultRow) {
        this.nChanges = resultRow.getInt("n_changes");
        this.nErrors = resultRow.getInt("n_errors");
    }
}

/** Encapsulates the queries and updates used in power control. */
private class PowerSQL extends AbstractSQL {
/** Get basic information about a specific job. */
Expand Down Expand Up @@ -571,7 +618,7 @@ void updateEpochs() {
}

void updateBMPs() {
if (!bmps.isEmpty()) {
if (!bmps.isEmpty() && bmpController != null) {
// Poke the BMP controller to start looking!
log.debug("Triggering BMPs {}", bmps);
bmpController.triggerSearch(bmps);
Expand Down Expand Up @@ -1174,17 +1221,6 @@ private Collection<BMPAndMachine> setAllocation(AllocSQL sql, int jobId,
return setPower(sql, jobId, ON, READY);
}

/**
* Reset a job after a failure on a BMP.
*
* @param jobId
* The identifier of the job to reset.
*/
@SuppressWarnings("FutureReturnValueIgnored")
public void resetPowerOnFailure(int jobId) {
scheduler.schedule(() -> setPower(jobId, OFF, QUEUED), Instant.now());
}

@Override
public boolean setPower(int jobId, PowerState power, JobState targetState) {
if (targetState == DESTROYED) {
Expand Down Expand Up @@ -1273,16 +1309,15 @@ private Collection<BMPAndMachine> setPower(PowerSQL sql, int jobId,
if (targetState == DESTROYED) {
log.debug("num changes for {} in destroy: {}", jobId, numPending);
log.info("destroying job {} after power change", jobId);
int rows = sql.setStateDestroyed.call(numPending, jobId);
int rows = sql.setStateDestroyed.call(jobId);
if (rows != 1) {
log.warn("unexpected number of jobs marked destroyed: {}",
rows);
}
} else {
log.debug("Num changes for target {}: {}", targetState, numPending);
sql.setStatePending.call(
numPending > 0 ? POWER : targetState,
numPending, jobId);
numPending > 0 ? POWER : targetState, jobId);
}

return bmps;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1397,6 +1397,11 @@ public void destroy(String reason) {
rememberer.killProxies(id);
}

/**
 * Asks the power controller to switch this job on or off, with
 * {@code READY} as the target job state.
 */
@Override
public void setPower(boolean power) {
    var requestedState = power ? ON : OFF;
    powerController.setPower(id, requestedState, READY);
}

@Override
public boolean waitForChange(Duration timeout) {
if (isNull(epoch)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -842,6 +842,13 @@ interface Job extends Waitable {
*/
void destroy(String reason);

/**
 * Power a job on or off.
 *
 * @param power
 *            {@code true} to power the job on, {@code false} to power
 *            it off.
 */
void setPower(boolean power);

/**
* @return The state of the job.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import static uk.ac.manchester.spinnaker.alloc.bmp.NonBootOperation.WRITE_BL;
import static uk.ac.manchester.spinnaker.alloc.model.JobState.DESTROYED;
import static uk.ac.manchester.spinnaker.alloc.model.JobState.QUEUED;
import static uk.ac.manchester.spinnaker.alloc.model.JobState.READY;

import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
Expand Down Expand Up @@ -560,12 +559,12 @@ private void done() {
* @return Whether the state of boards or jobs has changed.
*/
private void failed() {
var resetJobAlloc = false;
try (var c = getConnection();
var deallocateBoards = c.update(DEALLOCATE_BMP_BOARDS_JOB);
var deleteChange = c.update(FINISHED_PENDING);
var errorChange = c.update(ERROR_PENDING);
var setBoardPowerOff = c.update(SET_BOARD_POWER_OFF)) {
resetJobAlloc = c.transaction(() -> {
c.transaction(() -> {
// We should mark the boards as off
int turnedOff =
powerOffBoards.stream().map(this::getBoardId)
Expand All @@ -576,27 +575,32 @@ private void failed() {
powerOnBoards.stream().map(this::getBoardId)
.mapToInt(setBoardPowerOff::call).sum();

// Deallocate the boards on this bmp from the job;
// other boards can be deallocated elsewhere.
deallocateBoards.call(jobId, bmpId);

// Delete change ids as they are done even if failed.
var completed = changeIds.stream().mapToInt(
deleteChange::call).sum();
// If we are going to queued or destroyed, we can just
// ignore the error as we will reallocate anyway
int completed = 0;
if (to == DESTROYED || to == QUEUED) {
// Need to mark the boards as not allocated; slightly
// dodgy since they might still be on, but not a lot
// we can do about it!
deallocateBoards.call(jobId, bmpId);
completed = changeIds.stream().mapToInt(
deleteChange::call).sum();
} else {

// If we are going to READY, we must mark changes as
// failed to make sure we don't think we are done!
completed = changeIds.stream().mapToInt(
errorChange::call).sum();
}

log.debug(
"BMP ACTION FAILED on {} ({}:{}->{}) off:{} "
+ "completed:{}",
+ " completed {}",
bmpId, jobId, from, to, turnedOff, completed);

// If we were meant to be powering up, reset the allocation
// once done here.
return (to == READY && powerOffBoards.isEmpty());
});
}
if (resetJobAlloc) {
allocator.resetPowerOnFailure(jobId);
}
// Tell the allocator something has happened
allocator.updateJob(jobId, from, to);
}

/**
Expand Down Expand Up @@ -1011,6 +1015,8 @@ private class PowerChange {

final Integer boardNum;

final Instant powerOffTime;

final boolean power;

final JobState from;
Expand All @@ -1030,6 +1036,11 @@ private class PowerChange {
offLinks = List.of(Direction.values()).stream().filter(
link -> !row.getBoolean(link.columnName)).collect(
Collectors.toList());
Instant powerOff = row.getInstant("power_off_timestamp");
if (powerOff == null) {
powerOff = Instant.EPOCH;
}
powerOffTime = powerOff;
}

boolean isSameJob(PowerChange p) {
Expand Down Expand Up @@ -1072,6 +1083,18 @@ public synchronized void run() {
}
}

/**
 * Decide whether a power change may be carried out right now.
 *
 * @param change
 *            The power change to check.
 * @return {@code true} for every power-off change. For a power-on change,
 *         {@code true} only once the change's recorded power-off time
 *         plus the configured off-wait time lies strictly in the past.
 */
private boolean waitedLongEnough(PowerChange change) {
    if (change.power) {
        // Powering on: earliest permitted moment is the last power-off
        // time plus the configured wait
        Instant earliestOn = change.powerOffTime.plus(
                props.getOffWaitTime());
        return Instant.now().isAfter(earliestOn);
    }

    // Powering off never has to wait
    return true;
}

/**
* Get the things that we want the worker to do. <em>Be very
* careful!</em> Because this necessarily involves the database, this
Expand All @@ -1095,11 +1118,13 @@ private List<Request> getRequestedOperations() {
while (!powerChanges.isEmpty()) {
var change = powerChanges.poll();
var jobChanges = new ArrayList<>(List.of(change));
var canDoNow = waitedLongEnough(change);
while (!powerChanges.isEmpty()
&& change.isSameJob(powerChanges.peek())) {
canDoNow &= waitedLongEnough(powerChanges.peek());
jobChanges.add(powerChanges.poll());
}
if (!jobChanges.isEmpty()) {
if (!jobChanges.isEmpty() && canDoNow) {
log.debug("Running job changes {}", jobChanges);
requests.add(new PowerRequest(bmpId, change.jobId,
change.from, change.to, jobChanges));
Expand Down
Loading
Loading