From b9adc5e969624fc391093bcc6574e1b2504fb9d1 Mon Sep 17 00:00:00 2001 From: Jonathan Gramain Date: Tue, 14 Mar 2017 15:50:23 -0700 Subject: [PATCH] S3C-35 Refactoring of bucketfile (S3 file metadata backend) The changes allow bucketfile to use a new API in Arsenal to communicate with a remote leveldb database, containing sublevels for bucket storage. Metadata is still stored on a local levelDB server, but it should now be easy to move the storage logic in a new daemon running on a remote server, and it should be robust. Instead of relying on the existing implementation of multilevel, it uses client/server wrappers around a new level-net communication protocol and API in Arsenal based on socket.io to exchange messages. It shall be compatible with the existing metadata since it still uses the core sublevel module for the storage logic, only the RPC procotol has changed. Test done: - put a few 100s of files in different S3 subdirectories - list directories and subdirectories - get/delete files - multi-part upload - introduce random connection errors (tcpkill) to check robustness and automatic reconnection --- config.json | 4 + init.js | 60 +--- lib/Config.js | 14 + lib/metadata/bucketfile/backend.js | 376 ++++++---------------- mdserver.js | 14 + package.json | 11 +- tests/unit/metadata/bucketfile/backend.js | 43 ++- 7 files changed, 173 insertions(+), 349 deletions(-) create mode 100644 mdserver.js diff --git a/config.json b/config.json index 7f89bcad91..f7425d3f3f 100644 --- a/config.json +++ b/config.json @@ -36,5 +36,9 @@ }, "healthChecks": { "allowFrom": ["127.0.0.1/8", "::1"] + }, + "metadataDaemon": { + "host": "localhost", + "port": 9990 } } diff --git a/init.js b/init.js index 7abf0260c8..21e1dc83cc 100644 --- a/init.js +++ b/init.js @@ -6,61 +6,11 @@ const fs = require('fs'); const os = require('os'); const async = require('async'); -const uuid = require('node-uuid'); const constants = require('./constants').default; const config = require('./lib/Config.js').default; const logger = require('./lib/utilities/logger.js').logger; - -let ioctl; -try { - ioctl = require('ioctl'); -} catch (err) { - logger.warn('ioctl dependency is unavailable. skipping...'); -} - -function _setDirSyncFlag(path) { - const GETFLAGS = 2148034049; - const SETFLAGS = 1074292226; - const FS_DIRSYNC_FL = 65536; - const buffer = Buffer.alloc(8, 0); - const pathFD = fs.openSync(path, 'r'); - const status = ioctl(pathFD, GETFLAGS, buffer); - assert.strictEqual(status, 0); - const currentFlags = buffer.readUIntLE(0, 8); - const flags = currentFlags | FS_DIRSYNC_FL; - buffer.writeUIntLE(flags, 0, 8); - const status2 = ioctl(pathFD, SETFLAGS, buffer); - assert.strictEqual(status2, 0); - fs.closeSync(pathFD); - const pathFD2 = fs.openSync(path, 'r'); - const confirmBuffer = Buffer.alloc(8, 0); - ioctl(pathFD2, GETFLAGS, confirmBuffer); - assert.strictEqual(confirmBuffer.readUIntLE(0, 8), - currentFlags | FS_DIRSYNC_FL, 'FS_DIRSYNC_FL not set'); - logger.info('FS_DIRSYNC_FL set'); - fs.closeSync(pathFD2); -} - -function printUUID(metadataPath) { - const uuidFile = `${metadataPath}/uuid`; - - try { - fs.accessSync(uuidFile, fs.F_OK | fs.R_OK); - } catch (e) { - if (e.code === 'ENOENT') { - const v = uuid.v4(); - const fd = fs.openSync(uuidFile, 'w'); - fs.writeSync(fd, v.toString()); - fs.closeSync(fd); - } else { - throw e; - } - } - - const uuidValue = fs.readFileSync(uuidFile); - logger.info(`This deployment's identifier is ${uuidValue}`); -} +const storageUtils = require('arsenal').storage.utils; // If neither data nor metadata is using the file backend, // there is no need to init @@ -71,18 +21,15 @@ if (config.backends.data !== 'file' && config.backends.data !== 'multiple' && } const dataPath = config.filePaths.dataPath; -const metadataPath = config.filePaths.metadataPath; fs.accessSync(dataPath, fs.F_OK | fs.R_OK | fs.W_OK); -fs.accessSync(metadataPath, fs.F_OK | fs.R_OK | fs.W_OK); const warning = 'WARNING: Synchronization directory updates are not ' + 'supported on this platform. Newly written data could be lost ' + 'if your system crashes before the operating system is able to ' + 'write directory updates.'; -if (os.type() === 'Linux' && os.endianness() === 'LE' && ioctl) { +if (os.type() === 'Linux' && os.endianness() === 'LE') { try { - _setDirSyncFlag(dataPath); - _setDirSyncFlag(metadataPath); + storageUtils.setDirSyncFlag(dataPath); } catch (err) { logger.warn(warning, { error: err.stack }); } @@ -105,5 +52,4 @@ async.eachSeries(subDirs, (subDirName, next) => { err => { assert.strictEqual(err, null, `Error creating data files ${err}`); logger.info('Init complete. Go forth and store data.'); - printUUID(metadataPath); }); diff --git a/lib/Config.js b/lib/Config.js index c949eb34a8..41d791e94b 100644 --- a/lib/Config.js +++ b/lib/Config.js @@ -269,6 +269,20 @@ class Config { } } + if (config.metadataDaemon) { + this.metadataDaemon = {}; + assert.strictEqual( + typeof config.metadataDaemon.host, 'string', + 'bad config: metadata daemon host must be a string'); + this.metadataDaemon.host = config.metadataDaemon.host; + + assert(Number.isInteger(config.metadataDaemon.port) + && config.metadataDaemon.port > 0, + 'bad config: metadata daemon port must be a ' + + 'positive integer'); + this.metadataDaemon.port = config.metadataDaemon.port; + } + if (process.env.ENABLE_LOCAL_CACHE) { this.localCache = defaultLocalCache; } diff --git a/lib/metadata/bucketfile/backend.js b/lib/metadata/bucketfile/backend.js index 04fe5afb49..ef22017556 100644 --- a/lib/metadata/bucketfile/backend.js +++ b/lib/metadata/bucketfile/backend.js @@ -1,12 +1,5 @@ -import net from 'net'; -import fs from 'fs'; import cluster from 'cluster'; -import events from 'events'; -import assert from 'assert'; -import level from 'level'; -import multilevel from 'multilevel'; -import sublevel from 'level-sublevel'; import arsenal from 'arsenal'; import { logger } from '../../utilities/logger'; @@ -15,12 +8,8 @@ import constants from '../../../constants'; import config from '../../Config'; const errors = arsenal.errors; +const MetadataClient = arsenal.storage.metadata.client; -const METADATA_PORT = 9990; -const METADATA_PATH = `${config.filePaths.metadataPath}/`; -const MANIFEST_JSON = 'manifest.json'; -const MANIFEST_JSON_TMP = 'manifest.json.tmp'; -const ROOT_DB = 'rootDB'; const METASTORE = '__metastore'; const OPTIONS = { sync: true }; @@ -28,153 +17,36 @@ class BucketFileInterface { constructor() { this.logger = logger; + this.mdClient = new MetadataClient( + { metadataHost: 'localhost', + metadataPort: config.metadataDaemon.port, + log: config.log }); + this.mdDB = this.mdClient.openDB(); + // the metastore sublevel is used to store bucket attributes + this.metastore = this.mdDB.openSub(METASTORE); if (cluster.isMaster) { - this.startServer(); - } else { - this.refcnt = 0; - this.waitreco = 0; - this.realReConnect(); - this.notifier = new events.EventEmitter(); + this.setupMetadataServer(); } } - /** - * Start the server - * @return {undefined} - */ - startServer() { - const rootDB = level(METADATA_PATH + ROOT_DB); - const sub = sublevel(rootDB); - sub.methods = sub.methods || {}; - sub.methods.createSub = { type: 'async' }; - sub.createSub = (subName, cb) => { - try { - sub.sublevel(subName); - multilevel.writeManifest(sub, - METADATA_PATH + - MANIFEST_JSON_TMP); - fs.renameSync(METADATA_PATH + MANIFEST_JSON_TMP, - METADATA_PATH + MANIFEST_JSON); - cb(); - } catch (err) { - cb(err); - } - }; - const metastore = sub.sublevel(METASTORE); + setupMetadataServer() { /* Since the bucket creation API is expecting the usersBucket to have attributes, we pre-create the - usersBucket here */ - sub.sublevel(constants.usersBucket); + usersBucket attributes here */ + this.mdClient.logger.debug('setting up metadata server'); const usersBucketAttr = new BucketInfo(constants.usersBucket, 'admin', 'admin', new Date().toJSON(), BucketInfo.currentModelVersion()); - metastore.put(constants.usersBucket, usersBucketAttr.serialize()); - const stream = metastore.createKeyStream(); - stream - .on('data', e => { - // automatically recreate existing sublevels - sub.sublevel(e); - }) - .on('error', err => { - this.logger.fatal('error listing metastore', { error: err }); - throw (errors.InternalError); - }) - .on('end', () => { - multilevel.writeManifest(sub, METADATA_PATH + MANIFEST_JSON); - this.logger.info('starting metadata file backend server'); - /* We start a server that will server the sublevel - capable rootDB to clients */ - net.createServer(con => { - con.pipe(multilevel.server(sub)).pipe(con); - }).listen(METADATA_PORT); - }); - } - - /** - * Reconnect to the server - * @return {undefined} - */ - realReConnect() { - if (this.client !== undefined) { - this.client.close(); - } - delete require.cache[require.resolve(METADATA_PATH + MANIFEST_JSON)]; - const manifest = require(METADATA_PATH + MANIFEST_JSON); - this.client = multilevel.client(manifest); - const con = net.connect(METADATA_PORT); - con.pipe(this.client.createRpcStream()).pipe(con); - this.metastore = this.client.sublevel(METASTORE); - } - - /** - * Wait for reconnect do be done - * @param {function} cb - callback() - * @return {undefined} - */ - reConnect(cb) { - if (this.refcnt === this.waitreco) { - /* Either we are alone waiting for reconnect or all - operations are waiting for reconnect, then force - a reco and notify others */ - this.realReConnect(); - if (this.waitreco > 1) { - this.notifier.emit('recodone'); - } - return cb(); - } - /* We need to wait for somebody to wake us up either by - recodone or refdecr */ - let cbDone = false; - - if (this.notifier.listeners('recodone').length > 0 || - this.notifier.listeners('refdecr').length > 0) { - setTimeout(() => { - this.reConnect(cb); - }, 50); - return undefined; - } - this.notifier.once('recodone', () => { - if (cbDone) { - return undefined; - } - cbDone = true; - return cb(); - }); - this.notifier.once('refdecr', () => { - /* We need to recheck for the condition as - somebody might issue a command before we - get the notification */ - if (cbDone) { - return undefined; - } - cbDone = true; - process.nextTick(() => { - this.reConnect(cb); + this.metastore.put( + constants.usersBucket, + usersBucketAttr.serialize(), err => { + if (err) { + this.logger.fatal('error writing usersBucket ' + + 'attributes to metadata', + { error: err }); + throw (errors.InternalError); + } }); - return undefined; - }); - return undefined; - } - - /** - * Take a reference on the client - * @return {undefined} - */ - ref() { - this.refcnt++; - } - - /** - * Unreference the client - * @return {undefined} - */ - unRef() { - this.refcnt--; - assert(this.refcnt >= 0); - if (this.waitreco > 0) { - /* give a change to wake up waiters */ - this.notifier.emit('refdecr'); - } } /** @@ -184,121 +56,73 @@ class BucketFileInterface { * @param {function} cb - callback(err, db, attr) * @return {undefined} */ - loadDBIfExistsNoRef(bucketName, log, cb) { - this.getBucketAttributesNoRef(bucketName, log, (err, attr) => { + loadDBIfExists(bucketName, log, cb) { + this.getBucketAttributes(bucketName, log, (err, attr) => { if (err) { - return cb(err, null); + return cb(err); } - let db; try { - db = this.client.sublevel(bucketName); + const db = this.mdDB.openSub(bucketName); return cb(null, db, attr); } catch (err) { - /* if the bucket is newly created the - client cannot create sublevels without - re-reading the manifest */ - this.waitreco++; - this.reConnect(() => { - this.waitreco--; - try { - db = this.client.sublevel(bucketName); - } catch (err) { - log.error('cannot make sublevel usable', - { error: err.stack }); - return cb(errors.InternalError, null); - } - return cb(null, db, attr); - }); - return undefined; - } - }); - return undefined; - } - - loadDBIfExists(bucketName, log, cb) { - this.ref(); - this.loadDBIfExistsNoRef(bucketName, log, (err, db, attr) => { - if (err) { - this.unRef(); - return cb(err); + return cb(errors.InternalError); } - // we hold a ref here - return cb(err, db, attr); }); return undefined; } createBucket(bucketName, bucketMD, log, cb) { - this.ref(); - this.getBucketAttributesNoRef(bucketName, log, err => { + this.getBucketAttributes(bucketName, log, err => { if (err && err !== errors.NoSuchBucket) { - this.unRef(); return cb(err); } if (err === undefined) { - this.unRef(); return cb(errors.BucketAlreadyExists); } - this.client.createSub(bucketName, err => { - if (err) { - log.error('error creating sublevel', { error: err }); - this.unRef(); - return cb(errors.InternalError); - } - // we hold a ref here - this.putBucketAttributesNoRef(bucketName, - bucketMD, - log, - err => { - this.unRef(); - return cb(err); - }); - return undefined; - }); + this.putBucketAttributes(bucketName, + bucketMD, + log, cb); return undefined; }); - return undefined; } - getBucketAttributesNoRef(bucketName, log, cb) { + getBucketAttributes(bucketName, log, cb) { this.metastore.get(bucketName, (err, data) => { if (err) { if (err.notFound) { return cb(errors.NoSuchBucket); } - log.error('error getting db attributes', - { error: err }); - return cb(errors.InternalError, null); + const logObj = { + rawError: err, + error: err.message, + errorStack: err.stack, + }; + log.error('error getting db attributes', logObj); + return cb(errors.InternalError); } return cb(null, BucketInfo.deSerialize(data)); }); return undefined; } - getBucketAttributes(bucketName, log, cb) { - this.ref(); - this.getBucketAttributesNoRef(bucketName, log, - (err, data) => { - this.unRef(); - return cb(err, data); - }); - return undefined; - } - getBucketAndObject(bucketName, objName, params, log, cb) { this.loadDBIfExists(bucketName, log, (err, db, bucketAttr) => { if (err) { return cb(err); } db.get(objName, (err, objAttr) => { - this.unRef(); if (err) { if (err.notFound) { return cb(null, { bucket: bucketAttr.serialize(), }); } - log.error('error getting object', { error: err }); + const logObj = { + rawError: err, + error: err.message, + errorStack: err.stack, + }; + log.error('error getting object', logObj); return cb(errors.InternalError); } return cb(null, { @@ -311,13 +135,18 @@ class BucketFileInterface { return undefined; } - putBucketAttributesNoRef(bucketName, bucketMD, log, cb) { + putBucketAttributes(bucketName, bucketMD, log, cb) { this.metastore.put(bucketName, bucketMD.serialize(), OPTIONS, err => { if (err) { + const logObj = { + rawError: err, + error: err.message, + errorStack: err.stack, + }; log.error('error putting db attributes', - { error: err }); + logObj); return cb(errors.InternalError); } return cb(); @@ -325,25 +154,17 @@ class BucketFileInterface { return undefined; } - putBucketAttributes(bucketName, bucketMD, log, cb) { - this.ref(); - this.putBucketAttributesNoRef(bucketName, bucketMD, log, - err => { - this.unRef(); - return cb(err); - }); - return undefined; - } - deleteBucket(bucketName, log, cb) { - // we could remove bucket from manifest but it is not a problem - this.ref(); this.metastore.del(bucketName, err => { - this.unRef(); if (err) { + const logObj = { + rawError: err, + error: err.message, + errorStack: err.stack, + }; log.error('error deleting bucket', - { error: err }); + logObj); return cb(errors.InternalError); } return cb(); @@ -358,12 +179,16 @@ class BucketFileInterface { } db.put(objName, JSON.stringify(objVal), OPTIONS, err => { - this.unRef(); // TODO: implement versioning for file backend const data = undefined; if (err) { + const logObj = { + rawError: err, + error: err.message, + errorStack: err.stack, + }; log.error('error putting object', - { error: err }); + logObj); return cb(errors.InternalError); } return cb(err, data); @@ -378,13 +203,16 @@ class BucketFileInterface { return cb(err); } db.get(objName, (err, data) => { - this.unRef(); if (err) { if (err.notFound) { return cb(errors.NoSuchKey); } - log.error('error getting object', - { error: err }); + const logObj = { + rawError: err, + error: err.message, + errorStack: err.stack, + }; + log.error('error getting object', logObj); return cb(errors.InternalError); } return cb(null, JSON.parse(data)); @@ -399,10 +227,13 @@ class BucketFileInterface { return cb(err); } db.del(objName, OPTIONS, err => { - this.unRef(); if (err) { - log.error('error deleting object', - { error: err }); + const logObj = { + rawError: err, + error: err.message, + errorStack: err.stack, + }; + log.error('error deleting object', logObj); return cb(errors.InternalError); } return cb(); @@ -429,31 +260,38 @@ class BucketFileInterface { return cb(err); } let cbDone = false; - const stream = db.createReadStream(requestParams); - stream - .on('data', e => { - if (extension.filter(e) < 0) { - stream.emit('end'); - stream.destroy(); - } - }) - .on('error', err => { - if (!cbDone) { - this.unRef(); - cbDone = true; - log.error('error listing objects', - { error: err }); - cb(errors.InternalError); - } - }) - .on('end', () => { - if (!cbDone) { - this.unRef(); - cbDone = true; - const data = extension.result(); - cb(null, data); - } - }); + db.createReadStream(requestParams, (err, stream) => { + if (err) { + return cb(err); + } + stream + .on('data', e => { + if (extension.filter(e) < 0) { + stream.emit('end'); + stream.destroy(); + } + }) + .on('error', err => { + if (!cbDone) { + cbDone = true; + const logObj = { + rawError: err, + error: err.message, + errorStack: err.stack, + }; + log.error('error listing objects', logObj); + cb(errors.InternalError); + } + }) + .on('end', () => { + if (!cbDone) { + cbDone = true; + const data = extension.result(); + cb(null, data); + } + }); + return undefined; + }); return undefined; }); } diff --git a/mdserver.js b/mdserver.js new file mode 100644 index 0000000000..a0be9e13a2 --- /dev/null +++ b/mdserver.js @@ -0,0 +1,14 @@ +'use strict'; // eslint-disable-line strict +require('babel-core/register'); + +const config = require('./lib/Config.js').default; +const MetadataServer = require('arsenal').storage.metadata.server; + +if (config.backends.metadata === 'file') { + const mdServer = new MetadataServer( + { metadataPath: config.filePaths.metadataPath, + metadataPort: config.metadataDaemon.port, + log: config.log }); + mdServer.startServer(); +} + diff --git a/package.json b/package.json index 945c823074..36655b2009 100644 --- a/package.json +++ b/package.json @@ -28,10 +28,8 @@ "bucketclient": "scality/bucketclient", "commander": "^2.9.0", "ioredis": "2.4.0", - "level": "^1.4.0", - "level-sublevel": "^6.5.4", - "multilevel": "^7.3.0", "node-uuid": "^1.4.3", + "npm-run-all": "~4.0.2", "ready-set-stream": "1.0.7", "sproxydclient": "scality/sproxydclient", "utapi": "scality/utapi", @@ -40,9 +38,6 @@ "werelogs": "scality/werelogs", "xml2js": "~0.4.16" }, - "optionalDependencies": { - "ioctl": "2.0.0" - }, "devDependencies": { "aws-sdk": "2.28.0", "babel-cli": "^6.2.0", @@ -72,7 +67,9 @@ "lint_md": "mdlint $(git ls-files '*.md')", "mem_backend": "S3BACKEND=mem node index.js", "perf": "mocha --compilers js:babel-core/register tests/performance/s3standard.js", - "start": "node init.js && node index.js", + "start": "node init.js && npm-run-all --parallel start_mdserver start_s3server", + "start_mdserver": "node mdserver.js", + "start_s3server": "node index.js", "start_utapi": "node utapiServer.js", "utapi_replay": "node utapiReplay.js", "test": "S3BACKEND=mem mocha --compilers js:babel-core/register --recursive tests/unit", diff --git a/tests/unit/metadata/bucketfile/backend.js b/tests/unit/metadata/bucketfile/backend.js index 063c7dc41b..09e361b443 100644 --- a/tests/unit/metadata/bucketfile/backend.js +++ b/tests/unit/metadata/bucketfile/backend.js @@ -21,23 +21,25 @@ class Reader extends EventEmitter { super(); this.done = false; this.index = 0; - process.nextTick(() => this.start()); } start() { - if (this.done) { - return null; - } - const i = this.index++; - // extensions should take into account maxKeys - // and should not filter more than that intended value - assert(i <= MAX_KEYS, `listed more than maxKeys ${MAX_KEYS}`); - if (i === KEY_COUNT) { - return this.emit('end'); - } - this.emit('data', { key: `${zpad(i)}`, - value: `{"foo":"${i}","initiator":"${i}"}` }); - return process.nextTick(() => this.start()); + return process.nextTick(() => { + if (this.done) { + return null; + } + const i = this.index++; + // extensions should take into account maxKeys + // and should not filter more than the intended value + assert(i <= MAX_KEYS, + `listed more than maxKeys ${MAX_KEYS} (${i})`); + if (i === KEY_COUNT) { + return this.emit('end'); + } + this.emit('data', { key: `${zpad(i)}`, + value: `{"foo":"${i}","initiator":"${i}"}` }); + return this.start(); + }); } destroy() { @@ -50,13 +52,22 @@ describe('BucketFileInterface::internalListObject', alldone => { // stub db to inspect the extensions const db = { - createReadStream: () => new Reader(), + createReadStream: (params, callback) => { + const reader = new Reader(params); + if (callback) { + return process.nextTick(() => { + reader.start(); + return callback(null, reader); + }); + } + reader.start(); + return reader; + }, }; // stub functions and components const logger = { info: () => {}, debug: () => {}, error: () => {} }; bucketfile.loadDBIfExists = (bucket, log, callback) => callback(null, db); - bucketfile.unRef = () => {}; Object.keys(extensions).forEach(listingType => { it(`listing max ${MAX_KEYS} keys using ${listingType}`, done => {