[NEW] Atomic slot migration HLD #23

Open · PingXie opened this issue Mar 25, 2024 · 47 comments
Labels: cluster, enhancement, major-decision-pending

PingXie commented Mar 25, 2024

Last Updated: Jan 12, 2025

Problem Statement

Efficient slot migration is critical for maintaining the scalability and resilience of Valkey clusters, but the current client-driven migration approach presents significant challenges.

Operators currently need to perform multiple manual steps to migrate slots, including setting migration states, transferring data, and updating ownership, all of which are prone to errors and inefficiencies.

While existing migration methods ensure availability during the process, they rely on mechanisms like redirection (-MOVED, -ASK) to maintain client access to data. These mechanisms introduce significant client-side complexity, requiring clients to handle redirections, retries, and reconfigurations during migration. Ensuring atomicity and consistency during migrations remains a challenge, especially for operations involving multiple keys or large datasets.

Compounding these challenges, limited observability leaves operators without adequate insight into the migration process, making it difficult to monitor progress or resolve issues efficiently.

Command Definitions

CLUSTER IMPORT SLOTS

  • Syntax: CLUSTER IMPORT SLOTS <slot_start> <slot_end> [<slot_start> <slot_end>...]
  • Arguments:
    • <slot_start>: The starting slot number of a range (integer between 0 and 16383).
    • <slot_end>: The ending slot number of a range (integer between 0 and 16383, greater than or equal to <slot_start>); the range is inclusive.
  • Return Value:
    • <op_id> on queuing success, where <op_id> is a UUID for the long running import operation.
    • -ERR <error message> on failure (e.g., invalid slot range).
  • Description: This command asynchronously imports specified slot ranges to the target primary node, allowing background import management. Existing target node slots and slots with undetermined sources are skipped. Note that although the import ID is ephemeral and lost if the target primary is terminated, atomic slot migration ensures the cluster remains consistent, with keys in a slot always residing on a single shard.
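
For illustration, a single call can queue several disjoint ranges at once; the reply is the operation ID that the status and cancel commands below refer to (the ID shown is made up):

> CLUSTER IMPORT SLOTS 1000 2000 5000 5100
3f2504e0-4f89-41d3-9a0c-0305e82c3301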

CLUSTER IMPORT STATUS

  • Syntax: CLUSTER IMPORT STATUS OPID <op_id>
  • Arguments:
    • <op_id>: The unique identifier of the import operation, as returned by CLUSTER IMPORT SLOTS.
  • Return Value (RESP2 and RESP3): A map with the following information, or -ERR <error message> on failure (e.g., invalid operation ID):
    • Total number of slots requested for import.
    • Number of slots successfully imported.
    • Number of slots that failed to import.
    • Number of slots currently being imported.
    • Number of slots whose import was canceled.
    • ... other relevant statistics, such as the total number of keys or bytes moved.
  • Description: This command checks the status and progress of an import. How the information is returned depends on the client's RESP version: RESP3 clients get a map with details for each slot, while RESP2 clients get a nested array with labeled data points. Server logs contain more information about failed slot migrations.

CLUSTER IMPORT CANCEL

  • Syntax: CLUSTER IMPORT CANCEL OPID <op_id>
  • Arguments:
    • <op_id>: The unique identifier of the import operation, as returned by CLUSTER IMPORT SLOTS.
  • Return Value:
    • +OK
    • -ERR <error message> on failure (e.g., invalid operation ID).
  • Description: This command cancels queued or in-progress import operations, rolling back any in-progress slot transfers. The import session is not immediately deleted upon cancellation or completion; the server retains a FIFO queue of a certain number of completed sessions.

SYNCSLOTS (Internal)

  • Syntax: SYNCSLOTS SLOTS <slot_start> <slot_end> [<slot_start> <slot_end> ...]
  • Arguments:
    • <slot_start>: The starting slot number of a range (integer between 0 and 16383).
    • <slot_end>: The ending slot number of a range (integer between 0 and 16383).
  • Return Value:
    • <session_id> on success, where <session_id> is a UUID for the long running full sync operation.
    • -ERR <error message> on failure. Possible error messages include:
      • -ERR Slot range <slot_start>-<slot_end> is already being imported.
  • Description: This command is used internally by Valkey to initiate the transfer of data for specified slot ranges from a source primary node to a target primary node during a slot migration. It is not intended to be used directly by users. Upon receiving this command, the source node first checks if any of the requested slots are already involved in an ongoing import operation with a different target node. If so, the command fails with an appropriate error message, indicating the conflicting slot range.

TRANSFERSLOTS (Internal)

  • Syntax: TRANSFERSLOTS SESSIONID <session_id>
  • Arguments:
    • <session_id>: The unique identifier of the import operation, previously provided by the source node in response to a SYNCSLOTS command.
  • Return Value:
    • <tracked_changes_size> on success, where <tracked_changes_size> is the number of bytes of remaining tracked changes.
    • -ERR <error message> on failure. Possible error messages include:
      • -ERR Invalid session ID.
      • -ERR Slot transfer already in progress.
  • Description: This command is used internally by Valkey to signal the source primary node to finalize the transfer of slots that were previously requested via the SYNCSLOTS command. It is not intended to be used directly by users. The target primary node sends this command to the source node after it has finished processing the initial AOF snapshot of the slots and has sufficiently caught up with the ongoing changes.
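
Taken together, the internal exchange for one migration session might look roughly like the following (the session ID and byte count are illustrative):

target -> source: SYNCSLOTS SLOTS 1000 2000
source -> target: 9b21c0de-1111-2222-3333-444455556666        (source forks; streams the slot AOF snapshot, then live deltas)
target -> source: TRANSFERSLOTS SESSIONID 9b21c0de-1111-2222-3333-444455556666
source -> target: 8192                                        (remaining tracked-change bytes; writes to these slots are now paused)
target: after counting 8192 bytes off the link, claims the slots, bumps its config epoch, and broadcasts the change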

Operator Workflow

Consider a Valkey cluster comprising three nodes (A, B, and C), each serving a distinct range of hash slots. To optimize resource utilization, an operator decides to migrate slots 1000-2000 from Node A to Node C. This migration is initiated by connecting to Node C via valkey-cli and issuing the command CLUSTER IMPORT SLOTS 1000 2000. Node C responds with a unique identifier for the import operation.

The operator can then periodically monitor the progress of this migration using the command CLUSTER IMPORT STATUS followed by the unique identifier. This command provides a high level slot migration status. This allows the operator to track the migration's progress and identify any potential issues.

Should the need arise to halt the migration, perhaps due to an unforeseen event or a change in operational requirements, the operator can issue the command CLUSTER IMPORT CANCEL along with the corresponding import identifier. This command signals Node C to stop the ongoing migration process.

Upon completion of the migration, the operator can verify the successful transfer of slots 1000-2000 to Node C by inspecting the cluster's slot distribution using commands like CLUSTER NODES. This confirms that the data has been correctly redistributed within the cluster as intended.

valkey-cli -h 10.0.0.3 -p 6379
> HELLO 3
> CLUSTER IMPORT SLOTS 1000 2000 
c7e98d63-5717-479f-8b8a-4c1d49720286 
> CLUSTER IMPORT STATUS OPID c7e98d63-5717-479f-8b8a-4c1d49720286 
1# "requested-slots" => (integer) 1001 
2# "completed-slots" => (integer) 800 
3# "failed-slots" => (integer) 50 
4# "importing-slots" => (integer) 15 
5# "canceled-slots" => (integer) 0 
> CLUSTER IMPORT CANCEL OPID c7e98d63-5717-479f-8b8a-4c1d49720286
 +OK 
> CLUSTER IMPORT STATUS OPID c7e98d63-5717-479f-8b8a-4c1d49720286 
1# "requested-slots" => (integer) 1001 
2# "completed-slots" => (integer) 800 
3# "failed-slots" => (integer) 50 
4# "importing-slots" => (integer) 0 
5# "canceled-slots" => (integer) 151 

High-Level Migration Flow

[Figure: high-level migration flow diagram]

When a target primary receives a CLUSTER IMPORT command, it groups the slots for import by source primary node. It then connects to the source primaries and starts the migration using a new internal command, SYNCSLOTS. During migration, the target primary acts as a "slot-replica" of the source primary, conceptually. Upon receiving SYNCSLOTS, the source primary forks a child process to create a slot-specific AOF command sequence containing only the keys in the specified slots. This sequence is streamed to the target primary, which executes it on the main thread and replicates it normally to the target replicas. To ensure all changes to the migrating slots are captured, incremental updates (deltas) from the source shard are also streamed to the target primary, as a replica would handle live updates from its primary. Optimizations similar to dual-channel full sync could also be considered.

During atomic slot migration, the target primary only takes ownership of the slots after fully catching up with the source primary. This is done by freezing writes to the source primary for the migrating slots after the target receives the full slot snapshots. Minimizing the delta between the primaries before ownership transfer is important to reduce the unavailable window for writes (due to pausing). The target receives delta changes during the write pause. Upon achieving full synchronization, the target bumps up its config epoch without consensus. Note that the source primary’s pause of all writers is done with a timeout (potentially the same cluster node timeout). The source primary resumes writes upon detecting the ownership transfer (via the target's cluster message with a higher config epoch). If the target doesn't claim the slots before the timeout, the source unpauses writers and retains ownership. While this may cause data loss in some rare cases, it is acceptable for this design, and further solutions are explored in issue #1355.

If the source primary fails over before migration completes, the target primary can retry with exponential backoff or proceed to the next source. This implementation detail can be discussed further in the PR review.

The migration concludes when the source shard relinquishes ownership of the migrated slots, removes the associated data, and redirects client traffic to the target shard.

Neither the source nor target replicas are aware of the ongoing slot migration. Target nodes can either be new or serving client traffic. If a source or target replica fails during this process, it will simply perform a full synchronization from its respective primary as though no migration is underway.

Client traffic, including both read and write commands, to the slots in question continues to be directed to the source shard throughout the migration process, with no awareness of the migration.

Inter-Node Slot Import Protocol

The process of transferring slots from a source node to a target node involves an orchestrated sequence of steps to ensure data consistency and availability. This sequence begins with the target primary node sending a SYNCSLOTS command to the source primary, specifying the desired slot range. The source node responds with a unique session ID to identify this specific import operation.

Upon receiving the SYNCSLOTS command and acknowledging with the session ID, the source node forks a child process dedicated to streaming an AOF snapshot of the requested slots to the target. Concurrently, the source node begins tracking changes made to these slots only. This granular tracking mechanism allows the source node to maintain a record of all modifications to the migrating slots without having to pause writes to the entire keyspace, thereby preserving write availability for unaffected slots. This represents a trade-off: increased memory consumption in exchange for enhanced write availability.

Once the target primary has finished processing the AOF snapshot and has sufficiently caught up with the ongoing changes, it sends a TRANSFERSLOTS SESSIONID <session id> command to the source. This command signals the source to finalize the slot transfer.

In response to TRANSFERSLOTS, the source node pauses all further writes to the slots being migrated. This effectively freezes the tracked changes, ensuring that no new modifications are made to the data while the final transfer is in progress. The source then replies to the TRANSFERSLOTS command with the size of the tracked changes, indicating the amount of data the target node needs to receive to be fully synchronized.

The target node, upon receiving the size of the tracked changes, starts counting the number of bytes received from the source. As soon as the expected number of bytes has been received, the target node is certain that it has a complete and up-to-date copy of the migrating slots. At this point, the target node proceeds to claim ownership of the slots and bumps up its config epoch.

Immediately after claiming the slots and updating its epoch, the target node broadcasts this information to the entire cluster. This broadcast ensures that all other nodes are aware of the new slot ownership and can redirect client requests accordingly.
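
As a minimal sketch of this finalization step (illustrative only, not actual Valkey code; claimSlotsAndBumpEpoch() stands in for the real ownership-transfer and broadcast logic):

typedef struct slotImportSession {
    long long remaining_bytes; /* reply to TRANSFERSLOTS: tracked-change bytes still to arrive */
    int finalized;             /* set once slot ownership has been claimed */
} slotImportSession;

/* Hypothetical: claim the migrated slots, bump the config epoch without
 * consensus, and broadcast the new ownership on the cluster bus. */
static void claimSlotsAndBumpEpoch(slotImportSession *s) {
    s->finalized = 1;
}

/* Called for every chunk of tracked changes applied from the migration link. */
static void onTrackedChangesApplied(slotImportSession *s, size_t nread) {
    if (s->finalized) return;
    s->remaining_bytes -= (long long)nread;
    if (s->remaining_bytes <= 0) claimSlotsAndBumpEpoch(s);
}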

Major Design Considerations

CLUSTER IMPORT Execution Model (ASYNC)

The proposed CLUSTER IMPORT command uses an asynchronous approach for better client compatibility and resilience to network issues. This requires two additional commands for progress monitoring (CLUSTER IMPORT STATUS) and cancellation support (CLUSTER IMPORT CANCEL).

Epoch Bumping Strategies (Consensusless)

In Valkey, a node's configuration epoch determines slot ownership. When a slot is migrated from one node to another, the target node must increase its epoch to a value higher than any other node in the cluster. This ensures a clear and unambiguous handover of ownership, as the node with the highest epoch is considered the rightful owner of the slot.

There are two primary approaches to bumping a node's epoch:

  1. Consensusless Bumping: The target node directly increases its epoch without requiring agreement from other nodes. This method is efficient but carries the risk of epoch collisions if multiple nodes attempt to claim the same slot concurrently.

  2. Consensus-Based Bumping: The target node proposes an epoch increase and requires a majority of nodes in the cluster to approve this change before it can take effect. This approach reduces the risk of collisions but introduces complexity and potential delays.

For atomic slot migration in Valkey, it is argued that consensus-based epoch bumping is not necessary. This argument rests on the observation that consensus does not inherently eliminate the risk of data loss during slot migration.

Consider a scenario where slots are being migrated from node A to node B. Node B initiates the process, and node A pauses writes to the slots being transferred. After node B fully catches up with the data, it stages a vote to increase its epoch. However, due to network issues or other unforeseen circumstances, node A might not receive or process this vote request.

Despite this lack of acknowledgment from node A, node B might still secure enough votes from other nodes to bump its epoch. Subsequently, node B claims ownership of the slots. However, if node B's cluster messages fail to reach node A before its pause timeout expires, node A will resume accepting writes to the slots it no longer owns. This leads to an inconsistency where both nodes believe they own the slots, and writes directed to node A after the timeout will ultimately be lost when it eventually receives node B's updated epoch.

This scenario highlights the inherent trade-off between data consistency and availability. While a consensus-based approach might seem to offer stronger consistency guarantees, it cannot prevent data loss in situations where network partitions or node failures occur. Ultimately, the decision of whether to unpause writes on node A without explicit confirmation from node B rests on a delicate balance between ensuring data consistency and maintaining availability.

This complex issue of balancing availability and consistency, particularly in the context of write pauses and timeout mechanisms, is best addressed within the broader discussion of data durability. Therefore, further exploration of this topic and potential solutions are deferred to issue #1355, which focuses on enhancing data durability guarantees in Valkey.

Streaming Format (AOF)

When migrating data between nodes in Valkey, a fundamental design decision involves choosing the appropriate format for representing and transferring that data. Two primary candidates emerge: AOF, which logs Valkey commands, and RDB, a snapshot of the in-memory data. This section analyzes these options, considering the constraints of atomic data types and the implications for memory management on the receiving end.

Regardless of whether AOF, RDB, or a chunked variant of RDB is used, the receiver can only apply a change once the corresponding data is fully received. These changes can be primitive data types like strings and integers or composite data types like SETs, HASHes, etc. The primitives are indivisible and cannot be processed partially.

When a composite data structure is transferred, the receiver must buffer the entire structure in memory for processing. This remains the case even with chunked data unless the chunks are aligned with the atomic data type boundaries within the structure (akin to AOF).

Consequently, streaming the atomic data types (strings and integers) emerges as the most memory-efficient and straightforward approach. This strategy minimizes the buffering and tracking requirements on the receiver, as each atomic unit can be processed and discarded immediately upon reception.

This approach aligns with the AOF format, which essentially represents data as a sequence of Valkey commands operating on these atomic data types. Using AOF for data transfer offers several advantages:

  • Existing Support: Valkey already has robust mechanisms for handling AOF files, simplifying implementation.
  • Efficient Processing: Multiplexing the loading of AOF commands on the main thread has minimal impact on overall performance.
  • Simplified Replication: Replicating AOF commands to replicas requires little to no additional work.

In contrast, RDB, even when streamed in chunks, presents challenges. Unless the chunks align perfectly with the atomic data type boundaries, the receiver still needs to buffer potentially large segments of data, increasing memory pressure. This makes RDB or chunked RDB less suitable for efficient data transfer in Valkey.

Therefore, based on the constraints of atomic data types and the need to minimize memory pressure on the receiver, AOF emerges as the preferred solution for data transfer in Valkey. Its inherent alignment with atomic units, combined with existing support and efficient processing capabilities, makes it a more suitable choice compared to RDB or its chunked variants.
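
As a concrete illustration (key and field names invented), a large hash streamed in AOF form arrives as a sequence of bounded commands that the target can apply and discard one at a time, whereas an RDB/RESTORE-style transfer must deliver the entire value before any of it can be applied:

HSET user:1000 f1 v1 f2 v2 ... f64 v64
HSET user:1000 f65 v65 f66 v66 ... f128 v128
...
HSET user:1000 f999937 v999937 ... f1000000 v1000000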

Observability for Atomic Slot Migration

To enhance the observability of the atomic slot migration process and provide operators with improved visibility into its progress and potential issues, we can integrate detailed metrics as follows into CLUSTER INFO:

Metrics on the Source Primary

  • Track the total number of migration requests received.
  • Monitor the current number of slots actively being migrated to a target node.
  • Count the migration operations that failed when the source acted as the sender.
  • Track specific error types encountered during migration (e.g., fork failures, network issues).

Metrics on the Target Primary

  • Track the total number of migration requests initiated.
  • Monitor the current number of slots being imported.
  • Count the migration operations that failed when the target primary acted as the requester.
  • Track specific errors encountered during the import process.
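
For example, these metrics might surface in CLUSTER INFO along the following lines (field names are illustrative, not final):

slot_migration_requests_received:4
slot_migration_active_outgoing_slots:120
slot_migration_failed_as_source:1
slot_import_requests_initiated:2
slot_import_active_incoming_slots:350
slot_import_failed_as_target:0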

PingXie commented Mar 25, 2024

@madolson and I discussed another option (in the context of atomic slot migration) which essentially implements slot-level replication. Here is the high level flow

  1. source (parent) forks a child with a set of slots to migrate
  2. child streams slots to be migrated in the RDB format (same as replication)
  3. target needs to support no-blocking load of the RDB stream (think of co-routines)
  4. source (parent) starts capturing updates in the migrating slots right after forking
  5. child completes streaming
  6. source (parent) pauses clients writing to these slots
  7. source replicates captured updates to target
  8. source starts the slot ownership transfer process (depending on cluster v1 vs v2, we could take different paths)
  9. source unblocks paused clients with -MOVED <target>

any failure on the target before completing step 8 would abort the migration process on the source.

@zuiderkwast

I have an implementation. Didn't write it down yet, but I recorded a song which explains it all. Please review. https://suno.com/song/b06c2a5b-3760-4916-9f56-eb3fe66f24e2

PingXie changed the title from "Atomic slot migration HLD" to "[NEW] Atomic slot migration HLD" on Apr 15, 2024

PingXie commented Apr 15, 2024

I have an implementation. Didn't write it down yet, but I recorded a song which explains it all. Please review. https://suno.com/song/b06c2a5b-3760-4916-9f56-eb3fe66f24e2

Are you referring to #298? I will take a look next but I don't see it solving this issue?

zuiderkwast commented Apr 15, 2024

@PingXie The song is about atomic slot migration. #298 is about subscribing to slot migrations. Not the same.

PingXie commented Apr 15, 2024

@PingXie The song is about atomic slot migration.

Ship-it

PingXie commented Dec 15, 2024

This atomic slot migration proposal incorporates valuable input from offline discussions with @enjoy-binbin and @murphyjacob4. It builds on the shared foundation of using RDB format for slot data transfer via a child process and the ownership transfer flow, while also addressing considerations discussed during the process, such as support for targets serving traffic, epoch bumps, and the trade-off between availability and durability.

@valkey-io/core-team PTAL

zuiderkwast commented Dec 16, 2024

This sounds perfect! Good job!

Questions follow.

CLUSTER MIGRATE <source_shard_id> <slot_start> <slot_end> [<slot_start> <slot_end>...]

  1. Why does the command need the shard-id at all? The target knows who owns each slot. (If we skip this parameter, the command can be made to fail with a CROSSSHARD error.)
  2. Can the shard-id be specified on multiple forms like host:port, node-id or human-node-id?
  3. Can we make the syntax extensible to allow future additions such as an async option? Or is it already extensible?

This proposal prioritizes availability over write durability by maintaining the same ownership finalization order as the current slot migration protocol.

Availability is already compromised since we pause writes on the source while changing the ownership, isn't it?

I don't fully understand this then. Can we pause writes on the target until the epoch has been bumped? Or can't we, because we don't know when a collision will be detected by other nodes?

Server configs like loading_process_events_interval_bytes can be adjusted to improve responsiveness during periods of heavy activity.

Alternatively, async-loading-in-background-thread could be used to offload RDB loading to a separate thread.

This formulation makes it look as if these are already existing configs. Are they proposed or just hypothetical ones? I guess this could just be made clearer.

hwware commented Dec 16, 2024

I have an implementation. Didn't write it down yet, but I recorded a song which explains it all. Please review. https://suno.com/song/b06c2a5b-3760-4916-9f56-eb3fe66f24e2

amazing song

hwware commented Dec 16, 2024

I have another cluster migrate command syntax proposal:

The syntax @PingXie proposed covers slot ranges with a start and an end, but there is another case: a client may want to migrate only some individual slots. Could we make the syntax as below:

CLUSTER MIGRATE <source_shard_id> SLOTS <slot_start> <slot_end> [<slot_start> <slot_end>...] --- after SLOTS, there must be an even number of parameters
CLUSTER MIGRATE <source_shard_id> SLOT slot1 slot3 slot5 ---- after the keyword SLOT, the number of parameters can be odd or even

Another thing I am concerned about is the reply message: because some slot migrations could fail for various reasons, and there are multiple slots in the command syntax, I suggest one possible reply is an array indicating success or failure for every slot mentioned in the cluster migrate command, maybe like this:

slot 1: ok
slot 2: fail
...........

@madolson

I have a couple of reservations about the current proposal.

This command is blocking, meaning it will not return until the migration completes or encounters an error.

Many clients have built in timeouts that might unexpectedly error. Are we sure we want to make this blocking by default? Very few options block like this, maybe at least have an option to make it not-block. I see there is some discussion a bit later about this.

This process begins with the operator issuing a single command on the target primary:

Why connect to the target over connecting to the source? Seems counter intuitive to me, and doesn't match how the existing migrate command works. The primary is also the owner of the slot, and would logically know now if it can do the migration. It also feels like the source is driving everything, so it makes more sense to send the command to it.

This file is streamed to the target primary, which loads it asynchronously to avoid blocking existing operations. Incremental updates (deltas) from the source shard are also streamed to the target primary, ensuring that all changes to the migrating slots are captured during the migration process, just as a replica would handle live updates from its primary.

Are we streaming the updates like dual channel?

Neither the source nor target replicas are aware of the ongoing slot migration. Target nodes can either be new or serving client traffic. If a source or target replica fails during this process, it will simply perform a full synchronization from its respective primary as though no migration is underway.

How is the replica receiving the data from the slot migration? I assumed that the target replica would be forwarded the data from the target primary, but not sure how that would get interleaved into the replication stream.

Slot Ownership Transfer

Are we transferring the slots as a batch or individually? Doing it as a batch increases the overall time, since you have to accumulate more incremental replication data, but makes it more atomic.

PingXie commented Dec 23, 2024

Synchronous vs. Asynchronous Approach for CLUSTER MIGRATE

I see the following variants on this topic:

  1. Synchronous but primitive: Provide a command that moves one slot at a time.
  2. Synchronous with batching support: Allow the caller to migrate multiple slot ranges.
  3. Asynchronous but primitive: Provide a command to submit/queue one slot at a time.
  4. Asynchronous with batching support: Allow the caller to submit/queue multiple slot ranges (without getting blocked).

I don't think we should go with options 1 or 3 because forking is expensive. These options eliminate the possibility of the engine ever forking once for multiple slot migrations.

Option 2 is what's proposed in this issue, while option 4 is described/discussed in #587.

Another thing I am concerned about is the reply message: because some slot migrations could fail for various reasons, and there are multiple slots in the command syntax,

I agree. Error handling and progress reporting are critical parts of this design. If we adopt synchronous semantics, we’ll likely need a pub/sub channel or a MONITOR PUSH like mechanism to keep the client informed, which wouldn’t be necessary with an asynchronous model (see #587)

Why does the command need the shard-id at all? The target knows who owns each slot. (If we skip this parameter, the command can be made to fail with a CROSSSHARD error.)

The shard-id is not strictly required, and I’m fine with removing it as doing so would simplify the user flow too.

Can the shard-id be specified on multiple forms like host:port, node-id, or human-node-id?

This question becomes moot if we remove the shard-id parameter.

Can we make the syntax extensible to allow future additions such as an async option? Or is it already extensible?

From a command shape perspective, we can easily add a SYNC or ASYNC argument later. However, as mentioned earlier, the sync vs. async decision significantly impacts error and progress reporting, especially if we plan to support batching.

Can we make the syntax as below:

CLUSTER MIGRATE <source_shard_id> SLOTS <slot_start> <slot_end> [<slot_start> <slot_end>...]
CLUSTER MIGRATE <source_shard_id> SLOT slot1 slot3 slot5

It’s doable. Supporting both slot ranges and single slots adds some complexity, but this is limited to parsing logic, so it’s not a major concern for me.

Many clients have built-in timeouts that might unexpectedly error. Are we sure we want to make this blocking by default? Very few options block like this; maybe at least have an option to make it non-blocking. I see there is some discussion a bit later about this.

That’s an excellent and very convincing point. I’m now leaning toward the asynchronous approach.

Why connect to the target over connecting to the source? Seems counterintuitive to me, and doesn't match how the existing migrate command works.

The core idea here revolves around "slot-level" replication, so a better analogy would be "cluster replicate." I don’t think we need or want to emulate the existing migrate command.


Major Decisions on This Topic:

  1. Single slot or batching?
  2. Async vs. sync?
  3. Error/progress reporting mechanism?
  4. Recipient being the target vs. the source?

hwware commented Dec 24, 2024

  1. Single slot or batching?
  2. Async vs. sync?

Ideally, clients would prefer no blocking. But batching a range of slots necessarily means some blocking time in a sync model.
Can we implement this in 2 steps: first implement sync, then async? After all, the FLUSHALL command supports both SYNC and ASYNC.

  1. Error/progress reporting mechanism?

As what you said, let us first decide it is async or sync model.

  1. Recipient being the target vs. the source?

    I have no strong opinion on the caller side.
    But consider a possible later enhancement, an automatic slot migration mechanism: when the data in the source shard reaches a memory threshold, slot migration could be triggered automatically, selecting the shard with the least data as the target. From this point of view, I vote for the recipient being the target.

zuiderkwast commented Dec 24, 2024

I have no strong opinion on sync vs async, but I guess an interactive tool may want to display a progress indicator of some sort. As Wen said, we can start with sync and add async later as long as the syntax is extensible.

4. Recipient being the target vs. the source?

I have no strong opinions on this either. Only some thoughts.

  • If the command is sent to the target AKA the importing node, maybe the command should be called CLUSTER IMPORT? The term "migrating node" historically means the source node.

  • May we ever want to fork once to migrate slots to multiple destinations? For this, sending to the source node with syntax like this could be useful?

    CLUSTER MIGRATE SLOTRANGE 0 100 TO shard1 SLOTRANGE 101 200 TO shard2 ...
    

murphyjacob4 commented Dec 26, 2024

  1. Recipient being the target vs. the source?

I think the main idea here was to mimic the replication flow. In replication, we indicate a node to be a replica of another; we don't indicate that a primary replicates to a certain replica.

It also feels like the source is driving everything

I think when you dig into the details, the target is actually the one driving most things:

  1. The target connects to the primary and initiates replication via PSYNC
  2. The target communicates to the primary to pause writes
  3. The target waits for replication offset to catch up to the pause time
  4. The target initiates the vote and collects votes to bump the epoch

It's possible to implement it in the opposite direction, but it breaks with how the replication (1) and manual failover (2,3,4) primitives are implemented.

@murphyjacob4

Are we streaming the updates like dual channel?

The entire slot-level replication flow will ideally use the same logic as the node-level replication flow. So all improvements that we have made to node-level replication should be reusable, including dual channel.

The way I have previously prototyped it is using a new REPLCONF slot-id to specify a certain slot for replication, but otherwise from the source's point of view it proceeds just like any other replication, except we only serialize that one slot during the full sync. But those details probably go beyond the "high level design".

How is the replica receiving the data from the slot migration? I assumed that the target replica would be forwarded the data from the target primary, but not sure how that would get interleaved into the replication stream.

Just like with chaining replication, we can proxy the data from the source primary to the target replica through the target primary using replicationFeedStreamFromPrimaryStream. The only additional wrinkle is that the source primary stream to the target replica may be interwoven with writes for other slots on the target primary, but these should be non-conflicting (since these will be for different slots).

@zuiderkwast

Oh, you're using REPLCONF and PSYNC for the inter-node communication. That's an important detail that wasn't mentioned until now. :)

It makes sense that the target is driving it.

4. The target initiates the vote and collects votes to bump the epoch

This is new? It isn't done now, because it's assumed to have too much overhead, so the target bumps the epoch without consensus iirc.

Just like with chaining replication, we can proxy the data from the source primary to the target replica through the target primary

We proxy a slot RDB stream interleaved with regular write commands for other slots in one replication stream? This sounds like a new concept to me.

@madolson

The only additional wrinkle is that the source primary stream to the target replica may be interwoven with writes for other slots on the target primary, but these should be non-conflicting (since these will be for different slots).

Yeah, this is OK, but the RDB stream is not based off of commands, so we need a way to properly interleave them. We've talked about a command at AWS like RDBCHUNK <slot id>, that can be sent from the source to the target so everything gets processed normally and then replicated. One thing we need to handle gracefully is PSYNCs and storage into AOF/RDB, so converting the RDB stream into something like a command would be beneficial.

I think the main idea here was to mimic replication flow. In replication, we indicate a node to be a replica of another, we don't indicate that a primary replicate to a certain replica.

Yeah, I guess I don't really have much of a strong preference either way since both sides need a state machine for tracking the progress.

@madolson

This is new? It isn't done now, because it's assumed to have too much overhead, so the target bumps the epoch without consensus iirc.

You are not required to get consensus to bump the epoch when quorum isn't changing if you are just trying to prevent divergent clusters (also called a split brain), although the election would effectively mitigate the risk of race against a pending failover election in the target shard (and I guess the source shard depending on how the failover is constructed).

I guess my question is what are we voting on. If it's based on the epoch, we would effectively be serializing the slot migrations through the cluster for each epoch, which I don't think is a great idea.

PingXie commented Jan 6, 2025

Replication Scheme

The only additional wrinkle is that the source primary stream to the target replica may be interwoven with writes for other slots on the target primary, but these should be non-conflicting (since these will be for different slots).

Yeah, this is OK, but the RDB stream is not based off of commands, so we need a way to properly interleave them. We've talked about a command at AWS like RDBCHUNK <slot id>, that can be sent from the source to the target so everything gets processed normally and then replicated. One thing we need to handle gracefully is PSYNCs and storage into AOF/RDB, so converting the RDB stream into something like a command would be beneficial.

That is a great point. I was thinking of converting the key/value pair back to commands on the target primary before forwarding them to the target replicas but I feel that is complicated. RDBCHUNK sounds like a better idea. Some follow up questions

  1. what are the data arguments of RDBCHUNK? are they actually RDB chunks?
  2. Do they need to participate in AOF/RDB? I guess whatever hasn't been committed to the db will be stored under a new AUX field, but the ones that have been fully decoded will be automatically included?
  3. Will we keep them in the replication backlog? I'd think so?

Are we streaming the updates like dual channel?

We could/should though I consider dual channel a "local" optimization, as opposed to the core design. Also, the "replication chaining" question above would have an impact on this topic too.

Source-driving vs Target-driving

When we (@enjoy-binbin @murphyjacob4) discussed offline, we had the mental model of "slot level replication", which lends itself naturally to "target-driving". But now I can see that "source-driving" is more natural for "replication chaining".

Main-thread Loading vs Background-thread Loading

The discussion here is also tied to the "replication chaining" one. If we go with "source-driving", I think this becomes a moot question because all commands run on the main thread. Nevertheless, the following discussion still assumes "target-driving" and "slot level replication".

Server configs like loading_process_events_interval_bytes can be adjusted to improve responsiveness during periods of heavy activity.
Alternatively, async-loading-in-background-thread could be used to offload RDB loading to a separate thread.

This formulation makes it look as if these are already existing configs. Are they proposed or just hypotherical ones? I guess this could just be made more clear.

loading_process_events_interval_bytes is an existing config, which allows non-blocking loading on the main thread. Obviously, there is impact on the primary workload. My thinking is to pick a smaller value for loading_process_events_interval_bytes to reduce the impact. I think we can make it perform better than the current MIGRATE-based slot migration protocol.

The other option is to load the RDB using a background thread. The downside is that it might break some modules.

Yet another option would be a hybrid approach, i.e., we implement both modes with background loading as the default. If an incompatible module is loaded, we fall back to the main-thread loading.

I feel that the main-thread loading is easier to implement, could very likely offer better perf than the current protocol, and is more compatible.

PingXie commented Jan 6, 2025

Slot Ownership Transfer

Are we transferring the slots as a batch or individually? Doing it as a batch increases the overall time, since you have to accumulate more incremental replication data, but makes it more atomic.

I am in favor of batch transfer

I guess my question is what are we voting on. If it's based on the epoch, we would effectively be serializing the slot migrations through the cluster for each epoch, which I don't think is a great idea.

I don’t see a need to vote, which is synonymous with a consensus epoch bump in the Valkey context. A consensus epoch bump resolves epoch collisions, and that’s about it. In my view, epoch collisions are less of a concern with ASM. Moreover, a consensus epoch bump doesn’t guarantee atomic slot ownership transfer—there’s still a chance that the source node might cling to the old slot even if the rest of the cluster votes for the target. When this happens, writes could temporarily go to/get accepted by the source but would eventually be dropped by the source once the source learns the updated truth.

madolson commented Jan 7, 2025

what are the data arguments of RDBCHUNK? are they actually RDB chunks?

I think there are two logical options:

  1. RDBCHUNK is basically a chunk of a slot level RDB file. So RDBCHUNK <slot id> <payload> along with the needed start and end. It could also be something like, RDBCHUNK START <slot id> + RDBCHUNK VALUE <chunk> + RDBCHUNK END <checksumming?>
  2. Per key. This is basically just the restore command we use today and fits more logically into the command flow we already use. Ideally we would still have a way to chunk up the RDB on an individual key.

The main benefit of 1 is efficiency when there are a lot of small keys to avoid the overhead of the per key RESTORE commands.

Do they need to participate AOF/RDB? I guess whatever hasn't been committed to the db will be stored under a new AUX field but the ones that have been fully decoded will be automatically included?

We need to participate in AOF presumably. AOF with slot migration works today, although I don't know how well it plays with fsync everywrite. For RDBs, if a replica connects while a slot migration is in progress do we include that in the full-sync? We need the replica to eventually be fully up to date.

Will we keep them in the replication backlog? I'd think so?

Yeah, if a replica disconnects we would want to allow a psync without having to do a full-sync if possible.

I don’t see a need to vote

I'm currently leaning this way too.

PingXie commented Jan 8, 2025

Per key. This is basically just the restore command we use today and fits more logically into the command flow we already use. Ideally we would still have a way to chunk up the RDB on an individual key.

Large keys would still be a challenge I think, if we chunk the data along the key boundary.

What do you think about leveraging AOF instead of RDB? The source would still fork, but it would stream the data as commands (HSET, SADD, etc). These commands would be processed on the target and then relayed to the replicas. This eliminates the transformation step I was considering on the target after loading the RDB. We wouldn't need a new command (RDBCHUNK) or have to send the entire key at once, avoiding the large key issue.

@soloestoy curious to hear your thoughts - this is essentially the same idea as your proposal #59 but at the slot level.

@murphyjacob4

Currently - I see three outstanding high level design questions:

  1. What are the semantics for transferring slot data from source to target?
  2. Who will logically own the migration (source vs target)
  3. Do we want to introduce an election, or are we okay with just bumping epoch

Feel free to chime in if there are any other open questions I am missing that we need addressed.

Semantics for transferring slot data

Just like for replication-related syncs, it breaks down into a snapshot and a backlog of changes. @PingXie and I have discussed offline about using AOF format (i.e. a command stream) for the snapshot. As mentioned by Ping, AOF has a benefit that we can multiplex the snapshot command stream with client commands on the target's replication link.

This would also be achievable by using RDBCHUNK, and AOF is less efficient as a format than RDB (more expensive to replay, larger serialized payloads). However, implementing RDBCHUNK seems non-trivial - we need to have dedicated buffers to store the RDB contents for each importing slot, append new chunks to those buffers as they come in, and track state machines for each ongoing RDB load designating where we are (i.e. RDBCHUNK 1 contains half of a list, and RDBCHUNK 2 contains the other half - we need to save that we were in the middle of loading the list after processing RDBCHUNK 1 and resume it with RDBCHUNK 2). Without the ability to load a key in multiple chunks across multiple commands, we have the same big-key issues as present day.

I think using AOF rewrite to serialize the snapshot will give us a majority of the benefit at a lower cost. The implementation of RDBCHUNK can be something we track as an improvement, and that project will come with other benefits (e.g. the ability for users to import an RDB snapshot on top of existing data on a running instance). So my vote would be for "AOF snapshot format" for an initial implementation

Source vs target driven

I see a few drawbacks to being source driven:

  1. Dual channel will be difficult to implement with a push model, requiring the source to initiate an additional connection and running a special command (something like QUEUEBEHINDRDB?) to inform the target node to buffer commands on that connection and replay them after the ongoing RDB load. In contrast, with a pull model, we can reuse the same exact flow as replication, with the only difference being that we restrict the replication stream to just a single slot (we can use REPLCONF for this).
  2. The target node will still need to be placed in an "importing" state to accept the writes for the slot. We will also need to send ASKING on each command we stream to the node (or have some way to exclude the slot migration link from target node slot ownership checks). Attached to the importing state, we will need to set a TTL on the slot contents so we don't hold them forever if there is a failure in the migration. The fact that all this state is owned by the target - but the source is driving the operation - feels like an anti-pattern solved by having the target drive it.
  3. When it becomes time to bump the epoch, it will need to happen on the target anyways, so this is additional coordination that would need to be signaled from target to source (either via cluster bus or through a new command). If source owns the bump, there would be an unavailability period where neither source nor target claims ownership, until the target finds out about the bump (not to mention if there is a collision, then the source might have to go back to owning it).

Overall, it feels like the majority of the state and state transitions will need to be owned by the target node, and there is an additional syntactical advantage that when communicating with the target you do not need to provide the origin node - just the slot - and it can use topology to determine ownership. So my vote would be for "target-driven".

Election vs bump

I guess my question is what are we voting on. If it's based on the epoch, we would effectively be serializing the slot migrations through the cluster for each epoch, which I don't think is a great idea.

I think the main reason would be to prevent a lost write scenario when there is a failover on either source or target (whichever we choose not to own the election). If target was importing from source, but then we failover to source', writes to source' could be lost when the migration completes. If you force an election, it would logically serialize the two events, so in the worst case target would fail to get elected, after which it would find out source' is now the owner and cancel the slot migration.

All that said, I think this is not the only data loss condition in the current protocol. Even with an election, we could have situations where the source unpauses itself after its pause timeout is reached, but the target election eventually succeeds, causing lost writes.

My vote would be on bumping the epoch instead of going through election, and we should defer the data loss issues with slot migration as a different project (cluster v2 maybe?).

madolson commented Jan 9, 2025

What do you think about leveraging AOF instead of RDB? The source would still fork, but it would stream the data as commands (HSET, SADD, etc). These commands would be processed on the target and then relayed to the replicas. This eliminates the transformation step I was considering on the target after loading the RDB. We wouldn't need a new command (RDBCHUNK) or have to send the entire key at once, avoiding the large key issue.

We lose a bit more efficiency, at least my understanding is that RDB load is faster than command execution, but we get some compatibility benefits between versions which is a nice benefit. I imagine sending it as a RESTORE command will be strictly faster and achieves the same outcome, so I'm not sure the benefit of the AOF format.

@murphyjacob4

I imagine sending it as a RESTORE command will be strictly faster and achieves the same outcome, so I'm not sure the benefit of the AOF format.

RESTORE requires that we serialize an entire key. On both source and target, we need additional memory overhead to store the dump of the key in memory (for serialization and deserialization, respectively). In AOF, rewrite will be able to break a big hash into many HMSET calls with AOF_REWRITE_ITEMS_PER_CMD elements each, but RESTORE means the smallest chunk we can send at a time is the entire hash.

This is one of the big downsides of current slot migration implementation and oftentimes means that we can't move slots that have large keys if we don't have an equivalent amount of free memory on the node. Some data structures are O(GB) in many workloads.

To be able to chunk the RESTORE command on a key level - I think we will need the RDBCHUNK functionality - which I see as an incremental improvement. For atomic slot migration - we should use REPLCONF to negotiate the data format, so that we can later change how the data transfer happens. I also generally see an AOF formatted full sync as being a useful primitive for other things (as you mentioned, AOF could be loaded in a forwards compatible manner). I also do see the benefit of RDBCHUNK, but as I mentioned above, I'm just not sure on the return on investment over doing something like AOF for this feature.

@madolson

This is one of the big downsides of current slot migration implementation and oftentimes means that we can't move slots that have large keys if we don't have an equivalent amount of free memory on the node. Some data structures are O(GB) in many workloads.

We aren't solving this problem right? Whether it's AOF or RESTORE, both cases end up running out of memory on the target.

as you mentioned, AOF could be loaded in a forwards compatible manner

AOF isn't forward compatible. If you introduce new functionality, it doesn't know how to load it, HFE for example. I feel like we make this claim periodically, but AOF and RDB are the same in terms of being backwards compatible. AOF just chooses to standardize on a single high-level format for compatibility at the cost of efficiency. I guess I generally would bias towards performance during the majority case instead of some theoretical forward compatibility story.

which I see as an incremental improvement

I'm OK with an incremental improvement. Although in this specific case, I'm not sure it's enough work to split between two minor versions since we need to implement a handshake. I think we either commit to AOF or to RESTORE with a chunk.

madolson commented Jan 10, 2025

@PingXie Can you update the comment with the updates to the proposal so that it's easier for someone to come in and review? This thread is a bit long, but I think we're close to having consensus so worth to make sure it's easy to send this to some other folks.

PingXie commented Jan 10, 2025

@PingXie Can you update the comment with the updates to the proposal so that it's easier for someone to come in and review? This thread is a bit long, but I think we're close to having consensus so worth to make sure it's easy to send this to some other folks.

Did you bug my office? 😂 I've been working on updating the proposal today on and off. Hoping to wrap it up by the weekend🤞

@murphyjacob4

We aren't solving this problem right? Whether it's AOF or RESTORE, both cases end up running out of memory on the target.

I guess I wasn't clear in my explanation. You do need the target to be able to fit the new data, yes; however, there is an overhead to non-chunked RESTORE where, in addition to the memory to fit a key, you need an equivalent amount of free memory to store the key while serializing and deserializing the DUMP.

To give it an example, suppose I am migrating a 5 GiB hash on a node currently at 10 GiB of usage. In order to DUMP this - I will need to write it to a buffer of size ~5 GiB. But we have not deleted the key from the dictionary yet. So you will need a total of 15 GiB of memory (10 GiB existing usage and 5 GiB dump buffer) to store the interim dumped copy. The inverse happens on the target node. We need to load the full command into the query buffer before we can begin processing it. Then there will be a similar period where we need 15 GiB of memory during deserialization before clearing the dump from the query buffer. I don't think there is a simple way to improve this with a "streamed" approach either (outside of the fork+loading primitive we have from full sync, but there is the target's replica problem).

The idea being that with AOF format - the size of that interim overhead shrinks drastically from the size of the key (5 GiB in this example) to the size of a chunk of elements in that key (and adjusting the number of elements to try to maintain a max in-memory size shouldn't be difficult).

AOF isn't forward compatible

I don't disagree with you. Instead of saying "forward compatible", let's say that AOF allows for lossy downgrade, which is something I don't think we can do with RDB since I don't think we have a way to skip over unknown types effectively. Point being - it could be useful outside of atomic slot migration for those who are okay with data loss. If we want perfect forward compatibility - we should talk about a protobuf-based RDB v2 😉

I'm not sure it's enough work to split between two minor versions since we need to implement a handshake

We can talk about how chunked RESTORE design would look. What I have in my head is similar to what you mentioned with RDBCHUNK:

  1. RDBLOAD START TIMEOUT <timeout> -----> +<id>
  2. RDBLOAD CHUNK ID <id> PAYLOAD <payload> -----> +OK/-ERR Not found
  3. RDBLOAD END <uuid> -----> +OK/-ERR Not found

Really - it is just a mechanism for a push-based RDB transfer mechanism - it wouldn't necessarily be tied to just slot migration and could be used for import/export use cases as well (i.e. I want to take data from a backup and load it on a live node).

Calling RDBLOAD START will establish a tracking state on the importing node, mapping the assigned ID to a tracking struct like:

struct rdbPushLoadState {
    size_t id;                       /* ID assigned by RDBLOAD START */
    char *buff;                      /* accumulated, not-yet-decoded payload bytes */
    size_t buf_size;
    robj *curr;                      /* object currently being loaded, if any */
    int curr_rdb_type;
    size_t curr_remaining_elements;  /* elements still expected for curr */
    mstime_t deadline;               /* drop the session once this is exceeded */
};

When we get a RDBLOAD CHUNK, we can map it to this struct and:

  1. Add the payload to buff
  2. If curr is set and curr_remaining_elements > 0, then we would attempt to read the elements out and insert into the object based on the encoding type (list, hash, etc)
  3. If curr is not set or curr_remaining_elements == 0, then we would attempt to read a new robj

If either 2 or 3 fail due to not enough data in buff, we would return and wait for the next RDBLOAD CHUNK. We would need to use the deadline to prevent leaking tracking structs when a client that is importing an RDB disappears. Note that this wouldn't necessarily clean up the associated keys that were loaded, but just leave the RDB partially loaded. In the context of ASM, we could provide another parameter RDBLOAD START TIMEOUT <timeout> SLOT <slot-num> to signal that this is slot migration and that a timeout means to drop the slot entirely.
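
A rough sketch of that chunk-absorption logic, building on the struct above (all helper names are hypothetical, not existing Valkey functions):

/* Hypothetical helpers (not existing Valkey APIs). The tryLoad* functions
 * return 0 when the buffer does not yet hold enough bytes to make progress. */
static void appendToBuffer(struct rdbPushLoadState *st, const char *p, size_t len);
static int tryLoadNextElement(struct rdbPushLoadState *st);
static int tryLoadNextObjectHeader(struct rdbPushLoadState *st);

/* Sketch of RDBLOAD CHUNK handling: absorb the payload, then decode as much
 * complete data as possible before waiting for the next chunk. */
static void rdbPushLoadAbsorbChunk(struct rdbPushLoadState *st,
                                   const char *payload, size_t len) {
    appendToBuffer(st, payload, len);                 /* step 1: grow buff */
    for (;;) {
        if (st->curr && st->curr_remaining_elements > 0) {
            /* step 2: decode more elements into the in-progress object */
            if (!tryLoadNextElement(st)) break;       /* need more data */
        } else {
            /* step 3: decode the next object's header */
            if (!tryLoadNextObjectHeader(st)) break;  /* need more data */
        }
    }
}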

So this would effectively be a refactor of rdbLoadObject() in valkey/src/rdb.c (line 1859 at 11cb8ee):

robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error) {

to support the ability to absorb chunks of the buffer that are smaller than just a single key.

Looking at the state machine and protocol - I am seeing a decent amount of complexity, when we can get AOF for "free" today. But maybe I have overcomplicated it?

@PingXie
Copy link
Member Author

PingXie commented Jan 13, 2025

@PingXie Can you update the comment with the updates to the proposal so that it's easier for someone to come in and review? This thread is a bit long, but I think we're close to having consensus so worth to make sure it's easy to send this to some other folks.

Updated. I will close #587 next, now that this proposal is going with ASYNC behavior too.

Although in this specific case, I'm not sure it's enough work to split between two minor versions since we need to implement a handshake. I think we either commit to AOF or to RESTORE with a chunk.

I see the fundamental constraint here being on the receiving end. Regardless of chunking or the encoding format, the receiver must ultimately apply changes as complete data types. This means that if we transfer composite data structures as-is, the receiver needs to buffer/track them, which leads to significant memory pressure for large keys.

AOF addresses this by allowing us to send the data in its smallest components – the primitive data types. These are generally small in practice, which minimizes the memory burden on the receiver. By breaking down composite structures into their constituent primitives, we effectively mitigate the memory issues associated with large keys.

This approach aligns with the observation that large keys often result from users employing composite data types that grow over time. By decomposing these structures into their atomic elements for transfer, we address the root cause of the problem.

@PingXie PingXie added the major-decision-pending Major decision pending by TSC team label Jan 13, 2025
@madolson
Copy link
Member

Looking at the state machine and protocol - I am seeing a decent amount of complexity, when we can get AOF for "free" today. But maybe I have overcomplicated it?

The code we have internally is less than 200 lines, so I guess I'm not really seeing the complexity. There are pretty straightforward ways to avoid the large object buffering problem by partially serializing the key as you process the chunks of memory. The RDB format is more memory efficient than the in-memory representation.

If we want perfect forward compatibility - we should talk about a protobuf-based RDB v2 😉

This is still not forward compatible 😛. As soon as we introduce hash field expiration it breaks. We can't be forward compatible unless we pre-define all future changes we are going to make.

Looking at the state machine and protocol - I am seeing a decent amount of complexity, when we can get AOF for "free" today. But maybe I have overcomplicated it?

I guess my question boils down to whether this is a pathway worth complicating for performance. I agree we can get AOF for a lower implementation cost, but we can be more efficient and denser across the network.

@zuiderkwast
Copy link
Contributor

If we want to interleave a dump in a stream of commands, then AOF seems trivial while anything else is non-trivial, so I'd prefer to start with that.

We have REPLCONF VERSION, so it's easy to extend this later.

I'd like to scope-cut this feature because I want to see it implemented and merged in the foreseeable future.

@madolson
Copy link
Member

If we want to interleave a dump in a stream of commands, then AOF seems trivial while anything else is non-trivial, so I'd prefer to start with that.

I suppose, FWIW, I think starting with AOF and then evaluating changing it makes sense. There is a lot of code we need to write; I don't think cutting this corner makes sense, but I don't think it materially changes the overall design.

@zuiderkwast
Copy link
Contributor

So let's get to writing/reviewing/merging code. Can it be broken down into smaller tasks in some way? Is there a WIP implementation anywhere?

@hwware
Copy link
Member

hwware commented Jan 14, 2025

Thanks to @PingXie for the hard work updating the top comment. However, I have two questions about the design; could you clarify these concerns:

  1. For the command CLUSTER IMPORT SLOTS, the description says that existing target node slots and slots with undetermined sources are skipped. But in the SYNCSLOTS part, the error message says ERR Slot range <slot_start>-<slot_end> is already being imported.
    My understanding is that CLUSTER IMPORT SLOTS is called before the SYNCSLOTS command, so how can that error happen when SYNCSLOTS is called?

  2. For the command CLUSTER IMPORT CANCEL, if there are slots being migrated, what happens to the data in the target primary node and the source primary node? Could you please provide a workflow for it?

@murphyjacob4
Copy link
Contributor

Is there a WIP implementation anywhere?

@zuiderkwast I have a WIP implementation here: unstable...murphyjacob4:valkey:atomic-slot-migration

It is somewhat out of sync with the design. Going to spend the rest of today resolving merge conflicts with unstable and incorporating the design changes, then I can post a draft PR. My optimistic ETA would be Thursday. I can also chunk it into multiple PRs if that would be easier to review

@zuiderkwast
Copy link
Contributor

@murphyjacob4 this is great news! I don't know if it needs to be split into multiple PRs. How many lines of code is it?

@PingXie and others, regarding these commands:

CLUSTER IMPORT STATUS OPID id
CLUSTER IMPORT CANCEL OPID id

They will appear under the CLUSTER IMPORT subcommand. We only have one level of subcommands. STATUS and CANCEL will be just an argument and OPID will be kind of a redundant token. I suggest we skip the OPID token and just use CLUSTER IMPORT STATUS id, etc.

Regarding the internal commands:

SYNCSLOTS SLOTS start1 end1 [start2 end2 ...]
TRANSFERSLOTS SESSIONID id
  • We also need a way to cancel an ongoing SYNCSLOTS session when CLUSTER IMPORT CANCEL is called or in case of some other errors like failovers, right? (This was raised by Wen above.)

  • I'm wondering if we need two-three separate commands or if it can be different variants of one command, such as

    SYNCSLOTS SLOTS start1 end1 [start2 end2 ...]
    SYNCSLOTS TRANSFER id
    SYNCSLOTS CANCEL id
    

When a slot migration is cancelled, should the target send CLUSTER FLUSHSLOT to its replicas to clean up?

@enjoy-binbin
Copy link
Member

enjoy-binbin commented Jan 20, 2025

I re-read the top comment and discussion, sorry for jumping in so late. I've always had anxiety about long threads and have been busy with other things lately. I will first talk about the top comment, and then try to explain my previous code, https://github.com/enjoy-binbin/valkey/commits/slot_migrations/. I discussed this solution with Ping offline a long time ago, and I also tried to write the code before; it is not perfect yet, but it is something.

About the top comment

I see CLUSTER IMPORT SLOTS will return an op_id; does this mean we support calling the command again later and importing a different slot range into a different node? We send this command to the target node, right?

CLUSTER IMPORT STATUS: why do we have this information? The succeeded slots, failed slots, canceled slots? Don't these succeed or fail together? Are we worried that after the target node imports slot1, there will not be enough memory to continue importing slot2? We should advise users to calculate the memory in advance, and then succeed or fail as a whole. There should not be partial success, partial failure, or partial cancellation.

CLUSTER IMPORT CANCEL: I don't have much of an opinion on this. Maybe we do need a way to cancel the migration, but when/how should we use it? If a migration is ongoing, why should we cancel it? To stop the fork and avoid something bad happening? Or because we find out we don't need the migration after all? People need to know what they are doing before they do it.

SYNCSLOTS (Internal): I see it will return a session_id and this command does some slot range checks on the source node. I somehow want this command to be just like SYNC: any client can issue it to SYNC the data of some slots, and SYNCSLOTS returns a snapshot plus a replication stream for the slot range. That way the command can also work in non-migration cases and allow a user to track the stream of some slots. And by the way, why do we need a session_id? Is it used to track slot-sync status or to cancel a slot sync? I think CLUSTER IMPORT CANCEL can do that by killing the connection; just like the replication connection, once the connection drops, the SYNCSLOTS is dropped.

TRANSFERSLOTS (Internal): this command also takes a session_id. We don't have this in our fork; this command is like our CLUSTER SLOTFAILOVER, I guess. We send SLOTFAILOVER to the target node, the target node checks all the slot ranges to see if the connections are online, and then the target node takes ownership of the slots and gossips the UPDATE.

Operator Workflow: this seems fine.

Epoch Bumping Strategies (Consensusless): we are doing consensusless bumping. During the pause, once the target node finds the offset is caught up before the timeout, the target node will bump the epoch and gossip it. This does have the problems mentioned above, but note that the original CLUSTER FAILOVER with consensus-based bumping on the old primary actually has the same problems. During the migration, we should still advise (or maybe not) the user to do it at a low-traffic time in case something bad happens.

Streaming Format (AOF): we are doing slot RDB in our fork; the main issue is that the target node doesn't have a way to replicate the slot RDB to its replicas. We didn't solve this problem. Using AOF seems OK to me; my first thought was also the AOF format.

Observability for Atomic Slot Migration: I agree we need this info, though these errors should rarely occur.

About the discussion

Sorry, I decided to skip it since I lost track of it a while ago; I hope the top comment is the latest one.

About our implementation

I'm going to take a moment to explain our approach. I discussed this with Ping and Zhaozhao offline last year; at that time we broadly agreed on it, but sadly I now see this topic has moved on a lot. This is the branch https://github.com/enjoy-binbin/valkey/commits/slot_migrations/ & https://github.com/valkey-io/valkey/compare/unstable...enjoy-binbin:valkey:slot_migrations?expand=1 that I posted to Ping last year, and I have a document about it, but sadly it was written in Chinese so I am not able to post it. I know people get sick of reading a lot of code, but I hope you can take a moment to scan it, since I hate to waste the effort; it is not perfect, but it works. There is a python3 tests/test_slot_sync.py that you can play with (the comments are in Chinese, sadly). The user only does three things: 1. Send CLUSTER SLOTSYNC to the target node. 2. Check the offset (wait_for_condition). 3. Send CLUSTER SLOTFAILOVER to the target node, and then we are all set.

I will briefly describe the implementation. The main process is slot sync + slot failover.

In an A/B/C cluster, let's say we are trying to migrate slots 0-1000 from A to C, and slots 2000-3000 from B to C.
So A and B are the source nodes, and C is the target node.

The high level:

  1. The user sends the CLUSTER SLOTSYNC 0 1000 2000 3000 command to the target node; the target node will of course do some checks on the slot ranges. The target node adds these slot ranges to its in-memory list.
  2. The target node will try to connect to the source nodes, i.e. C will connect to A and to B. It works like the replication state machine: it does the handshake, the auth, the capa exchange.
  3. Target node C sends SYNC begin_slot end_slot to each source node, e.g. SYNC 0 1000 to node A and SYNC 2000 3000 to node B.
  4. When a source node gets the SYNC 0 1000 command, it forks, creates the slot RDB, sends it to the target node, and then streams the replication data just like normal replication.
  5. The target node and the source nodes use REPLCONF slotack / REPLCONF slotdiff to exchange the offset; once the target node knows all the links are online and caught up on the offset, we can do the next step.
  6. The target node sends REPLCONF SLOTFAILOVER to node A and node B. The target node continues to send REPLCONF slotack offset to both source nodes.
  7. Node A and node B receive the REPLCONF SLOTFAILOVER, pause writes, and handle REPLCONF slotack offset; once a source node finds that the offset has caught up, it sends REPLCONF slotready to the target node.
  8. The target node, node C, receives the REPLCONF slotready; once it finds all the links are ready, node C bumps the epoch and sends the UPDATE.
  9. The source nodes receive the UPDATE, learn that the slots' ownership is gone, unpause, free their resources, and request the target node to close the connection and free its resources.
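To illustrate the flow above, the exchange between target C and source A for slots 0-1000 would look roughly like this (offsets omitted; exact syntax may differ in the branch):

  C -> A: SYNC 0 1000
  A -> C: slot RDB snapshot for 0-1000, then the ongoing replication stream
  C -> A: REPLCONF slotack <offset>      (sent periodically)
  C -> A: REPLCONF SLOTFAILOVER          (step 6)
  A:      pauses writes, waits for C's acked offset to catch up
  A -> C: REPLCONF slotready
  C:      bumps the config epoch and broadcasts the UPDATE; A unpauses and frees resources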

Other stuff:

  1. The target node uses a bio thread to load the slot RDB. Depending on isolation, users may be able to use KEYS * and get data from other slots.
  2. Because of 1, once the loading is done, the target node has to ask its replicas to do a full sync with it to get the slot RDB data; this is an issue the AOF stream can fix.
  3. Currently in the code, the source node pauses all writes; we can optimize it to only pause writes to the migrating slot ranges.
  4. The slot sync cannot do a PSYNC, and does not fit into the global replication buffer (added in 7.0).
  5. The code does not work with dual channel yet.
  6. Observability and error handling: we have some, but probably not as complete as what Ping mentioned.

Sorry for typing so much text without pictures. Maybe it's because of preconceptions, but I kind of like our current solution. I'm not asking that we must accept this Tencent Cloud solution. I hope you will like it, or have more ideas after reading this. I had only discussed it with Ping offline in Chinese before, so maybe you will find it interesting.

@murphyjacob4
Copy link
Contributor

My optimistic ETA would be Thursday

I was a bit over-optimistic, but I have a draft PR out now: #1591. It is just about 1k LOC, but will be more with tests

We also need a way to cancel an ongoing SYNCSLOTS session when CLUSTER IMPORT CANCEL is called

I just use a connection close to signal this in my PR

I'm wondering if we need two-three separate commands or if it can be different variants of one command

I opted for this style in the PR, since it turns out more state transitions were needed. But we can talk more there.

Tencent Cloud's solution

I think the solution is quite similar to what I have implemented, although the API naming is different and there are some slight semantic differences. The primary remaining difference is AOF- vs RDB-based snapshotting. Please feel free to take a look at the draft PR and let me know if you'd like to collaborate further! Happy to set up a shared branch.

@zuiderkwast
Copy link
Contributor

@enjoy-binbin Thanks for sharing your solution and what was discussed before in Chinese. :) I think the solution is similar, just a few differences. Your solution seems a bit simpler. I like simple.

User only do three things: 1. send CLUSTER SLOTSYNC to target node. 2. Check the offset (wait_for_condition) 3. send CLUSTER SLOTFAILOVER to target node and then we are all set.

Three steps? CLUSTER IMPORT SLOTS is just one command. I think one command seems better. The target can complete the switchover immediately when the sync is complete, without another admin command.

CLUSTER IMPORT STATUS, why we have these information?
Don't these succeed or fail together?
There should not be partial success, partial failure, or partial cancellation.

Importing one slot range from one node can succeed and importing another range from another node can fail, if the source node crashes or something. Is that right @murphyjacob4?

If there is a failover in the source shard, shall the target node retry importing from the new source primary?

CLUSTER IMPORT CANCEL i dont have much opinion in this, maybe we do need a way to cancel the migration, but when/how should we use it? If a migration is ongoing, why should we cancel it? We need to stop the fork to avoid the bad thing happen? or we find out we don't need the migration at the end? People need to know what they are doing before they do it.

If we can avoid these (STATUS and CANCEL) in the first step, it can be simpler, and we can avoid the op-id. I like the approach of doing the minimal solution and later extending it based on what we need.

To compare with replication, how do we abort a full sync? We use REPLICAOF NO ONE right? For CLUSTER IMPORT, we could also just abort it per slot range, without an op-id. So it seems to me we can live without the op-id. For example CLUSTER IMPORT CANCEL SLOTS 1000 2000. And we can postpone the CANCEL to a separate feature, if we want.

I somehow want this command just like SYNC, a random client can just issue this command and try to SYNC some slots data,

Extending SYNC or adding a new command SYNCSLOTS is not a big difference, but if we want to use REPLCONF to exchange the offset and other things, then maybe SYNC is better?

I don't know why we need a session-id. Just closing the connection seems enough to me.

A related feature that needs replication per slot is #1372. Just keep in mind if we ever want that feature.

@enjoy-binbin
Copy link
Member

enjoy-binbin commented Jan 20, 2025

User only do three things: 1. send CLUSTER SLOTSYNC to target node. 2. Check the offset (wait_for_condition) 3. send CLUSTER SLOTFAILOVER to target node and then we are all set.

Three steps? CLUSTER IMPORT SLOTS is just one command. I think one command seems better. The target can complete the switchover immediately when the sync is complete, without another admin command.

Actually there is not much difference, I think; it can easily be changed. People anyway need a way to check the offset or check the status, which is kind of the same job, unless people plan to check the results the next day.

Another reason, I guess, is that we want to split it. We want to only do the SLOT FAILOVER when we require it, since the failover needs a pause. Note that the failover may time out if something bad happens. We have had this issue before with consensus-based bumping and the old CLUSTER FAILOVER: the manual failover timed out and the primary was paused for a long time. This could be a bug in the code, a blocked node, or a network problem. In our fork we have an arbiter mode: in this mode we transfer voting rights to arbiter nodes, which are empty primary nodes; arbiters don't process commands so they never get blocked by user commands. We use this arbiter mode everywhere, like in a one-shard cluster or a cluster that crosses multiple availability zones. In a large cluster, only a few arbiter nodes are needed; their voting is very stable and they are not bound to a primary node or AZ. I think this arbiter mode is great stuff we want to open-source, but Madelyn doesn't like it, if I recall.

So the reason here is that we want to manually trigger the slot failover in a confirmed situation. Of course, this can also be made automatic.

If there is a failover in the source shard, shall the target node retry importing from the new source primary?

The target node will retry (re-initialize the slot link) with the new primary in this case.

@hwware
Copy link
Member

hwware commented Jan 20, 2025

From my understanding,

  1. The information from CLUSTER IMPORT STATUS is basically provided to the client side or admin to monitor the slot migration process.
  2. The session_id is mainly useful for CLUSTER IMPORT STATUS OPID to check the slot migration progress and for CLUSTER IMPORT CANCEL OPID to cancel the current slot migration much more smoothly.
  3. @enjoy-binbin I just went through your comment above about your implementation; I think there is not too much difference except using RDB as the stream.

@PingXie
Copy link
Member Author

PingXie commented Jan 21, 2025

Thank you, @enjoy-binbin, for the detailed write-up and for contributing your insights to this discussion! I appreciate the effort you've put into exploring this area, and your input has been incredibly helpful in shaping our direction.

Capturing the offline sync with @enjoy-binbin, @soloestoy, and @hwware:

  1. Cancellation and Reporting

We agree on the importance of supporting cancellation and a robust reporting mechanism for slot migration. Since migrations, whether atomic or not, can be long-running operations, operators need a clear way to monitor and abort the process.

  2. AOF-Based Synchronization

We've aligned on using the AOF format for synchronization. This approach offers the advantages of relatively low overhead on the main thread, eliminating the background loading work, and automatic replication to replicas. Optimizations like dual-channel replication and filtering irrelevant delta changes can be explored after the core functionality is implemented.

  3. Epoch Bumps and Reliability

We agree that consensus-based epoch bumps aren't essential for the initial release. As discussed, addressing reliability concerns, such as those related to epoch management, can be better handled within the framework of issue #1355.

  4. Decoupling Data Migration and Slot Ownership Transfer

We identified a new requirement: separating data migration from slot ownership transfer, giving the operator control over each step. This is important because transferring slot ownership can have a greater impact on clients than migrating data, as not all clients handle topology changes efficiently. Allowing just the slot ownership transfer to be performed separately during a dedicated downtime minimizes potential disruptions. valkey-cli can still offer a "one-shot" migration option by combining both steps for convenience.

Next Steps

This is a complex project, so to make it easier to review and manage, I propose breaking it down into smaller PRs. Each PR can deliver a working piece, even if it's not perfect at first, and we can improve things incrementally. I suggest we all take a look at both @enjoy-binbin and @murphyjacob4's PRs to get up to speed.

Thanks again to everyone for their contributions and collaboration!

@murphyjacob4
Copy link
Contributor

murphyjacob4 commented Jan 21, 2025

Importing one slot range from one node can succeed and importing another range from another node can fail, if the source node crashes or something. Is that right @murphyjacob4?

Right. If I am an operator of a 9-shard cluster and I am adding a tenth shard, I will go to the new node and enqueue 9 separate slot migration operations, one for each source shard. It is possible for 8 of those to succeed, but for the 9th to encounter issues that result in failure. As an operator, I would want to poll the status of all 9 operations, look into the failure, and maybe retry.
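For illustration (slot ranges hypothetical), the workflow on the new node could look like:

  CLUSTER IMPORT SLOTS 1638 1819      (a range currently owned by shard 1; returns an op_id)
  CLUSTER IMPORT SLOTS 3458 3639      (a range currently owned by shard 2; returns an op_id)
  ... one command per source shard ...
  CLUSTER IMPORT STATUS OPID <op_id>  (polled for each of the 9 returned op_ids)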

Optimizations like.. filtering irrelevant delta changes can be explored after the core functionality is implemented

Actually - @PingXie - in the PR I wrote, this was required due to the way chaining replication works. We reuse the chaining replication code, which means that the target primary directly proxies the contents it receives on the slot migration link to its replica links. However, the replicas aren't aware that the slot migration is ongoing, so they don't have the context to do the filtering on their end. Instead of adding a replica-informing layer to the protocol, it was easier to filter on the source primary, which also has the benefit of reducing network bandwidth and overall overhead - at the cost of requiring the primary to maintain a dedicated output buffer for the slot migration instead of the replication backlog.

@murphyjacob4
Copy link
Contributor

@enjoy-binbin, @PingXie, and I discussed offline. Binbin and I are joining efforts and going to iterate on the Tencent solution to meet the requirements we outlined here. Once this is ready, we will send a shared PR for review.

Given this, I have closed my pull request. I will be able to repurpose much of this into the Tencent solution to make it AOF based and not require replica full sync on the target replicas.

Here is the new shared branch that @enjoy-binbin has set up for us to work on: https://github.com/enjoy-binbin/valkey/tree/slot_migration_new

@JimB123
Copy link
Contributor

JimB123 commented Jan 24, 2025

It looks like you've been updating the problem statement based on comments. So just reviewing that. Sorry if I repeat responses from others.

Firstly, nice to see atomic slot migration. IMO the existing non-atomic solution is simply broken, as it provides no mechanism for multi-key commands to function reliably.

Questions:
CLUSTER IMPORT SLOTS

  • Is it possible to issue this command while an existing migration (previous command) is still in progress?
  • If so, is there any ordering guarantee? From the various text, I get the impression that each import is viewed as a separate request, and the requests are queued and executed sequentially. Is this correct? Please clarify.
  • Since this is defined as a "pull" type command, I don't see any possibility for conflict with multiple commands, it's just additive, right?
  • If it's not possible to issue multiple commands together, what error is returned when issuing the command a second time? Maybe something like "SLOT MIGRATION ALREADY IN PROGRESS"?

CLUSTER IMPORT STATUS

  • If it's not possible for multiple import commands to be issued, then there's only 1 "current" import operation. In that case, can we make the OPID optional?
  • If we can have multiple simultaneous imports, making the OPID required, then we MUST have a mechanism to list all of the current/recent OPIDs
  • I see "number of slots failed to import". Can you (somewhere) enumerate/discuss the expected failure conditions?

CLUSTER IMPORT CANCEL

  • If the OPID is required, there MUST be a mechanism to list the current OPID(s)

SYNCSLOTS (internal)

  • This returns a "session_id". From this, I'm assuming that this doesn't align with the "op_id". I'm guessing this is because one op_id may span multiple source nodes - but a session_id is for a single source node? Please clarify.
  • When syncslots is used to transfer multiple slots, what happens when there's a failure on the second slot? Can we finalize the transfer of the first slot? Or is the entire operation rolled back?
  • Given that multiple target nodes may all request SYNCSLOTS, what is the behavior with multiple simultaneous requests? Are they queued and handled sequentially? Or are they driven in parallel? Or does the request fail, leaving it to the target host to retry?
  • What is the operational observability on SYNCSLOTS? Just as we need observability of CLUSTER IMPORT SLOTS, we should also have observability on SYNCSLOTS.

TRANSFERSLOTS (internal)

  • Given that this requires a session id. This finalizes the transfer for all of the slots in that session. Is it possible to have a partial failure? Or the entire session is atomic, not the slot?
  • A better name might be TRANSFERSLOTOWNERSHIP. As just "transfer" seems similar to "sync".

High-Level Migration Flow

  • This shows a case where SYNCSLOTS mirrors IMPORTSLOTS. What happens in a case where slots are coming from 2 separate source nodes? Do we expect the 2 SYNCSLOTS to operate in parallel? Or sequentially?
  • Regarding the repl_backlog. I'm assuming that this isn't the "normal" backlog. The normal backlog contains replication events across all slots - but we need to send backlog only for the target slot. I guess this could be done by filtering the backlog at the source, or filtering it at the target. Can you clarify the approach?

Other questions

  • Have you given consideration to how this would/should behave when a FLUSHALL operation is performed on either the source or the target? Since FLUSHALL isn't associated with a key in a slot, I expect it wouldn't be replicated, right? We need to consider the behavior of the flush operation if executed on either the source or the target. For instance, if flushall is executed on the target, while a migration is in progress, should it terminate the migration? Or allow the migration to continue since it hasn't atomically transferred yet?

@PingXie
Copy link
Member Author

PingXie commented Jan 27, 2025

Yeah, I think there are still several fit-and-finish questions we need to address before we can consider this done. So far, what we’ve agreed upon is just the core logic. I’ll share my current thoughts below.

@hwware

For command CLUSTER IMPORT SLOTS, in description part, it is mentioned that Existing target node slots and slots with undetermined sources are skipped. But in the command SYNCSLOTS part, the error message said ERR Slot range <slot_start>-<slot_end> is already being imported.
My understanding is that CLUSTER IMPORT SLOTS is called before SYNCSLOTS command, how the error happens in SYNCSLOTS called?

CLUSTER IMPORT SLOTS is a user-facing command, whereas SYNCSLOTS is internal. These two can run in parallel, and there’s no strict 1:1 mapping between them at the protocol level. For example, a single IMPORT SLOTS command could result in multiple SYNCSLOTS commands, or two IMPORT SLOTS commands could potentially be combined. As a result, the error handling for these two commands needs to be defined independently.

For the command CLUSTER IMPORT CANCEL, if there is ongoing migrated slots, how to proceed the data in the target primary node and source primary node? Could you please provide workflow for it?

I can include a diagram to illustrate this. At a high level, I’d imagine the target node drops all active connections to the sources, stops processing the remaining slots associated with the op_id, and deletes incomplete slots. Slots that have been fully migrated should remain intact. I’m also open to using ABORT instead of CANCEL.

@zuiderkwast

User only do three things: 1. send CLUSTER SLOTSYNC to target node. 2. Check the offset (wait_for_condition) 3. send CLUSTER SLOTFAILOVER to target node and then we are all set.

Three steps? CLUSTER IMPORT SLOTS is just one command. I think one command seems better. The target can complete the switchover immediately when the sync is complete, without another admin command.

We can start with the "one-shot" ASM approach, but I acknowledge that there’s a valid use case for a "two-stage" ASM. In this approach, data migration happens first, and topology updates occur during planned downtime. This avoids the higher impact typically associated with topology changes. That said, I view the "two-stage" ASM as incremental work.

If we can avoid these (STATUS and CANCEL) in the first step, it can be simpler, and we can avoid the op-id. I like the approach of doing the minimal solution and later extending it based on what we need.

The need for STATUS and CANCEL arises because IMPORT SLOTS is an asynchronous call. However, the requirement for op-id could be eliminated if we enforce a single active migration session at a time or allow the ranges to be amended dynamically. I agree this would simplify the process a lot.

To compare with replication, how do we abort a full sync? We use REPLICAOF NO ONE right? For CLUSTER IMPORT, we could also just abort it per slot range, without an op-id. So it seems to me we can live without the op-id. For example CLUSTER IMPORT CANCEL SLOTS 1000 2000. And we can postpone the CANCEL to a separate feature, if we want.

Following the discussion above, if we remove op-id and allow only one active IMPORT SLOTS session at a time or allow the operator to amend the ongoing migration, we could support slot-range-based cancellations. I'm increasingly in favor of this option.

Extending SYNC or adding a new command SYNCSLOTS is not a big difference, but if we want to use REPLCONF to exchange the offset and other things, then maybe SYNC is better?

For internal commands, I’d like to prioritize clarity. Reusing existing replication commands might confuse developers later on. I’m open to renaming SYNCSLOTS—perhaps IMPORTSLOTS would be a better alternative?

I don't know why we need a session-id. Just closing the connection seems enough to me.

The session ID was intended to link the slots being migrated with those whose ownership is being transferred, as these represent two separate internal states. However, now that we plan to support "two-stage" ASM in the future, I agree we should just converge on slots. I’ll update the HLD accordingly.

A related feature that needs replication per slot is #1372. Just keep in mind if we ever want that feature.

Need to think about that some more.

@JimB123

CLUSTER IMPORT SLOTS

Is it possible to issue this command while an existing migration (previous command) is still in progress?

Allowing this would provide more flexibility but also introduce additional complexity. I think eventually we should support it but I am also OK with a more restrictive experience.

If so, is there any ordering guarantee? From the various text, I get the impression that each import is viewed as a separate request, and the requests are queued and executed sequentially. Is this correct? Please clarify.

As far as the high-level design is concerned, there’s no strict requirement for ordering.

Since this is defined as a "pull" type command, I don't see any possibility for conflict with multiple commands, it's just additive, right?

I’m not entirely sure about the "pull" model reference, but my current thinking is to allow operators to amend the slots to be migrated (without relying on op-id). The server would then process each slot individually, choosing the best migration plan dynamically.

If it's not possible to issue multiple commands together, what error is returned when issuing the command a second time? Maybe something like "SLOT MIGRATION ALREADY IN PROGRESS"?

My mental model is that IMPORT SLOTS would perform the basic input validation, such as ensuring slots are within range. Operators could then use IMPORT STATUS to check the actual execution progress and status.

CLUSTER IMPORT STATUS

If it's not possible for multiple import commands to be issued, then there's only 1 "current" import operation. In that case, can we make the OPID optional?

Agreed. We should eliminate op-id entirely and rely solely on slot numbers.

If we can have multiple simultaneous imports, making the OPID required, then we MUST have a mechanism to list all of the current/recent OPIDs
I see "number of slots failed to import". Can you (somewhere) enumerate/discuss the expected failure conditions?

If we remove op-id altogether, IMPORT STATUS would return detailed information about the slots submitted for migration. Since there would only be one ongoing operation, managing the op-id state becomes unnecessary.

CLUSTER IMPORT CANCEL

If the OPID is required, there MUST be a mechanism to list the current OPID(s)
SYNCSLOTS (internal)

I now think the cancellation should be slot-based, for example: CANCEL 100-200, 301, etc.

This returns a "session_id". From this, I'm assuming that this doesn't align with the "op_id". I'm guessing this is because one op_id may span multiple source nodes - but a session_id is for a single source node? Please clarify.
When syncslots is used to transfer multiple slots, what happens when there's a failure on the second slot? Can we finalize the transfer of the first slot? Or is the entire operation rolled back?
Given that multiple target nodes may all request SYNCSLOTS, what is the behavior with multiple simultaneous requests? Are they queued and handled sequentially? Or are they driven in parallel? Or does the request fail, leaving it to the target host to retry?

session_id will also be removed. I now believe everything should converge around slot numbers.

What is the operational observability on SYNCSLOTS? Just as we need observability of CLUSTER IMPORT SLOTS, we should also have observability on SYNCSLOTS.

For observability, we should log all errors encountered in the server logs. Additionally, we could maintain a collection of migration records with stats such as:

  • Number of slots
  • Key count and key sizes
  • Start and end times
  • Error messages (if applicable)
Each record would represent a migration session.
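For illustration only (field names hypothetical, not a finalized reply format), one such record might look like:

  slots: 1000-1999
  keys-migrated: 182340
  bytes-migrated: 734003200
  start-time: 2025-01-27T10:15:02Z
  end-time: 2025-01-27T10:16:40Z
  error: (none)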

TRANSFERSLOTS (internal)

Given that this requires a session id. This finalizes the transfer for all of the slots in that session. Is it possible to have a partial failure? Or the entire session is atomic, not the slot?
A better name might be TRANSFERSLOTOWNERSHIP. As just "transfer" seems similar to "sync".

This operation should be atomic, as it primarily involves bumping the node’s config epoch and updating the slot ownership. I’m open to name suggestions and we can continue the discussion in the PR. :)

High-Level Migration Flow

This shows a case where SYNCSLOTS mirrors IMPORTSLOTS. What happens in a case where slots are coming from 2 separate source nodes? Do we expect the 2 SYNCSLOTS to operate in parallel? Or sequentially?

Right, separate source nodes would require their own "sessions." Either parallel or sequential execution works for the high-level design. We could start with the simpler approach and optimize incrementally.

Regarding the repl_backlog. I'm assuming that this isn't the "normal" backlog. The normal backlog contains replication events across all slots - but we need to send backlog only for the target slot. I guess this could be done by filtering the backlog at the source, or filtering it at the target. Can you clarify the approach?

Correct. The current plan is to filter at the source. This means we won’t reuse the normal repl_backlog. While a dedicated log could be introduced, my understanding from offline discussions is that we can initially use the client output buffer. Resuming interrupted migrations would require a new backlog mechanism, but that isn’t a priority/concern for now IMO.

Other questions

Have you given consideration to how this would/should behave when a FLUSHALL operation is performed on either the source or the target? Since FLUSHALL isn't associated with a key in a slot, I expect it wouldn't be replicated, right? We need to consider the behavior of the flush operation if executed on either the source or the target. For instance, if flushall is executed on the target, while a migration is in progress, should it terminate the migration? Or allow the migration to continue since it hasn't atomically transferred yet?

That is a great question. My initial thought is that we should cancel all ongoing migrations first and then let the FLUSHALL command proceed.
