Skip to content

Commit

Permalink
feat(sqlite): drop support for wof bundle
Browse files Browse the repository at this point in the history
BREAKING CHANGE: drop support for wof bundles
BREAKING CHANGE: `imports.whosonfirst.sqlite` now defaults to `true`

fixes #496
fixes #226
closes #460
  • Loading branch information
Joxit committed Apr 21, 2020
1 parent a8a7a9b commit d7f5b51
Show file tree
Hide file tree
Showing 24 changed files with 89 additions and 1,328 deletions.
28 changes: 12 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,15 @@ The following configuration options are supported by this importer.

Full path to where Who's on First data is located (note: the included [downloader script](#downloading-the-data) will automatically place the WOF data here, and is the recommended way to obtain WOF data)

### `imports.whosonfirst.countries`

* Required: no
* Default: ``

Set which country SQLite databases to download. Geocode Earth provides two types of SQLite extracts:
- [combined](https://geocode.earth/data/whosonfirst/combined): databases of the whole planet for `Administrative Boundaries`, `Postal Code` and `Constituencies`
- [single country](https://geocode.earth/data/whosonfirst): per country databases for `Administrative Boundaries`, `Postal Code` and `Constituencies`

### `imports.whosonfirst.importPlace`

* Required: no
Expand Down Expand Up @@ -67,15 +76,6 @@ It is currently [not recommended to import venues](https://github.com/pelias/who

Set to true to enable importing postalcode records. There are over 3 million postal code records.

### `imports.whosonfirst.missingFilesAreFatal`

* Required: no
* Default: `false`

Set to `true` for missing files from [Who's on First bundles](https://dist.whosonfirst.org/bundles/) to stop the import process.

This flag is useful if you consider it vital that all Who's on First data is successfully imported, and can be helpful to guard against incomplete downloads or other types of failure.

### `imports.whosonfirst.maxDownloads`

* Required: no
Expand All @@ -86,25 +86,21 @@ The maximum number of files to download simultaneously. Higher values can be fas
### `imports.whosonfirst.dataHost`

* Required: no
* Default: `https://dist.whosonfirst.org/`
* Default: `https://data.geocode.earth/wof/dist`

The location to download Who's on First data from. Changing this can be useful to use custom data, pin data to a specific date, etc.

### `imports.whosonfirst.sqlite`

* Required: no
* Default: `false`
* Default: `true`

Set to `true` to use Who's on First SQLite databases instead of GeoJSON bundles.

SQLite databases take up less space on disk and can be much more efficient to
download and extract.

This option may [become the default in the near future](https://github.com/pelias/whosonfirst/issues/460).

However, both the Who's on First processes to generate
these files and the Pelias code to use them is new and not yet considered
production ready.
This option is now [the default](https://github.com/pelias/whosonfirst/issues/460).

## Downloading the Data

Expand Down
2 changes: 1 addition & 1 deletion bin/download
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
#!/bin/bash

exec node ./utils/download_data.js
exec node ./utils/download_sqlite_all.js
7 changes: 2 additions & 5 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,13 @@
"async": "^3.0.1",
"better-sqlite3": "^6.0.0",
"combined-stream": "^1.0.5",
"command-exists": "^1.2.8",
"csv-stream": "^0.2.0",
"command-exists": "^1.2.9",
"download-file-sync": "^1.0.4",
"fs-extra": "^8.0.0",
"iso3166-1": "^0.5.0",
"klaw-sync": "^6.0.0",
"lodash": "^4.5.1",
"parallel-transform": "^1.1.0",
"pelias-blacklist-stream": "^1.0.0",
"pelias-config": "^4.9.0",
"pelias-config": "^4.9.1",
"pelias-dbclient": "^2.13.0",
"pelias-logger": "^1.2.1",
"pelias-model": "^7.1.0",
Expand Down
2 changes: 1 addition & 1 deletion schema.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ module.exports = Joi.object().keys({
importIntersections: Joi.boolean().default(false).truthy('yes').falsy('no'),
missingFilesAreFatal: Joi.boolean().default(false).truthy('yes').falsy('no'),
maxDownloads: Joi.number().integer(),
sqlite: Joi.boolean().default(false).truthy('yes').falsy('no')
sqlite: Joi.boolean().default(true).truthy('yes').falsy('no')
}).unknown(false)
}).unknown(true)
}).unknown(true);
97 changes: 1 addition & 96 deletions src/bundleList.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
const readline = require('readline');
const fs = require('fs-extra');
const path = require('path');
const downloadFileSync = require('download-file-sync');
const _ = require('lodash');
const klawSync = require('klaw-sync');

const peliasConfig = require( 'pelias-config' ).generate(require('../schema'));

Expand Down Expand Up @@ -59,69 +56,6 @@ function getPlacetypes() {
return roles;
}

// Ensure a bundle index file exists under `metaDataPath` so the importer
// knows which WOF bundles to process. Resolution order:
//   1. if the index file already exists, leave it alone;
//   2. else, if local `*-latest.csv` meta files are present, rebuild the
//      index from their basenames (works offline);
//   3. else, synchronously download the canonical index from the data host.
function ensureBundleIndexExists(metaDataPath) {
// fall back to the historical dist host when no dataHost is configured
const wofDataHost = peliasConfig.get('imports.whosonfirst.dataHost') || 'https://dist.whosonfirst.org';
const bundleIndexFile = path.join(metaDataPath, 'whosonfirst_bundle_index.txt');
const bundleIndexUrl = `${wofDataHost}/bundles/index.txt`;

//ensure required directory structure exists
fs.ensureDirSync(metaDataPath);

if (!fs.existsSync(bundleIndexFile)) {

// only files whose name contains `-latest.csv` count as local meta files
const klawOptions = {
nodir: true,
filter: (f) => (f.path.indexOf('-latest.csv') !== -1)
};
const metaFiles = _.map(klawSync(metaDataPath, klawOptions),
(f) => (path.basename(f.path)));

// if there are no existing meta files and the bundle index file is not found,
// download bundle index
if (_.isEmpty(metaFiles)) {
// NOTE(review): downloadFileSync blocks the event loop and presumably
// requires network access — confirm this is acceptable at import time
fs.writeFileSync(bundleIndexFile, downloadFileSync(bundleIndexUrl));
}
else {
fs.writeFileSync(bundleIndexFile, metaFiles.join('\n'));
}
}
}

// Read the bundle index file line by line and deliver, via `callback(err, bundles)`,
// a deduplicated list of bundle names ordered by placetype role.
// The callback fires once the whole index file has been consumed.
function getBundleList(callback) {
const metaDataPath = path.join(peliasConfig.imports.whosonfirst.datapath, 'meta');
const bundleIndexFile = path.join(metaDataPath, 'whosonfirst_bundle_index.txt');

// create the index file first if it is missing (may download it)
ensureBundleIndexExists(metaDataPath);

const roles = getPlacetypes();

// the order in which the bundles are listed is critical to the correct execution
// of the admin hierarchy lookup code in whosonfirst importer,
// so in order to preserve the order specified by the roles list
// we must collect the bundles from the index files by buckets
// and then at the end merge all the buckets into a single ordered array
const bundleBuckets = initBundleBuckets(roles);

const rl = readline.createInterface({
input: fs.createReadStream(bundleIndexFile)
});

rl.on('line', (line) => {

// the bundle name is the last whitespace-separated field on the line
const parts = line.split(' ');
const record = parts[parts.length - 1];

sortBundleByBuckets(roles, record, bundleBuckets);

}).on('close', () => {

// merge the role buckets in role order, then drop adjacent duplicates
const bundles = _.sortedUniq(combineBundleBuckets(roles, bundleBuckets));

callback(null, bundles);

});
}

function getDBList(callback) {
const databasesPath = path.join(peliasConfig.imports.whosonfirst.datapath, 'sqlite');
//ensure required directory structure exists
Expand All @@ -138,36 +72,7 @@ function getList(callback) {
if (peliasConfig.imports.whosonfirst.sqlite) {
return getDBList(callback);
}
getBundleList(callback);
}

/*
 * Build an object mapping each placetype role to an (initially empty)
 * array that will collect the bundle names matched for that role.
 */
function initBundleBuckets(roles) {
  return roles.reduce((buckets, role) => {
    buckets[role] = [];
    return buckets;
  }, {});
}

/*
 * Append `bundle` to the bucket of every role whose "latest" pattern it
 * matches (e.g. region-latest-bundle). Timestamped bundles are of the form
 * role-<timestamp>-bundle, never contain "latest", and are therefore skipped.
 */
function sortBundleByBuckets(roles, bundle, bundleBuckets) {
  for (const role of roles) {
    const isLatestBundle = new RegExp(`${role}-[\\w-]*latest`).test(bundle);
    if (isLatestBundle) {
      bundleBuckets[role].push(bundle);
    }
  }
}

function combineBundleBuckets(roles, bundleBuckets) {
let bundles = [];

roles.forEach( (role) => {
bundles = _.concat(bundles, _.get(bundleBuckets, role, []));
});

return bundles;
callback('Bundles no more supported');
}

module.exports.getPlacetypes = getPlacetypes;
Expand Down
47 changes: 0 additions & 47 deletions src/components/loadJSON.js

This file was deleted.

10 changes: 0 additions & 10 deletions src/components/metadataStream.js

This file was deleted.

16 changes: 0 additions & 16 deletions src/components/parseMetaFiles.js

This file was deleted.

72 changes: 10 additions & 62 deletions src/readStream.js
Original file line number Diff line number Diff line change
@@ -1,30 +1,17 @@
var combinedStream = require('combined-stream');
var fs = require('fs');
var through2 = require('through2');
var path = require('path');
const combinedStream = require('combined-stream');
const through2 = require('through2');
const path = require('path');

const logger = require( 'pelias-logger' ).get( 'whosonfirst' );

const getPlacetypes = require('./bundleList').getPlacetypes;
const parseMetaFiles = require('./components/parseMetaFiles');
const isNotNullIslandRelated = require('./components/isNotNullIslandRelated');
const loadJSON = require('./components/loadJSON');
const recordHasIdAndProperties = require('./components/recordHasIdAndProperties');
const isActiveRecord = require('./components/isActiveRecord');
const extractFields = require('./components/extractFields');
const recordHasName = require('./components/recordHasName');
const SQLiteStream = require('./components/sqliteStream');
const toJSONStream = require('./components/toJSONStream');

/*
 * Resolve each bundle name to its meta CSV path under `<wofRoot>/meta/`.
 */
function getMetaFilePaths(wofRoot, bundles) {
  const toMetaPath = (bundle) => path.join(wofRoot, 'meta', bundle);
  return bundles.map(toMetaPath);
}

/*
* Convert a base directory and list of databases names into a list of sqlite file paths
*/
Expand All @@ -34,45 +21,12 @@ function getSqliteFilePaths(wofRoot, databases) {
});
}

/*
 * Given the path to a meta CSV file, return a stream of the individual records
 * within that CSV file.
 */
function createOneMetaRecordStream(metaFilePath) {
  // NOTE(review): a previous `options` object ({ escapeChar: '"', enclosedChar: '"' })
  // was constructed here but never passed to the parser, so it was dead code and
  // has been removed — confirm parseMetaFiles.create() applies the intended
  // CSV quoting defaults.
  return fs.createReadStream(metaFilePath)
    .pipe(parseMetaFiles.create());
}

/*
 * Combine every meta CSV file into one record stream. Each file is appended
 * as a lazy factory, so it is only opened when the combined stream reaches it.
 */
function createMetaRecordStream(metaFilePaths, types) {
  const combined = combinedStream.create();

  for (const metaFilePath of metaFilePaths) {
    combined.append((next) => {
      logger.info( `Loading ${path.basename(metaFilePath)} records from ${path.dirname(metaFilePath)}` );
      next(createOneMetaRecordStream(metaFilePath));
    });
  }

  return combined;
}

/*
* given a list of databases file paths, create a combined stream that reads all the
* records via the SQLite reader stream
*/
function createSQLiteRecordStream(dbPaths, importPlace) {
const sqliteStream = combinedStream.create();

dbPaths.forEach((dbPath) => {
getPlacetypes().forEach(placetype => {
sqliteStream.append( (next) => {
Expand All @@ -89,24 +43,18 @@ function createSQLiteRecordStream(dbPaths, importPlace) {
}

/*
This function creates a stream that processes files in `meta/`:
CSV parses them, extracts the required fields, stores only admin records for
later, and passes all records on for further processing
This function creates a stream that processes files in `sqlite/`:
it loads every GeoJSON record from each SQLite database found in that folder
*/
function createReadStream(wofConfig, types, wofAdminRecords) {
const wofRoot = wofConfig.datapath;
const metaFilePaths = getMetaFilePaths(wofRoot, types);

// Select correct stream between meta and SQLite based on config and do specialized stuff
const stream = wofConfig.sqlite === true ?
createSQLiteRecordStream(getSqliteFilePaths(wofRoot, types), wofConfig.importPlace)
.pipe(toJSONStream.create()) :
createMetaRecordStream(metaFilePaths, types)
.pipe(isNotNullIslandRelated.create())
.pipe(loadJSON.create(wofRoot, wofConfig.missingFilesAreFatal));
if (wofConfig.sqlite !== true) {
throw new Error('Bundles are no more supported!');
}

// All the pipeline is the same for both meta and SQLite streams
return stream
return createSQLiteRecordStream(getSqliteFilePaths(wofRoot, types), wofConfig.importPlace)
.pipe(toJSONStream.create())
.pipe(recordHasIdAndProperties.create())
.pipe(isActiveRecord.create())
.pipe(extractFields.create())
Expand Down
Loading

0 comments on commit d7f5b51

Please sign in to comment.