Snapshot-Based Node Split – Foundation and Core Implementation #8122
Conversation
Core changes:
- Extend pg_dist_node schema: add nodeisreplica (BOOLEAN) and nodeprimarynodeid (INT4) columns to mark replicas and reference their primary nodes.
- Introduce UDF citus_add_replica_node(replica_hostname TEXT, replica_port INT, primary_hostname TEXT, primary_port INT) RETURNS INT, which registers a user-provisioned PostgreSQL streaming replica as an inactive Citus node linked to its primary.

This lays the foundation for the upcoming snapshot-based node split feature.
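For illustration, a registration call under the signature above might look like the following (the hostnames and ports are placeholders, not values from this PR):
-- Register 'worker-1-replica' as an inactive replica of the worker 'worker-1';
-- the function returns the node id assigned to the replica.
SELECT citus_add_replica_node('worker-1-replica', 5432, 'worker-1', 5432);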
This commit builds upon the initial replica registration feature by introducing several enhancements and new capabilities for managing replica nodes. The following UDFs have been added to provide more flexible and complete replica management:
- `citus_add_replica_node_with_nodeid`: allows adding a replica by referencing the primary node's ID, which is more convenient than using hostname and port.
- `citus_remove_replica_node`: allows removal of an inactive replica node by its hostname and port.
- `citus_remove_replica_node_with_nodeid`: allows removal of an inactive replica node by its ID.

Additionally, newly created replicas are now assigned a new group ID instead of inheriting the primary's group ID. This change more accurately reflects their status as independent, non-usable nodes until they are promoted.
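As a hedged sketch (the authoritative parameter lists live in this PR's SQL definitions; the hostnames and node ids below are made up), these UDFs would be used along these lines:
-- Add a replica by referencing the primary's node id instead of host/port
-- (assumed argument order: replica host, replica port, primary node id).
SELECT citus_add_replica_node_with_nodeid('worker-1-replica', 5432, 2);
-- Remove an inactive replica by hostname and port, or by its node id.
SELECT citus_remove_replica_node('worker-1-replica', 5432);
SELECT citus_remove_replica_node_with_nodeid(5);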
This commit introduces the functionality to promote a streaming replica to a primary node and rebalance the shards between the original primary and the newly promoted node. The key changes include:
- **New UDF `citus_promote_replica_and_rebalance`**: orchestrates the entire process. It blocks writes on the primary, waits for the replica to catch up, promotes the replica, updates its metadata to a primary node, and then rebalances the shards between the two nodes.
- **New UDF `get_snapshot_based_node_split_plan`**: provides a preview of how the shards will be distributed between the primary and the replica after the promotion and rebalancing, allowing users to inspect the plan before execution.
- **Core logic for promotion and rebalancing**:
  - node_promotion.c: contains the implementation of the promotion logic, including checking replication lag and calling `pg_promote`.
  - shard_rebalancer.c: extended to calculate the shard distribution plan for the two-node split.
  - shard_transfer.c: includes logic to adjust shard placements in the metadata after the split.
  - node_metadata.c: updated to handle the activation of a replica as a new primary node.

This commit enables executing the happy path for snapshot-based node split in Citus. While test cases and code cleanup still remain, the core functionality is in place and can be tested.
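A sketch of the happy path described above, assuming the replica was registered with node id 5 (the id is illustrative):
-- Optionally preview the split first with get_snapshot_based_node_split_plan
-- (its argument list is defined in this PR's SQL files), then promote:
SELECT * FROM citus_promote_replica_and_rebalance(5);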
When adding/removing columns from pg_dist_node during extension upgrades and downgrades, PostgreSQL never reuses the old attnum. Dropped columns leave “holes” (attisdropped = true), and re-added columns with the same name are assigned a new attnum at the end. As a result:
- Using the compile-time Natts_pg_dist_node to size datumArray/isNullArray can cause buffer overflows or missed attributes.
- Column lookups must account for these holes to avoid reading wrong data or crashing.

This change allocates arrays based on TupleDesc->natts and updates the column access logic to correctly handle attisdropped attributes. The commit also contains some expected output updates and a regression test case to check the node split functionality.
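The catalog behavior described here can be reproduced on a plain PostgreSQL table; the demo table below is only for illustration and is not part of Citus:
-- Drop and re-add a column, then inspect pg_attribute: the dropped column
-- keeps its attnum as a hole (attisdropped = true) and the re-added column
-- receives a new attnum at the end of the tuple descriptor.
CREATE TABLE attnum_demo (a int, b int);
ALTER TABLE attnum_demo DROP COLUMN a;
ALTER TABLE attnum_demo ADD COLUMN a int;
SELECT attnum, attname, attisdropped
FROM pg_attribute
WHERE attrelid = 'attnum_demo'::regclass AND attnum > 0
ORDER BY attnum;
-- attnum 1 -> ........pg.dropped.1........  (attisdropped = t)
-- attnum 2 -> b
-- attnum 3 -> a   (the re-added column lands at the end)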
Codecov Report
❌ Patch coverage is 66.77%. Your patch status has failed because the patch coverage (66.77%) is below the target coverage (75.00%). You can increase the patch coverage or adjust the target coverage.
Additional details and impacted files:
@@ Coverage Diff @@
## main #8122 +/- ##
==========================================
- Coverage 89.16% 88.95% -0.22%
==========================================
Files 285 287 +2
Lines 62450 63071 +621
Branches 7850 7931 +81
==========================================
+ Hits 55681 56102 +421
- Misses 4518 4662 +144
- Partials 2251 2307 +56
primaryNode->workerName, primaryNode->workerPort)));

bool caughtUp = false;
const int catchUpTimeoutSeconds = 300; /* 5 minutes, TODO: Make GUC */
It would be nice to have this as a GUC or a UDF parameter.
I have added a new parameter to the UDF with 300 seconds as the default value.
StringInfo replicaLsnQueryResInfo = (StringInfo) linitial(replicaLsnList);
char *replica_lsn_str = replicaLsnQueryResInfo->data;

if (!primary_lsn_str || !replica_lsn_str)
Can you comment here under which conditions this evaluates to true?
Also, the caller assigns the result to uint.
It was a mistake. This check is not required. I've removed it.
List *primaryShardIdList;

/*
 * replicaShardPlacementList contains the placements that should stay on
maybe add "(clone)" to comment to emphasize the connection
I’ve updated this and several other variables to reflect the new naming convention, and also added a few additional NOTICE messages to make the purpose and flow of operations clearer.
.workerNode = primaryNode /* indicate Primary node as a source node */
};

SplitPrimaryReplicaShards *splitShards = GetPrimaryReplicaSplitRebalanceSteps(&options
It would be good to have a check if splitShards is null as in "get_snapshot_based_node_split_plan"
GetPrimaryReplicaSplitRebalanceSteps always returns a palloc'd structure, so it can never be NULL. In fact, the check in get_snapshot_based_node_split_plan was unnecessary, and I've removed it.
replicaNode->workerName,
replicaNode->workerPort);

/* Now drop all shards from primary that need to be on the clone node */
Can you also add a comment on at which point the shards that are on the primary (or on the replica) are cleaned up from the replica (or from the primary)?
I've added the appropriate NOTICE message. Kindly check whether it is clearer now.
The overall idea is great and has strong potential to scale Citus cluster size more efficiently. It is useful to add unit tests for each new UDF. The code already includes logic to handle failures at various points, which is great. We also need failure scenarios in the test suite, possibly using mitmproxy or simulating shard locks. In particular, the main flow for promote-clone and rebalance includes non-transactional statements. It would be helpful to have failure scenarios that test these paths and confirm that the cluster remains in a recoverable state when a failure occurs.
…nd improved error handling

This commit significantly improves the clone promotion functionality with better error handling, configurable timeout, and comprehensive test coverage.
- Added `catchup_timeout_seconds` parameter to the `citus_promote_clone_and_rebalance` function (see the example after this commit note)
- Default value of 300 seconds if not provided
- Updated SQL function definitions in both latest and version-specific files
- Updated downgrade script to handle new function signature
- Improved input validation with proper error codes and details
- Consistent error handling strategy using ereport(ERROR) instead of returning -1
- Enhanced error messages with errcode and errdetail for better debugging
- Proper resource cleanup in all error paths
- Enhanced `EnsureReplicaIsNotSynchronous` to verify clone connection to primary
- Added hostname-to-IP resolution for robust pg_stat_replication queries
- Checks multiple fields (application_name, client_hostname, client_addr) for reliable matching
- Comprehensive validation of replication relationship before promotion and clone addition
- Created `src/backend/distributed/utils/clonenode_utils.c` for clone utility functions
- Moved `GetReplicationLag`, `EnsureReplicaIsNotSynchronous`, and `EnsureValidStreamingReplica` to the dedicated file
- Added `EnsureValidCloneMode` function for additional clone validation
- Improved code modularity and reusability
- Systematically replaced "replica" with "clone" in function names, struct names, and error messages
- Updated `SplitShardsBetweenPrimaryAndReplica` → `SplitShardsBetweenPrimaryAndClone`
- Updated `GetPrimaryReplicaSplitRebalanceSteps` → `GetPrimaryCloneSplitRebalanceSteps`
- Updated `AdjustShardsForPrimaryReplicaNodeSplit` → `AdjustShardsForPrimaryCloneNodeSplit`
- Updated `SplitPrimaryReplicaShards` struct → `SplitPrimaryCloneShards`
- Added negative test cases
The commit also contains a few indentation fixes.
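Based on the parameter described above, calls with and without the new timeout would look roughly like this (the clone node id 5 is illustrative; only the parameter name and its 300-second default come from the commit message):
-- Uses the default catch-up timeout of 300 seconds.
SELECT * FROM citus_promote_clone_and_rebalance(5);
-- Overrides the catch-up timeout, here with 120 seconds.
SELECT * FROM citus_promote_clone_and_rebalance(5, catchup_timeout_seconds => 120);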
Thanks a lot for the thoughtful review! I've made a few follow-up changes based on your feedback:
- Code improvements: Reorganized some parts and added extra validation around verifying a clone node before it can be registered or promoted.
- Workflow clarity: Added additional NOTICE messages to make the flow of operations more visible when running the UDFs.
- Test coverage: Extended the test suite to cover failure and error scenarios. These include cases where promotion fails or where a transaction is rolled back after promote-clone. The tests also verify that the cluster remains in a consistent and recoverable state in those situations.

I agree with your suggestion about using mitmproxy (or similar techniques) for simulating more complex failure scenarios. That's a great idea, and I'd like to propose tackling it in a separate PR so we can keep this one focused.
* and downgrads. Now the issue here is PostgreSQL never reuses the old
* attnum. Dropped columns leave “holes” (attributes with attisdropped = true),
* and a re-added column with the same name gets a new attnum at the end. So
* we cannot use the deined Natts_pg_dist_node to allocate memory and also
nit: "deined Natts_pg_dist_node" => "designated Natts_pg_dist_node" ?
Also, could other pg_dist_xxx tables that are extended theoretically hit this issue?
workerNode->workerName, workerNode->workerPort, workerNode->
nodeId)));

LockShardsInWorkerPlacementList(workerNode, AccessExclusiveLock);
Is the TODO comment on LockShardsInWorkerPlacementList() any cause for concern? Just noting that citus_promote_clone_and_rebalance calls this (indirectly) in step 1, blocking writes on the original primary.
/*
* Acquires shard metadata locks on all shards residing in the given worker node
*
* TODO: This function is not compatible with query from any node feature.
* To ensure proper behavior, it is essential to acquire locks on placements across all nodes
* rather than limiting it to just the coordinator (or the specific node from which this function is called)
*/
void
LockShardsInWorkerPlacementList(WorkerNode *workerNode, LOCKMODE lockMode)
{
List *placementList = AllShardPlacementsOnNodeGroup(workerNode->groupId);
LockShardsInPlacementListMetadata(placementList, lockMode);
}
I'm good with merging; the catalog changes plus the fix for reading pg_dist_node look good, and the new feature can be used as needed.
…urpose and logic

Also adjusted AdjustShardsForPrimaryCloneNodeSplit to downgrade the “inserting delete record ...” message from NOTICE to LOG, following review feedback. This prevents client applications from being overwhelmed with excessive notices in clusters that have a large number of distributed tables. Includes corresponding test cases and updates to expected outputs.
…wngrade is followed by an upgrade (#8144)

Unlike what was fixed in #7950, #8120, #8124, #8121 and #8114, this was not an issue in older releases but is a potential issue introduced by the current (13.2) release, because in one of the recent commits (#8122) two columns have been added to pg_dist_node. In other words, none of the older releases since we started supporting downgrades added new columns to pg_dist_node. The mentioned PR actually attempted to avoid this kind of issue in one of the code paths but not in some others. So this PR avoids memory corruption around pg_dist_node accessors in a standardized way (as implemented in the other example PRs) and in all code paths.
DESCRIPTION:
This pull request introduces the foundation and core logic for the snapshot-based node split feature in Citus. This feature enables promoting a streaming replica (referred to as a clone in this feature and UI) to a primary node and rebalancing shards between the original and the newly promoted node without requiring a full data copy.
This significantly reduces rebalance times for scale-out operations where the new node already contains a full copy of the data via streaming replication.
Key Highlights
1. Replica (Clone) Registration & Management Infrastructure
Introduces a new set of UDFs to register and manage clone nodes, such as citus_add_clone_node.
These functions allow administrators to register a streaming replica of an existing worker node as a clone, making it eligible for later promotion via snapshot-based split.
2. Snapshot-Based Node Split (Core Implementation)
New core UDF: citus_promote_clone_and_rebalance()
This function implements the full workflow to promote a clone and rebalance shards between the old and new primaries. The steps include blocking writes on the original primary, waiting for the clone to catch up, promoting it, updating its metadata to mark it as a primary node, and rebalancing shards between the two nodes.
3. Split Plan Preview
A new helper UDF get_snapshot_based_node_split_plan() provides a preview of the shard distribution post-split, without executing the promotion.
Example:
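As a hedged sketch (assuming the function takes the clone's node id, here 5; the exact signature and output columns are defined by this PR's SQL files):
-- Returns the planned shard distribution between the original worker and the
-- clone without promoting anything.
SELECT * FROM get_snapshot_based_node_split_plan(5);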
4. Test Infrastructure Enhancements
5. Usage Guide
The snapshot-based node split can be performed using the following workflow:
- Take a Backup of the Worker Node
Run pg_basebackup (or an equivalent tool) against the existing worker node to create a physical backup.
pg_basebackup -h <primary_worker_host> -p <port> -D /path/to/replica/data --write-recovery-conf
- Start the Replica Node
Start PostgreSQL on the replica using the backup data directory, ensuring it is configured as a streaming replica of the original worker node.
- Register the Backup Node as a Clone
Mark the registered replica as a clone of its original worker node:
SELECT * FROM citus_add_clone_node('<clone_host>', <clone_port>, '<primary_host>', <primary_port>);
- Promote and Rebalance the Clone
Promote the clone to a primary and rebalance shards between it and the original worker:
SELECT * FROM citus_promote_clone_and_rebalance(<clone_node_id>);
- Drop Any Replication Slots from the Original Worker
After promotion, clean up any unused replication slots from the original worker:
SELECT pg_drop_replication_slot('<slot_name>');
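To find which slots are left over, the standard pg_replication_slots view can be queried on the original worker, for example:
-- Lists remaining replication slots; pass the relevant slot_name to
-- pg_drop_replication_slot as shown above.
SELECT slot_name, slot_type, active
FROM pg_replication_slots;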
Impact & Benefits