Skip to content

Commit

Permalink
Merge branch 'develop' for 2.2.0 release
Browse files Browse the repository at this point in the history
  • Loading branch information
Chris committed Aug 22, 2023
2 parents b3da2ca + c02ef78 commit 31a0ee2
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 21 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
2023-08-22 (2.2.0)
--------------------

- Refactor to process replication entries differently depending on whether they are missing, broken or need touching (expiry update)
- Log the full replication identifier when replication entry fails to update
- Improve insert / update error logging

2023-02-15 (2.1.0)
--------------------

Expand Down
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -170,4 +170,8 @@ Common issues when running tests:
To test a deployed node, do the following

* Modify `test/config.js` with the correct endpoint URLs
* Run `yarn test test/server.js`
* Run `yarn test test/server.js`

# Production Infrastructure Notes

When deploying behind an HTTP load balancer, it is important to make sure it doesn't close the connection during a long-poll call. The server will keep pushing data through this connection, but some load balancers (e.g. the Google Cloud load balancer) will close it anyway. In this case, configure the load balancer with a long timeout (we recommend 3600 seconds).
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@verida/storage-node",
"version": "2.1.6",
"version": "2.2.0",
"description": "Verida Storage Node middleware that bridges decentralised identities so they can control access to databases within a CouchDB storage engine",
"main": "dist/server.js",
"scripts": {
Expand All @@ -13,6 +13,7 @@
"serve": "node --trace-warnings dist/server.js",
"build-docker-multiplatform-dev": "yarn build && docker buildx build --platform linux/amd64,linux/arm64 --push -t verida/storage-node:dev .",
"build-docker-multiplatform-prod": "yarn build && docker buildx build --platform linux/amd64,linux/arm64 --push -t verida/storage-node:latest .",
"build-docker-multiplatform-debug": "yarn build && docker buildx build --platform linux/amd64,linux/arm64 --push -t verida/storage-node:debug .",
"build-docker-amd64-prod": "yarn build && docker buildx build --platform linux/amd64 --push -t verida/storage-node:latest ."
},
"files": [
Expand Down
18 changes: 11 additions & 7 deletions src/components/dbManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -334,16 +334,20 @@ class DbManager {
doc = await db.get(id);
} catch (err) {
if (err.reason != "missing" && err.reason != 'deleted') {
throw err;
throw new Error(`Unexpected error fetching record ${id}: ${err.message}`);
}
}

if (doc._rev) {
newDoc._rev = doc._rev;
newDoc._id = id;
return await db.insert(newDoc);
} else {
return await db.insert(newDoc, id);
try {
if (doc._rev) {
newDoc._rev = doc._rev;
newDoc._id = id;
return await db.insert(newDoc);
} else {
return await db.insert(newDoc, id);
}
} catch (err) {
throw new Error(`Unexpected error inserting / updating (${id} / ${doc._rev}): ${err.message}`)
}
}

Expand Down
64 changes: 52 additions & 12 deletions src/components/replicationManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,15 @@ class ReplicationManager {
const endpointUri = endpoints[e].origin
const replicatorId = Utils.generateReplicatorHash(endpointUri, did, contextName)

// Replication entries that need their expiry updated
const touchReplicationEntries = []

// Find all entries that have an issue
// Replication entries that need creating
const createReplicationEntries = []
// Replication entries that are broken
const brokenReplicationEntries = []
let authError = false

// Find all entries that have an issue
for (let d in databaseHashes) {
const dbHash = databaseHashes[d]

Expand All @@ -41,18 +45,19 @@ class ReplicationManager {

// Handle replication errors
if (!replicationStatus) {
//console.error(`${Utils.serverUri()}: ${dbHash} missing replication to ${endpointUri}`)
//console.error(`${Utils.serverUri()}: ${replicatorId}-${dbHash} missing replication to ${endpointUri}; adding to create list`)
// Replication entry not found... Will need to create it
touchReplicationEntries.push(dbHash)
createReplicationEntries.push(dbHash)
} else if (replicationStatus.state == 'failed' || replicationStatus.state == 'crashing' || replicationStatus.state == 'error') {
console.error(`${Utils.serverUri()}: ${dbHash} has invalid state (${replicationStatus.state}) replicating to ${endpointUri}`)
console.error(`${Utils.serverUri()}: ${replicatorId}-${dbHash} has invalid state (${replicationStatus.state}) replicating to ${endpointUri}; deleting and adding to create list`)
brokenReplicationEntries.push(replicationStatus)
touchReplicationEntries.push(dbHash)
createReplicationEntries.push(dbHash)
if (replicationStatus.state == 'crashing' && replicationStatus.info.error.match(/replication_auth_error/)) {
authError = true
}
} else {
// Replication is good, but need to update the touched timestamp
//console.log(`${Utils.serverUri()}: ${replicatorId}-${dbHash} has no replication errors, will touch expiry`)
touchReplicationEntries.push(dbHash)
}
} catch (err) {
Expand All @@ -63,10 +68,11 @@ class ReplicationManager {
// Delete broken replication entries
const couch = Db.getCouch('internal')
const replicationDb = couch.db.use('_replicator')

// @todo: No need as they will be garbage collected?
for (let b in brokenReplicationEntries) {
const replicationEntry = brokenReplicationEntries[b]
console.log(`${Utils.serverUri()}: Replication has issues, deleting entry: ${replicationEntry.doc_id} (${replicationEntry.state})`)
console.warn(`${Utils.serverUri()}: Replication has issues, deleting entry: ${replicationEntry.doc_id} (${replicationEntry.state})`)

try {
const replicationRecord = await replicationDb.get(replicationEntry.doc_id)
Expand All @@ -76,9 +82,37 @@ class ReplicationManager {
}
}

// Create or update all replication entries for the list of database hashes
// Create or update all broken replication entries
if (Object.keys(createReplicationEntries).length > 0) {
await this.createUpdateReplicationEntries(did, contextName, endpointUri, createReplicationEntries, authError)
}

// Touch (update expiry attribute) on
if (Object.keys(touchReplicationEntries).length > 0) {
await this.createUpdateReplicationEntries(did, contextName, endpointUri, touchReplicationEntries, authError)
await this.touchReplicationEntries(did, contextName, endpointUri, touchReplicationEntries)
}
}
}

async touchReplicationEntries(did, contextName, endpointUri, dbHashes) {
const couch = Db.getCouch('internal')
const replicationDb = couch.db.use('_replicator')
const replicatorId = Utils.generateReplicatorHash(endpointUri, did, contextName)

for (let d in dbHashes) {
const dbHash = dbHashes[d]

try {
const doc = await replicationDb.get(`${replicatorId}-${dbHash}`);
if (!doc) {
console.error(`${Utils.serverUri()}: Attempting to touched replication entry that doesn't exist: ${endpointUri} (${replicatorId}-${dbHash})`)
continue;
}
doc.expiry = (now() + process.env.REPLICATION_EXPIRY_MINUTES*60)
const result = await DbManager._insertOrUpdate(replicationDb, doc, doc._id)
//console.log(`${Utils.serverUri()}: Touched replication entry for ${endpointUri} (${replicatorId}-${dbHash})`)
} catch (err) {
console.error(`${Utils.serverUri()}: Error touching replication entry for ${endpointUri} (${replicatorId}-${dbHash}): ${err.message}`)
}
}
}
Expand Down Expand Up @@ -129,10 +163,16 @@ class ReplicationManager {
try {
const result = await DbManager._insertOrUpdate(replicationDb, replicationRecord, replicationRecord._id)
replicationRecord._rev = result.rev
//console.log(`${Utils.serverUri()}: Saved replication entry for ${endpointUri} (${replicatorId})`)
//console.log(`${Utils.serverUri()}: Saved replication entry for ${endpointUri} (${replicatorId}-${dbHash})`)
} catch (err) {
console.log(`${Utils.serverUri()}: Error saving replication entry for ${endpointUri} (${replicatorId}): ${err.message}`)
throw new Error(`Unable to create replication entry: ${err.message}`)
if (err.message.match(/update conflict/)) {
// Attempted to save a replication entry that already exists
// This can be a race condition, so we can easily ignore (since it has already been saved)
//console.log(`${Utils.serverUri()}: Skipping replication entry for ${endpointUri} (${replicatorId}-${dbHash}), already exists`)
} else {
console.error(`${Utils.serverUri()}: Error saving replication entry for ${endpointUri} (${replicatorId}-${dbHash}): ${err.message}`)
throw new Error(`Unable to create replication entry: ${err.message}`)
}
}
}
}
Expand Down

0 comments on commit 31a0ee2

Please sign in to comment.