@@ -115,6 +115,8 @@ static bool NodeIsLocal(WorkerNode *worker);
115
115
static void SetLockTimeoutLocally (int32 lock_cooldown );
116
116
static void UpdateNodeLocation (int32 nodeId , char * newNodeName , int32 newNodePort ,
117
117
bool localOnly );
118
+ static int GetNodePrimaryNodeIdAttrIndexInPgDistNode (TupleDesc tupleDesc );
119
+ static int GetNodeIsCloneAttrIndexInPgDistNode (TupleDesc tupleDesc );
118
120
static bool UnsetMetadataSyncedForAllWorkers (void );
119
121
static char * GetMetadataSyncCommandToSetNodeColumn (WorkerNode * workerNode ,
120
122
int columnIndex ,
@@ -1196,16 +1198,25 @@ ActivateNodeList(MetadataSyncContext *context)
1196
1198
void
1197
1199
ActivateCloneNodeAsPrimary (WorkerNode * workerNode )
1198
1200
{
1201
+ Relation pgDistNode = table_open (DistNodeRelationId (), AccessShareLock );
1202
+ TupleDesc tupleDescriptor = RelationGetDescr (pgDistNode );
1203
+ TupleDesc copiedTupleDescriptor = CreateTupleDescCopy (tupleDescriptor );
1204
+ table_close (pgDistNode , AccessShareLock );
1205
+
1199
1206
/*
1200
1207
* Set the node as primary and active.
1201
1208
*/
1202
1209
SetWorkerColumnLocalOnly (workerNode , Anum_pg_dist_node_noderole ,
1203
1210
ObjectIdGetDatum (PrimaryNodeRoleId ()));
1204
1211
SetWorkerColumnLocalOnly (workerNode , Anum_pg_dist_node_isactive ,
1205
1212
BoolGetDatum (true));
1206
- SetWorkerColumnLocalOnly (workerNode , Anum_pg_dist_node_nodeisclone ,
1213
+ SetWorkerColumnLocalOnly (workerNode ,
1214
+ GetNodeIsCloneAttrIndexInPgDistNode (copiedTupleDescriptor ) +
1215
+ 1 ,
1207
1216
BoolGetDatum (false));
1208
- SetWorkerColumnLocalOnly (workerNode , Anum_pg_dist_node_nodeprimarynodeid ,
1217
+ SetWorkerColumnLocalOnly (workerNode ,
1218
+ GetNodePrimaryNodeIdAttrIndexInPgDistNode (
1219
+ copiedTupleDescriptor ) + 1 ,
1209
1220
Int32GetDatum (0 ));
1210
1221
SetWorkerColumnLocalOnly (workerNode , Anum_pg_dist_node_hasmetadata ,
1211
1222
BoolGetDatum (true));
@@ -1779,14 +1790,14 @@ UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort, bool loca
1779
1790
{
1780
1791
const bool indexOK = true;
1781
1792
1782
- ScanKeyData scanKey [1 ];
1783
- Datum values [Natts_pg_dist_node ];
1784
- bool isnull [Natts_pg_dist_node ];
1785
- bool replace [Natts_pg_dist_node ];
1786
-
1787
1793
Relation pgDistNode = table_open (DistNodeRelationId (), RowExclusiveLock );
1788
1794
TupleDesc tupleDescriptor = RelationGetDescr (pgDistNode );
1789
1795
1796
+ ScanKeyData scanKey [1 ];
1797
+ Datum * values = palloc0 (tupleDescriptor -> natts * sizeof (Datum ));
1798
+ bool * isnull = palloc0 (tupleDescriptor -> natts * sizeof (bool ));
1799
+ bool * replace = palloc0 (tupleDescriptor -> natts * sizeof (bool ));
1800
+
1790
1801
ScanKeyInit (& scanKey [0 ], Anum_pg_dist_node_nodeid ,
1791
1802
BTEqualStrategyNumber , F_INT4EQ , Int32GetDatum (nodeId ));
1792
1803
@@ -1801,8 +1812,6 @@ UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort, bool loca
1801
1812
newNodeName , newNodePort )));
1802
1813
}
1803
1814
1804
- memset (replace , 0 , sizeof (replace ));
1805
-
1806
1815
values [Anum_pg_dist_node_nodeport - 1 ] = Int32GetDatum (newNodePort );
1807
1816
isnull [Anum_pg_dist_node_nodeport - 1 ] = false;
1808
1817
replace [Anum_pg_dist_node_nodeport - 1 ] = true;
@@ -1835,6 +1844,10 @@ UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort, bool loca
1835
1844
1836
1845
systable_endscan (scanDescriptor );
1837
1846
table_close (pgDistNode , NoLock );
1847
+
1848
+ pfree (values );
1849
+ pfree (isnull );
1850
+ pfree (replace );
1838
1851
}
1839
1852
1840
1853
@@ -2105,11 +2118,10 @@ citus_internal_mark_node_not_synced(PG_FUNCTION_ARGS)
2105
2118
Relation pgDistNode = table_open (DistNodeRelationId (), AccessShareLock );
2106
2119
TupleDesc tupleDescriptor = RelationGetDescr (pgDistNode );
2107
2120
2108
- Datum values [ Natts_pg_dist_node ] ;
2109
- bool isnull [ Natts_pg_dist_node ] ;
2110
- bool replace [ Natts_pg_dist_node ] ;
2121
+ Datum * values = palloc0 ( tupleDescriptor -> natts * sizeof ( Datum )) ;
2122
+ bool * isnull = palloc0 ( tupleDescriptor -> natts * sizeof ( bool )) ;
2123
+ bool * replace = palloc0 ( tupleDescriptor -> natts * sizeof ( bool )) ;
2111
2124
2112
- memset (replace , 0 , sizeof (replace ));
2113
2125
values [Anum_pg_dist_node_metadatasynced - 1 ] = DatumGetBool (false);
2114
2126
isnull [Anum_pg_dist_node_metadatasynced - 1 ] = false;
2115
2127
replace [Anum_pg_dist_node_metadatasynced - 1 ] = true;
@@ -2123,6 +2135,10 @@ citus_internal_mark_node_not_synced(PG_FUNCTION_ARGS)
2123
2135
2124
2136
table_close (pgDistNode , NoLock );
2125
2137
2138
+ pfree (values );
2139
+ pfree (isnull );
2140
+ pfree (replace );
2141
+
2126
2142
PG_RETURN_VOID ();
2127
2143
}
2128
2144
@@ -2831,17 +2847,16 @@ SetWorkerColumnLocalOnly(WorkerNode *workerNode, int columnIndex, Datum value)
2831
2847
TupleDesc tupleDescriptor = RelationGetDescr (pgDistNode );
2832
2848
HeapTuple heapTuple = GetNodeTuple (workerNode -> workerName , workerNode -> workerPort );
2833
2849
2834
- Datum values [ Natts_pg_dist_node ] ;
2835
- bool isnull [ Natts_pg_dist_node ] ;
2836
- bool replace [ Natts_pg_dist_node ] ;
2850
+ Datum * values = palloc0 ( tupleDescriptor -> natts * sizeof ( Datum )) ;
2851
+ bool * isnull = palloc0 ( tupleDescriptor -> natts * sizeof ( bool )) ;
2852
+ bool * replace = palloc0 ( tupleDescriptor -> natts * sizeof ( bool )) ;
2837
2853
2838
2854
if (heapTuple == NULL )
2839
2855
{
2840
2856
ereport (ERROR , (errmsg ("could not find valid entry for node \"%s:%d\"" ,
2841
2857
workerNode -> workerName , workerNode -> workerPort )));
2842
2858
}
2843
2859
2844
- memset (replace , 0 , sizeof (replace ));
2845
2860
values [columnIndex - 1 ] = value ;
2846
2861
isnull [columnIndex - 1 ] = false;
2847
2862
replace [columnIndex - 1 ] = true;
@@ -2857,6 +2872,10 @@ SetWorkerColumnLocalOnly(WorkerNode *workerNode, int columnIndex, Datum value)
2857
2872
2858
2873
table_close (pgDistNode , NoLock );
2859
2874
2875
+ pfree (values );
2876
+ pfree (isnull );
2877
+ pfree (replace );
2878
+
2860
2879
return newWorkerNode ;
2861
2880
}
2862
2881
@@ -3241,16 +3260,15 @@ InsertPlaceholderCoordinatorRecord(void)
3241
3260
static void
3242
3261
InsertNodeRow (int nodeid , char * nodeName , int32 nodePort , NodeMetadata * nodeMetadata )
3243
3262
{
3244
- Datum values [Natts_pg_dist_node ];
3245
- bool isNulls [Natts_pg_dist_node ];
3263
+ Relation pgDistNode = table_open (DistNodeRelationId (), RowExclusiveLock );
3264
+ TupleDesc tupleDescriptor = RelationGetDescr (pgDistNode );
3265
+
3266
+ Datum * values = palloc0 (tupleDescriptor -> natts * sizeof (Datum ));
3267
+ bool * isNulls = palloc0 (tupleDescriptor -> natts * sizeof (bool ));
3246
3268
3247
3269
Datum nodeClusterStringDatum = CStringGetDatum (nodeMetadata -> nodeCluster );
3248
3270
Datum nodeClusterNameDatum = DirectFunctionCall1 (namein , nodeClusterStringDatum );
3249
3271
3250
- /* form new shard tuple */
3251
- memset (values , 0 , sizeof (values ));
3252
- memset (isNulls , false, sizeof (isNulls ));
3253
-
3254
3272
values [Anum_pg_dist_node_nodeid - 1 ] = UInt32GetDatum (nodeid );
3255
3273
values [Anum_pg_dist_node_groupid - 1 ] = Int32GetDatum (nodeMetadata -> groupId );
3256
3274
values [Anum_pg_dist_node_nodename - 1 ] = CStringGetTextDatum (nodeName );
@@ -3264,14 +3282,10 @@ InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, NodeMetadata *nodeMeta
3264
3282
values [Anum_pg_dist_node_nodecluster - 1 ] = nodeClusterNameDatum ;
3265
3283
values [Anum_pg_dist_node_shouldhaveshards - 1 ] = BoolGetDatum (
3266
3284
nodeMetadata -> shouldHaveShards );
3267
- values [Anum_pg_dist_node_nodeisclone - 1 ] = BoolGetDatum (
3268
- nodeMetadata -> nodeisclone );
3269
- values [Anum_pg_dist_node_nodeprimarynodeid - 1 ] = Int32GetDatum (
3270
- nodeMetadata -> nodeprimarynodeid );
3271
-
3272
- Relation pgDistNode = table_open (DistNodeRelationId (), RowExclusiveLock );
3273
-
3274
- TupleDesc tupleDescriptor = RelationGetDescr (pgDistNode );
3285
+ values [GetNodeIsCloneAttrIndexInPgDistNode (tupleDescriptor )] =
3286
+ BoolGetDatum (nodeMetadata -> nodeisclone );
3287
+ values [GetNodePrimaryNodeIdAttrIndexInPgDistNode (tupleDescriptor )] =
3288
+ Int32GetDatum (nodeMetadata -> nodeprimarynodeid );
3275
3289
HeapTuple heapTuple = heap_form_tuple (tupleDescriptor , values , isNulls );
3276
3290
3277
3291
CATALOG_INSERT_WITH_SNAPSHOT (pgDistNode , heapTuple );
@@ -3283,6 +3297,9 @@ InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, NodeMetadata *nodeMeta
3283
3297
3284
3298
/* close relation */
3285
3299
table_close (pgDistNode , NoLock );
3300
+
3301
+ pfree (values );
3302
+ pfree (isNulls );
3286
3303
}
3287
3304
3288
3305
@@ -3397,43 +3414,30 @@ TupleToWorkerNode(Relation pgDistNode, TupleDesc tupleDescriptor, HeapTuple heap
3397
3414
1 ]);
3398
3415
3399
3416
/*
3400
- * Attributes above this line are guaranteed to be present at the
3401
- * exact defined attribute number. Atleast till now. If you are droping or
3402
- * adding any of the above columns consider adjusting the code above
3417
+ * nodecluster, nodeisclone and nodeprimarynodeid columns can be missing. In case
3418
+ * of extension creation/upgrade, master_initialize_node_metadata function is
3419
+ * called before the nodecluster column is added to pg_dist_node table.
3403
3420
*/
3404
- Oid pgDistNodeRelId = RelationGetRelid (pgDistNode );
3405
-
3406
- AttrNumber nodeClusterAttno = get_attnum (pgDistNodeRelId , "nodecluster" );
3407
3421
3408
- if (nodeClusterAttno > 0 &&
3409
- !TupleDescAttr (tupleDescriptor , nodeClusterAttno - 1 )-> attisdropped &&
3410
- !isNullArray [nodeClusterAttno - 1 ])
3422
+ if (!isNullArray [Anum_pg_dist_node_nodecluster - 1 ])
3411
3423
{
3412
3424
Name nodeClusterName =
3413
- DatumGetName (datumArray [nodeClusterAttno - 1 ]);
3425
+ DatumGetName (datumArray [Anum_pg_dist_node_nodecluster - 1 ]);
3414
3426
char * nodeClusterString = NameStr (* nodeClusterName );
3415
3427
strlcpy (workerNode -> nodeCluster , nodeClusterString , NAMEDATALEN );
3416
3428
}
3417
3429
3418
- if (nAtts > Anum_pg_dist_node_nodeisclone )
3430
+ int nodeIsCloneIdx = GetNodeIsCloneAttrIndexInPgDistNode (tupleDescriptor );
3431
+ int nodePrimaryNodeIdIdx = GetNodePrimaryNodeIdAttrIndexInPgDistNode (tupleDescriptor );
3432
+
3433
+ if (!isNullArray [nodeIsCloneIdx ])
3419
3434
{
3420
- AttrNumber nodeIsCloneAttno = get_attnum (pgDistNodeRelId , "nodeisclone" );
3421
- if (nodeIsCloneAttno > 0 &&
3422
- !TupleDescAttr (tupleDescriptor , nodeIsCloneAttno - 1 )-> attisdropped &&
3423
- !isNullArray [nodeIsCloneAttno - 1 ])
3424
- {
3425
- workerNode -> nodeisclone = DatumGetBool (datumArray [nodeIsCloneAttno - 1 ]);
3426
- }
3427
- AttrNumber nodePrimaryNodeIdAttno = get_attnum (pgDistNodeRelId ,
3428
- "nodeprimarynodeid" );
3429
- if (nodePrimaryNodeIdAttno > 0 &&
3430
- !TupleDescAttr (tupleDescriptor , nodePrimaryNodeIdAttno - 1 )-> attisdropped &&
3431
- !isNullArray [nodePrimaryNodeIdAttno - 1 ])
3432
- {
3433
- workerNode -> nodeprimarynodeid = DatumGetInt32 (datumArray [
3434
- nodePrimaryNodeIdAttno - 1 ])
3435
- ;
3436
- }
3435
+ workerNode -> nodeisclone = DatumGetBool (datumArray [nodeIsCloneIdx ]);
3436
+ }
3437
+
3438
+ if (!isNullArray [nodePrimaryNodeIdIdx ])
3439
+ {
3440
+ workerNode -> nodeprimarynodeid = DatumGetInt32 (datumArray [nodePrimaryNodeIdIdx ]);
3437
3441
}
3438
3442
3439
3443
pfree (datumArray );
@@ -3443,6 +3447,48 @@ TupleToWorkerNode(Relation pgDistNode, TupleDesc tupleDescriptor, HeapTuple heap
3443
3447
}
3444
3448
3445
3449
3450
+ /*
3451
+ * GetNodePrimaryNodeIdAttrIndexInPgDistNode returns attrnum for nodeprimarynodeid attr.
3452
+ *
3453
+ * nodeprimarynodeid attr was added to table pg_dist_node using alter operation
3454
+ * after the version where Citus started supporting downgrades, and it's one of
3455
+ * the two columns that we've introduced to pg_dist_node since then.
3456
+ *
3457
+ * And in case of a downgrade + upgrade, tupleDesc->natts becomes greater than
3458
+ * Natts_pg_dist_node and when this happens, then we know that attrnum
3459
+ * nodeprimarynodeid is not Anum_pg_dist_node_nodeprimarynodeid anymore but
3460
+ * tupleDesc->natts - 1.
3461
+ */
3462
+ static int
3463
+ GetNodePrimaryNodeIdAttrIndexInPgDistNode (TupleDesc tupleDesc )
3464
+ {
3465
+ return tupleDesc -> natts == Natts_pg_dist_node
3466
+ ? (Anum_pg_dist_node_nodeprimarynodeid - 1 )
3467
+ : tupleDesc -> natts - 1 ;
3468
+ }
3469
+
3470
+
3471
+ /*
3472
+ * GetNodeIsCloneAttrIndexInPgDistNode returns attrnum for nodeisclone attr.
3473
+ *
3474
+ * Like, GetNodePrimaryNodeIdAttrIndexInPgDistNode(), performs a similar
3475
+ * calculation for nodeisclone attribute because this is column added to
3476
+ * pg_dist_node after we started supporting downgrades.
3477
+ *
3478
+ * Only difference with the mentioned function is that we know
3479
+ * the attrnum for nodeisclone is not Anum_pg_dist_node_nodeisclone anymore
3480
+ * but tupleDesc->natts - 2 because we added these columns consecutively
3481
+ * and we first add nodeisclone attribute and then nodeprimarynodeid attribute.
3482
+ */
3483
+ static int
3484
+ GetNodeIsCloneAttrIndexInPgDistNode (TupleDesc tupleDesc )
3485
+ {
3486
+ return tupleDesc -> natts == Natts_pg_dist_node
3487
+ ? (Anum_pg_dist_node_nodeisclone - 1 )
3488
+ : tupleDesc -> natts - 2 ;
3489
+ }
3490
+
3491
+
3446
3492
/*
3447
3493
* StringToDatum transforms a string representation into a Datum.
3448
3494
*/
@@ -3519,15 +3565,15 @@ UnsetMetadataSyncedForAllWorkers(void)
3519
3565
updatedAtLeastOne = true;
3520
3566
}
3521
3567
3568
+ Datum * values = palloc (tupleDescriptor -> natts * sizeof (Datum ));
3569
+ bool * isnull = palloc (tupleDescriptor -> natts * sizeof (bool ));
3570
+ bool * replace = palloc (tupleDescriptor -> natts * sizeof (bool ));
3571
+
3522
3572
while (HeapTupleIsValid (heapTuple ))
3523
3573
{
3524
- Datum values [Natts_pg_dist_node ];
3525
- bool isnull [Natts_pg_dist_node ];
3526
- bool replace [Natts_pg_dist_node ];
3527
-
3528
- memset (replace , false, sizeof (replace ));
3529
- memset (isnull , false, sizeof (isnull ));
3530
- memset (values , 0 , sizeof (values ));
3574
+ memset (values , 0 , tupleDescriptor -> natts * sizeof (Datum ));
3575
+ memset (isnull , 0 , tupleDescriptor -> natts * sizeof (bool ));
3576
+ memset (replace , 0 , tupleDescriptor -> natts * sizeof (bool ));
3531
3577
3532
3578
values [Anum_pg_dist_node_metadatasynced - 1 ] = BoolGetDatum (false);
3533
3579
replace [Anum_pg_dist_node_metadatasynced - 1 ] = true;
@@ -3550,6 +3596,10 @@ UnsetMetadataSyncedForAllWorkers(void)
3550
3596
CatalogCloseIndexes (indstate );
3551
3597
table_close (relation , NoLock );
3552
3598
3599
+ pfree (values );
3600
+ pfree (isnull );
3601
+ pfree (replace );
3602
+
3553
3603
return updatedAtLeastOne ;
3554
3604
}
3555
3605
0 commit comments