Skip to content

Commit

Permalink
fix(state-manager): throw if schemaName does not exits in observe and…
Browse files Browse the repository at this point in the history
… getCollection - fix #65
  • Loading branch information
b-ma committed Dec 12, 2023
1 parent 7f0e7c2 commit 251ea94
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 35 deletions.
39 changes: 25 additions & 14 deletions src/common/BaseStateManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
ATTACH_ERROR,
OBSERVE_REQUEST,
OBSERVE_RESPONSE,
OBSERVE_ERROR,
OBSERVE_NOTIFICATION,
UNOBSERVE_NOTIFICATION,
DELETE_SCHEMA,
Expand All @@ -29,7 +30,7 @@ class BaseStateManager {
this.client = { id, transport };

this._statesById = new Map();
this._observeListeners = new Map(); // Map <callback, filterSchemaName>
this._observeListeners = new Map(); // Map <callback, observedSchemaName>
this._cachedSchemas = new Map();
this._observeRequestCallbacks = new Map();

Expand Down Expand Up @@ -88,15 +89,15 @@ class BaseStateManager {
this.client.transport.addListener(OBSERVE_RESPONSE, async (reqId, ...list) => {
// retrieve the callback that have been stored in observe to make sure
// we don't call another callback that may have been registered earlier.
const [callback, filterSchemaName] = this._observeRequestCallbacks.get(reqId);
const [callback, observedSchemaName] = this._observeRequestCallbacks.get(reqId);
this._observeRequestCallbacks.delete(reqId);

// now that the OBSERVE_REPOSNSE callback is executed, store it in
// OBSERVE_NOTIFICATION listeners
this._observeListeners.set(callback, filterSchemaName);
this._observeListeners.set(callback, observedSchemaName);

const promises = list.map(([schemaName, stateId, nodeId]) => {
if (filterSchemaName === '*' || filterSchemaName === schemaName) {
if (observedSchemaName === null || observedSchemaName === schemaName) {
return callback(schemaName, stateId, nodeId);
} else {
return Promise.resolve();
Expand All @@ -116,9 +117,15 @@ class BaseStateManager {
this._promiseStore.resolve(reqId, unsubscribe);
});

// Observe error can occur if observed schema name does not exists
this.client.transport.addListener(OBSERVE_ERROR, (reqId, msg) => {
this._observeRequestCallbacks.delete(reqId);
this._promiseStore.reject(reqId, msg);
});

this.client.transport.addListener(OBSERVE_NOTIFICATION, (schemaName, stateId, nodeId) => {
this._observeListeners.forEach((filterSchemaName, callback) => {
if (filterSchemaName === '*' || filterSchemaName === schemaName) {
this._observeListeners.forEach((observedSchemaName, callback) => {
if (observedSchemaName === null || observedSchemaName === schemaName) {
callback(schemaName, stateId, nodeId);
}
});
Expand Down Expand Up @@ -206,21 +213,21 @@ class BaseStateManager {
// handle this way and the network overhead is very low for observe notifications:
// i.e. schemaName, stateId, nodeId
async observe(...args) {
let filterSchemaName;
let observedSchemaName;
let callback;

if (args.length === 1) {
filterSchemaName = '*';
observedSchemaName = null;
callback = args[0];

if (!isFunction(callback)) {
throw new Error(`[stateManager] Invalid arguments, valid signatures are "stateManager.observe(callback)" or "stateManager.observe(schemaName, callback)"`);
}
} else if (args.length === 2) {
filterSchemaName = args[0];
observedSchemaName = args[0];
callback = args[1];

if (!isString(filterSchemaName) || !isFunction(callback)) {
if (!isString(observedSchemaName) || !isFunction(callback)) {
throw new Error(`[stateManager] Invalid arguments, valid signatures are "stateManager.observe(callback)" or "stateManager.observe(schemaName, callback)"`);
}
} else {
Expand All @@ -232,7 +239,7 @@ class BaseStateManager {
const reqId = this._promiseStore.add(resolve, reject, 'observe-request');
// store the callback for execution on the response. the returned Promise
// is fullfiled once callback has been executed with each existing states
this._observeRequestCallbacks.set(reqId, [callback, filterSchemaName]);
this._observeRequestCallbacks.set(reqId, [callback, observedSchemaName]);

// NOTE: do not store in `_observeListeners` yet as it can produce races, e.g.:
// cf. test `observe should properly behave in race condition`
Expand All @@ -250,7 +257,7 @@ class BaseStateManager {
// - OBSERVE_NOTIFICATION [ 'a', 1, 0 ] // this should not be executed
// - OBSERVE_RESPONSE 1 [ [ 'a', 1, 0 ] ]

this.client.transport.emit(OBSERVE_REQUEST, reqId);
this.client.transport.emit(OBSERVE_REQUEST, reqId, observedSchemaName);
});
}

Expand All @@ -263,9 +270,13 @@ class BaseStateManager {
*/
async getCollection(schemaName) {
const collection = new SharedStateCollection(this, schemaName);
await collection._init();

return collection;
try {
await collection._init();
return collection;
} catch(err) {
throw new Error(`Cannot create collection, schema "${schemaName}" does not exists`);
}
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/common/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ export const DETACH_ERROR = 's:dt:err';

export const OBSERVE_REQUEST = 's:o:req';
export const OBSERVE_RESPONSE = 's:o:res';
export const OBSERVE_ERROR = 's:o:err';
export const OBSERVE_NOTIFICATION = 's:o:not';

export const UNOBSERVE_NOTIFICATION = 's:uo:not';
Expand All @@ -47,5 +48,5 @@ export const CLIENT_HANDSHAKE_RESPONSE = 'cl:h:res';
export const CLIENT_HANDSHAKE_ERROR = 'cl:h:err';

// audit state schema name
export const AUDIT_STATE_NAME = 's:c:audit';
export const AUDIT_STATE_NAME = 'p:s:audit';
export const PRIVATE_STATES = [AUDIT_STATE_NAME];
36 changes: 21 additions & 15 deletions src/server/StateManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
ATTACH_ERROR,
OBSERVE_REQUEST,
OBSERVE_RESPONSE,
OBSERVE_ERROR,
OBSERVE_NOTIFICATION,
UNOBSERVE_NOTIFICATION,
DELETE_SCHEMA,
Expand Down Expand Up @@ -428,23 +429,28 @@ class StateManager extends BaseStateManager {
// ---------------------------------------------
// OBSERVE PEERS (be notified when a state is created, lazy)
// ---------------------------------------------
client.transport.addListener(OBSERVE_REQUEST, reqId => {
const statesInfos = [];

this._serverStatesById.forEach(state => {
const { schemaName, id, _creatorId } = state;
// only track application states
// (e.g. do not propagate infos about audit state)
if (!PRIVATE_STATES.includes(schemaName) && _creatorId !== client.id) {
statesInfos.push([schemaName, id, _creatorId]);
}
});
client.transport.addListener(OBSERVE_REQUEST, (reqId, observedSchemaName) => {
if (observedSchemaName === null || this._schemas.has(observedSchemaName)) {
const statesInfos = [];

this._serverStatesById.forEach(state => {
const { schemaName, id, _creatorId } = state;
// only track application states
// (e.g. do not propagate infos about audit state)
if (!PRIVATE_STATES.includes(schemaName) && _creatorId !== client.id) {
statesInfos.push([schemaName, id, _creatorId]);
}
});

// add client to observers first because if some (sync) server side
// callback throws, the client would never be added to the list
this._observers.add(client);
// add client to observers first because if some (sync) server side
// callback throws, the client would never be added to the list
this._observers.add(client);

client.transport.emit(OBSERVE_RESPONSE, reqId, ...statesInfos);
client.transport.emit(OBSERVE_RESPONSE, reqId, ...statesInfos);
} else {
const msg = `Cannot observe, schema "${observedSchemaName}" does not exists`;
client.transport.emit(OBSERVE_ERROR, reqId, msg);
}
});

client.transport.addListener(UNOBSERVE_NOTIFICATION, () => {
Expand Down
6 changes: 3 additions & 3 deletions tests/states/SharedState.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -498,14 +498,14 @@ describe('# SharedState', () => {
});

describe(`## Race conditions`, () => {
it(`[FIXME #73] should not stuck the program`, async () => {
it(`should flush pending requests when state is deleted / detached`, async () => {
const aCreated = await server.stateManager.create('a');
const aAttached = await client.stateManager.attach('a');

// DELETE_REQUEST is received first on the SharedStatePrivate which deletes
// all its listeners.
// Concurrently DETACH_REQUEST is sent but have not response, request is flush when
// DELETE_NOTIFICATION or DETACH_NOTIFICATION is received
// Concurrently DETACH_REQUEST is sent but cannot have a response,
// flush pending requests when DELETE_NOTIFICATION is received
aCreated.delete();

let errored = false;
Expand Down
15 changes: 15 additions & 0 deletions tests/states/StateCollection.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,21 @@ describe(`# SharedStateCollection`, () => {
await state.delete();
await delay(50);
});

it('should thow if collection, i.e. schemaName, does not exists', async () => {
let errored = false;

try {
const collection = await server.stateManager.getCollection('do-not-exists');
} catch (err) {
console.log(err.message);
errored = true;
}

if (!errored) {
assert.fail('should have failed');
}
});
});

describe(`## size (alias length)`, () => {
Expand Down
21 changes: 19 additions & 2 deletions tests/states/StateManager.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ describe(`# StateManager`, () => {
});
});

describe('## observe([schemaName,] callback) => Promise<unobserve>', async () => {
describe('## observe(callback) => Promise<unobserve>', async () => {
it(`should be notified of states created on the network`, async () => {
let numCalled = 0;

Expand Down Expand Up @@ -485,8 +485,10 @@ describe(`# StateManager`, () => {
assert.equal(responsesReceived, 2); // for each observer
assert.equal(notificationsReceived, 1); // only for first observer
});
});

it(`should properly behave with filtered schema name: observe(schemaName, callback)`, async () => {
describe('## observe(schemaName, callback) => Promise<unobserve>', async () => {
it(`should properly behave`, async () => {
const a1 = await client.stateManager.create('a');
const b1 = await client.stateManager.create('b');

Expand Down Expand Up @@ -538,6 +540,21 @@ describe(`# StateManager`, () => {
await b1.delete();
await b2.delete();
});

it(`should thow if given schema name does not exists`, async () => {
let errored = false;

try {
await client.stateManager.observe('do-not-exist', () => {});
} catch (err) {
console.log(err.message);
errored = true;
}

if (!errored) {
assert.fail('should have thrown');
}
});
});

describe('## registerUpdateHook(schemaName, updateHook)', () => {
Expand Down

0 comments on commit 251ea94

Please sign in to comment.