diff --git a/mongo-migration-stream-core/src/main/kotlin/pl/allegro/tech/mongomigrationstream/core/synchronization/ChangeEvent.kt b/mongo-migration-stream-core/src/main/kotlin/pl/allegro/tech/mongomigrationstream/core/synchronization/ChangeEvent.kt index b57ad51..62854d3 100644 --- a/mongo-migration-stream-core/src/main/kotlin/pl/allegro/tech/mongomigrationstream/core/synchronization/ChangeEvent.kt +++ b/mongo-migration-stream-core/src/main/kotlin/pl/allegro/tech/mongomigrationstream/core/synchronization/ChangeEvent.kt @@ -81,23 +81,37 @@ internal data class InsertReplaceChangeEvent( fun fromMongoChangeStreamDocument( changeStreamDocument: ChangeStreamDocument, - shardingKey: String? - ): InsertReplaceChangeEvent = - InsertReplaceChangeEvent( - changeStreamDocument.operationType, - upsertKey(changeStreamDocument.documentKey!!, shardingKey), + shardingKey: String?, + ): InsertReplaceChangeEvent { + val documentFromEvent = changeStreamDocument.fullDocument?.toBsonDocument( BsonDocument::class.java, - codecRegistry + codecRegistry, ) + + return InsertReplaceChangeEvent( + changeStreamDocument.operationType, + upsertKey(changeStreamDocument.documentKey!!, shardingKey, documentFromEvent), + documentFromEvent, ) + } // This addresses "Failed to target upsert by query :: could not extract exact shard key" error for sharded destination collections // https://www.mongodb.com/docs/manual/reference/method/db.collection.update/#sharded-collections - private fun upsertKey(documentKey: BsonDocument, shardingKey: String?): BsonDocument { - if (shardingKey == null) return documentKey // When no shardingKey, don't change documentKey - if (documentKey.containsKey(shardingKey)) return documentKey // When sharding key is already in documentKey, don't change documentKey - return documentKey.append(shardingKey, BsonNull.VALUE) // When there is no shardingKey, add "shardingKey: null" to documentKey + private fun upsertKey( + documentKey: BsonDocument, + shardingKey: String?, + documentFromEvent: BsonDocument?, + ): BsonDocument { + // When no shardingKey, don't change documentKey + if (shardingKey == null) return documentKey + // When sharding key is already in documentKey, don't change documentKey + if (documentKey.containsKey(shardingKey)) return documentKey + // When destination collection is sharded, and there is no shardingKey, add "shardingKey: " to documentKey + return documentKey.append( + shardingKey, + documentFromEvent?.get(shardingKey, BsonNull.VALUE) ?: BsonNull.VALUE, + ) } }