Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async resource #1

Open
wants to merge 23 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .npmrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
always-auth = true
registry = https://verdaccio.prod-tools.qualia.io/
14 changes: 3 additions & 11 deletions binding.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,7 @@
['OS == "linux"',
{
'cflags_c': [ '-std=gnu11' ],
'variables': {
'USE_MUSL': '<!(ldd --version 2>&1 | head -n1 | grep "musl" | wc -l)',
},
'conditions': [
['<(USE_MUSL) == 1',
{'defines': ['CORO_ASM', '__MUSL__']},
{'defines': ['CORO_UCONTEXT']}
],
],
'defines': ['CORO_PTHREAD'],
},
],
['OS == "solaris" or OS == "sunos" or OS == "freebsd" or OS == "aix"', {'defines': ['CORO_UCONTEXT']}],
Expand All @@ -65,8 +57,8 @@
['target_arch == "arm64"',
{
# There's been problems getting real fibers working on arm
'defines': ['CORO_UCONTEXT', '_XOPEN_SOURCE'],
'defines!': ['CORO_PTHREAD', 'CORO_SJLJ', 'CORO_ASM'],
'defines': ['CORO_PTHREAD'],
'defines!': ['CORO_UCONTEXT', 'CORO_SJLJ', 'CORO_ASM'],
},
],
],
Expand Down
133 changes: 1 addition & 132 deletions fibers.js
Original file line number Diff line number Diff line change
@@ -1,132 +1 @@
if (process.fiberLib) {
module.exports = process.fiberLib;
} else {
var fs = require('fs'), path = require('path'), detectLibc = require('detect-libc');

// Seed random numbers [gh-82]
Math.random();

// Look for binary for this platform
var modPath = path.join(__dirname, 'bin', process.platform+ '-'+ process.arch+ '-'+ process.versions.modules+
((process.platform === 'linux') ? '-'+ detectLibc.family : ''), 'fibers');
try {
// Pull in fibers implementation
process.fiberLib = module.exports = require(modPath).Fiber;
} catch (ex) {
// No binary!
console.error(
'## There is an issue with `node-fibers` ##\n'+
'`'+ modPath+ '.node` is missing.\n\n'+
'Try running this to fix the issue: '+ process.execPath+ ' '+ __dirname.replace(' ', '\\ ')+ '/build'
);
console.error(ex.stack || ex.message || ex);
throw new Error('Missing binary. See message above.');
}

setupAsyncHacks(module.exports);
}

function setupAsyncHacks(Fiber) {
// Older (or newer?) versions of node may not support this API
try {
var aw = process.binding('async_wrap');
var getAsyncIdStackSize;

if (aw.asyncIdStackSize instanceof Function) {
getAsyncIdStackSize = aw.asyncIdStackSize;
} else if (aw.constants.kStackLength !== undefined) {
getAsyncIdStackSize = function(kStackLength) {
return function() {
return aw.async_hook_fields[kStackLength];
};
}(aw.constants.kStackLength);
} else {
throw new Error('Couldn\'t figure out how to get async stack size');
}

var popAsyncContext = aw.popAsyncContext || aw.popAsyncIds;
var pushAsyncContext = aw.pushAsyncContext || aw.pushAsyncIds;
if (!popAsyncContext || !pushAsyncContext) {
throw new Error('Push/pop do not exist');
}

var kExecutionAsyncId;
if (aw.constants.kExecutionAsyncId === undefined) {
kExecutionAsyncId = aw.constants.kCurrentAsyncId;
} else {
kExecutionAsyncId = aw.constants.kExecutionAsyncId;
}
var kTriggerAsyncId;
if (aw.constants.kTriggerAsyncId === undefined) {
kTriggerAsyncId = aw.constants.kCurrentTriggerId;
} else {
kTriggerAsyncId = aw.constants.kTriggerAsyncId;
}

var asyncIds = aw.async_id_fields || aw.async_uid_fields;

function getAndClearStack() {
var ii = getAsyncIdStackSize();
var stack = new Array(ii);
for (; ii > 0; --ii) {
var asyncId = asyncIds[kExecutionAsyncId];
stack[ii - 1] = {
asyncId: asyncId,
triggerId: asyncIds[kTriggerAsyncId],
};
popAsyncContext(asyncId);
}
return stack;
}

function restoreStack(stack) {
for (var ii = 0; ii < stack.length; ++ii) {
pushAsyncContext(stack[ii].asyncId, stack[ii].triggerId);
}
}

function logUsingFibers(fibersMethod) {
const logUseFibersLevel = +(process.env.ENABLE_LOG_USE_FIBERS || 0);

if (!logUseFibersLevel) return;

if (logUseFibersLevel === 1) {
console.warn(`[FIBERS_LOG] Using ${fibersMethod}.`);
return;
}

const { LOG_USE_FIBERS_INCLUDE_IN_PATH } = process.env;
const stackFromError = new Error(`[FIBERS_LOG] Using ${fibersMethod}.`).stack;

if (
!LOG_USE_FIBERS_INCLUDE_IN_PATH ||
stackFromError.includes(LOG_USE_FIBERS_INCLUDE_IN_PATH)
) {
console.warn(stackFromError);
}
}

function wrapFunction(fn, fibersMethod) {
return function () {
logUsingFibers(fibersMethod);
var stack = getAndClearStack();
try {
return fn.apply(this, arguments);
} finally {
restoreStack(stack);
}
};
}

// Monkey patch methods which may long jump
Fiber.yield = wrapFunction(Fiber.yield, "Fiber.yield");
Fiber.prototype.run = wrapFunction(Fiber.prototype.run, "Fiber.run");
Fiber.prototype.throwInto = wrapFunction(
Fiber.prototype.throwInto,
"Fiber.throwInto"
);

} catch (err) {
return;
}
}
module.exports = require('./fibers_async.js');
40 changes: 40 additions & 0 deletions fibers_async.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
const { AsyncResource } = require('async_hooks');
const _Fiber = require('./fibers_sync.js');
if (_Fiber.Fiber) {
// if we were to mix'n'match import/require - we could end up with multiple copies of this
// it might also relate to weirdness of importing it from inside the shell/debugger
// it almost looks like the --preserve-symlinks is being ignored in the debugger/shell
module.exports = _Fiber.Fiber;
}
const asyncResourceWeakMap = new WeakMap();
function Fiber(fn, ...args) {
const ar = new AsyncResource('Fiber');
const actualFn = (...args1) => ar.runInAsyncScope(() => {
Fiber.current._meteor_dynamics = undefined;
fn(...args1);
});
const _fiber = _Fiber(actualFn, ...args);
asyncResourceWeakMap.set(_fiber, ar);
return _fiber;
};

Fiber.__proto__ = _Fiber;
Fiber.prototype = _Fiber.prototype;

Object.defineProperty(Fiber, 'current', {
get() {
return _Fiber.current;
}
})


_Fiber.prototype.runInAsyncScope = function runInAsyncScope(fn) {
return asyncResourceWeakMap.get(this).runInAsyncScope(fn);
};

_Fiber[Symbol.hasInstance] = function(obj) {
// hacky
return obj instanceof Fiber || obj.run;
};

module.exports = _Fiber.Fiber = Fiber;
135 changes: 135 additions & 0 deletions fibers_sync.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
if (process.fiberLib) {
module.exports = process.fiberLib;
} else {
var fs = require('fs'), path = require('path'), detectLibc = require('detect-libc');

// Seed random numbers [gh-82]
Math.random();

// Look for binary for this platform
var modPath = path.join(__dirname, 'bin', process.platform+ '-'+ process.arch+ '-'+ process.versions.modules+
((process.platform === 'linux') ? '-'+ detectLibc.family : ''), 'fibers');
try {
// Pull in fibers implementation
process.fiberLib = module.exports = require(modPath).Fiber;
} catch (ex) {
// No binary!
console.error(
'## There is an issue with `node-fibers` ##\n'+
'`'+ modPath+ '.node` is missing.\n\n'+
'Try running this to fix the issue: '+ process.execPath+ ' '+ __dirname.replace(' ', '\\ ')+ '/build'
);
console.error(ex.stack || ex.message || ex);
throw new Error('Missing binary. See message above.');
}

setupAsyncHacks(module.exports);
}

function setupAsyncHacks(Fiber) {
// Older (or newer?) versions of node may not support this API
try {
var aw = process.binding('async_wrap');
var ah = require('async_hooks');
var getAsyncIdStackSize;

if (aw.asyncIdStackSize instanceof Function) {
getAsyncIdStackSize = aw.asyncIdStackSize;
} else if (aw.constants.kStackLength !== undefined) {
getAsyncIdStackSize = function(kStackLength) {
return function() {
return aw.async_hook_fields[kStackLength];
};
}(aw.constants.kStackLength);
} else {
throw new Error('Couldn\'t figure out how to get async stack size');
}

var popAsyncContext = aw.popAsyncContext || aw.popAsyncIds;
var pushAsyncContext = aw.pushAsyncContext || aw.pushAsyncIds;
if (!popAsyncContext || !pushAsyncContext) {
throw new Error('Push/pop do not exist');
}

var kExecutionAsyncId;
if (aw.constants.kExecutionAsyncId === undefined) {
kExecutionAsyncId = aw.constants.kCurrentAsyncId;
} else {
kExecutionAsyncId = aw.constants.kExecutionAsyncId;
}
var kTriggerAsyncId;
if (aw.constants.kTriggerAsyncId === undefined) {
kTriggerAsyncId = aw.constants.kCurrentTriggerId;
} else {
kTriggerAsyncId = aw.constants.kTriggerAsyncId;
}

var asyncIds = aw.async_id_fields || aw.async_uid_fields;

function getAndClearStack() {
var ii = getAsyncIdStackSize();
var stack = new Array(ii);
for (; ii > 0; --ii) {
var asyncId = asyncIds[kExecutionAsyncId];
stack[ii - 1] = {
asyncResource: ah.executionAsyncResource(),
asyncId: asyncId,
triggerId: asyncIds[kTriggerAsyncId],
};
popAsyncContext(asyncId);
}
return stack;
}

function restoreStack(stack) {
for (var ii = 0; ii < stack.length; ++ii) {
pushAsyncContext(stack[ii].asyncId, stack[ii].triggerId);
aw.execution_async_resources.push(stack[ii].asyncResource);
}
}

const logUseFibersLevel = +(process.env.ENABLE_LOG_USE_FIBERS || 0);
const { LOG_USE_FIBERS_INCLUDE_IN_PATH } = process.env;
function logUsingFibers(fibersMethod) {

if (!logUseFibersLevel) return;

if (logUseFibersLevel === 1) {
console.warn(`[FIBERS_LOG] Using ${fibersMethod}.`);
return;
}

const stackFromError = new Error(`[FIBERS_LOG] Using ${fibersMethod}.`).stack;

if (
!LOG_USE_FIBERS_INCLUDE_IN_PATH ||
stackFromError.includes(LOG_USE_FIBERS_INCLUDE_IN_PATH)
) {
console.warn(stackFromError);
}
}

function wrapFunction(fn, fibersMethod) {
return function () {
logUsingFibers(fibersMethod);
var stack = getAndClearStack();
try {
return fn.apply(this, arguments);
} finally {
restoreStack(stack);
}
};
}

// Monkey patch methods which may long jump
Fiber.yield = wrapFunction(Fiber.yield, "Fiber.yield");
Fiber.prototype.run = wrapFunction(Fiber.prototype.run, "Fiber.run");
Fiber.prototype.throwInto = wrapFunction(
Fiber.prototype.throwInto,
"Fiber.throwInto"
);

} catch (err) {
return;
}
}
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "fibers",
"version": "5.0.2",
"version": "5.0.2-7",
"description": "Cooperative multi-tasking for Javascript",
"keywords": [
"fiber",
Expand All @@ -15,7 +15,7 @@
],
"homepage": "https://github.com/laverdet/node-fibers",
"author": "Marcel Laverdet <[email protected]> (https://github.com/laverdet/)",
"main": "fibers",
"main": "fibers_async",
"scripts": {
"install": "node build.js || nodejs build.js",
"test": "node test.js || nodejs test.js"
Expand Down
2 changes: 1 addition & 1 deletion src/coroutine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ void Coroutine::init(v8::Isolate* isolate) {
isolate_key = v8::internal::Isolate::isolate_key_;
thread_data_key = v8::internal::Isolate::per_isolate_thread_data_key_;
thread_id_key = v8::internal::Isolate::thread_id_key_;
#else
#elif !defined(CORO_PTHREAD)
pthread_t thread;
pthread_create(&thread, NULL, find_thread_id_key, isolate);
pthread_join(thread, NULL);
Expand Down