Skip to content

Commit

Permalink
1600gz. Run cores atomically to commit a transaction. Closes #7
Browse files Browse the repository at this point in the history
  • Loading branch information
nettyso committed May 9, 2021
1 parent 70e192b commit 6556dd1
Showing 1 changed file with 32 additions and 18 deletions.
50 changes: 32 additions & 18 deletions src/wire/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ let signalId = 0;
let activeCore: WireCore<X> | undefined;
// WireSignals written to during a transaction
let transactionSignals: Set<WireSignal<X>> | undefined;
let transactionCommit = false;

// Symbol() doesn't gzip well. `[] as const` gzips best but isn't debuggable
// without a lookup Map<> and other hacks.
Expand Down Expand Up @@ -140,6 +141,22 @@ const _initCore = (wC: WireCore<X>): void => {
wC.signalsIC = new Set();
};

const _runCores = (cores: Set<WireCore<X>>): void => {
// Copy the cores since the Set() can be added to while running which loops
// infinitely. Depth ordering needs an array but in Sinuous they use a Set()
const toRun = [...cores].sort((a, b) => a.sort - b.sort);
// Mark upstream computeds as stale. Must be in an isolated for-loop
toRun.forEach((wC) => {
if (wC.state === STATE_WIRED_PAUSED || wC.cS) wC.state = STATE_WIRED_STALE;
});
// Calls are ordered parent->child
toRun.forEach((wC) => {
// RESET|RUNNING|WAITING < PAUSED|STALE. Skips paused cores and lazy
// computed-signals. RESET cores shouldn't exist...
if (wC.state < STATE_WIRED_PAUSED) wC();
});
};

/**
* Removes two-way subscriptions between its signals and itself. This also turns
* off the core until it is manually re-run. */
Expand Down Expand Up @@ -213,19 +230,8 @@ const signal = <T>(value: T, id?: string): WireSignal<T> => {
(saved as R).state = STATE_WIRED_STALE;
wS.cC = saved as R;
}
// Notify. Copy wS.cores since the Set() can grow while running and loop
// infinitely. Depth ordering needs an array while Sinuous uses a Set()
const toRun = [...wS.cores].sort((a, b) => a.sort - b.sort);
// Mark upstream computeds as stale. Must be in an isolated for-loop
toRun.forEach((wC) => {
if (wC.state === STATE_WIRED_PAUSED || wC.cS) wC.state = STATE_WIRED_STALE;
});
// Calls are ordered parent->child
toRun.forEach((wC) => {
// RESET|RUNNING|WAITING < PAUSED|STALE. Skips paused cores and lazy
// computed-signals. RESET cores shouldn't exist...
if (wC.state < STATE_WIRED_PAUSED) wC();
});
// Notify every write _unless_ this is a post-transaction commit
if (!transactionCommit) _runCores(wS.cores);
}
if (read) {
// Re-run the core to get a new value if needed
Expand Down Expand Up @@ -268,16 +274,24 @@ const transaction = <T>(fn: () => T): T => {
let ret: unknown;
try {
ret = fn();
const signals = transactionSignals;
transactionSignals = prev;
const transactionCores = new Set<WireCore<X>>();
transactionCommit = true;
signals.forEach((wS) => {
// Doesn't run any subscribed cores since `transactionCommit` is set
wS(wS.next);
delete wS.next;
wS.cores.forEach((wC) => transactionCores.add(wC));
});
transactionCommit = false;

This comment has been minimized.

Copy link
@nettyso

nettyso May 9, 2021

Author Owner

I don't do transactionCommit = false again outside of the try/catch because I can hope/assume that there will never be an error during the signal write - there's no opportunity for it since no userland code runs. Meanwhile I have to double check transactionSignals since that could easily error.

_runCores(transactionCores);
} catch (err) {
error = err;
}
const signals = transactionSignals;
// Yes this happens a few lines up; do it again in case the `try` throws
transactionSignals = prev;
if (error) throw error;
signals.forEach((wS) => {
wS(wS.next);
delete wS.next;
});
return ret as T;
};

Expand Down

0 comments on commit 6556dd1

Please sign in to comment.