SELECT tableoid::regclass, * FROM p2;
 INSERT INTO pt VALUES (1, 'xyzzy'); -- ERROR
 INSERT INTO pt VALUES (2, 'xyzzy');
+UPDATE pt set a = 1 where a = 2; -- ERROR
 SELECT tableoid::regclass, * FROM pt;
 SELECT tableoid::regclass, * FROM p1;
 SELECT tableoid::regclass, * FROM p2;
 
 INSERT INTO pt VALUES (1, 'xyzzy'); -- ERROR
 ERROR:  cannot route inserted tuples to a foreign table
 INSERT INTO pt VALUES (2, 'xyzzy');
+UPDATE pt set a = 1 where a = 2; -- ERROR
+ERROR:  cannot route inserted tuples to a foreign table
 SELECT tableoid::regclass, * FROM pt;
  tableoid | a |   b   
 ----------+---+-------
 
     foreign table partitions.
    </para>
 
+   <para>
+    Updating the partition key of a row might cause it to be moved into a
+    different partition where this row satisfies its partition constraint.
+   </para>
+
    <sect3 id="ddl-partitioning-declarative-example">
     <title>Example</title>
 
 
      <listitem>
       <para>
-       An <command>UPDATE</command> that causes a row to move from one partition to
-       another fails, because the new value of the row fails to satisfy the
-       implicit partition constraint of the original partition.
+       When an <command>UPDATE</command> causes a row to move from one
+       partition to another, there is a chance that another concurrent
+       <command>UPDATE</command> or <command>DELETE</command> misses this row.
+       Suppose session 1 is performing an <command>UPDATE</command> on a
+       partition key, and meanwhile a concurrent session 2 for which this row
+       is visible performs an <command>UPDATE</command> or
+       <command>DELETE</command> operation on this row. Session 2 can silently
+       miss the row if the row is deleted from the partition due to session
+       1's activity.  In such case, session 2's
+       <command>UPDATE</command> or <command>DELETE</command>, being unaware of
+       the row movement thinks that the row has just been deleted and concludes
+       that there is nothing to be done for this row. In the usual case where
+       the table is not partitioned, or where there is no row movement,
+       session 2 would have identified the newly updated row and carried out
+       the <command>UPDATE</command>/<command>DELETE</command> on this new row
+       version.
       </para>
      </listitem>
 
 
 
   <para>
    In the case of a partitioned table, updating a row might cause it to no
-   longer satisfy the partition constraint.  Since there is no provision to
-   move the row to the partition appropriate to the new value of its
-   partitioning key, an error will occur in this case.  This can also happen
-   when updating a partition directly.
+   longer satisfy the partition constraint of the containing partition. In that
+   case, if there is some other partition in the partition tree for which this
+   row satisfies its partition constraint, then the row is moved to that
+   partition. If there is no such partition, an error will occur.  Behind the
+   scenes, the row movement is actually a <command>DELETE</command> and
+   <command>INSERT</command> operation. However, there is a possibility that a
+   concurrent <command>UPDATE</command> or <command>DELETE</command> on the
+   same row may miss this row. For details see the section
+   <xref linkend="ddl-partitioning-declarative-limitations"/>.
   </para>
  </refsect1>
 
 
     triggers.
    </para>
 
+   <para>
+    If an <command>UPDATE</command> on a partitioned table causes a row to move
+    to another partition, it will be performed as a <command>DELETE</command>
+    from the original partition followed by an <command>INSERT</command> into
+    the new partition. In this case, all row-level <literal>BEFORE</literal>
+    <command>UPDATE</command> triggers and all row-level
+    <literal>BEFORE</literal> <command>DELETE</command> triggers are fired on
+    the original partition. Then all row-level <literal>BEFORE</literal>
+    <command>INSERT</command> triggers are fired on the destination partition.
+    The possibility of surprising outcomes should be considered when all these
+    triggers affect the row being moved. As far as <literal>AFTER ROW</literal>
+    triggers are concerned, <literal>AFTER</literal> <command>DELETE</command>
+    and <literal>AFTER</literal> <command>INSERT</command> triggers are
+    applied; but <literal>AFTER</literal> <command>UPDATE</command> triggers
+    are not applied because the <command>UPDATE</command> has been converted to
+    a <command>DELETE</command> and an <command>INSERT</command>. As far as
+    statement-level triggers are concerned, none of the
+    <command>DELETE</command> or <command>INSERT</command> triggers are fired,
+    even if row movement occurs; only the <command>UPDATE</command> triggers
+    defined on the target table used in the <command>UPDATE</command> statement
+    will be fired.
+   </para>
+
    <para>
     Trigger functions invoked by per-statement triggers should always
     return <symbol>NULL</symbol>. Trigger functions invoked by per-row
 
    PartitionTupleRouting *partition_tuple_routing;
 
    TransitionCaptureState *transition_capture;
-   TupleConversionMap **transition_tupconv_maps;
 
    /*
     * These variables are used to reduce overhead in textual COPY FROM.
         * tuple).
         */
        if (cstate->transition_capture != NULL)
-       {
-           int         i;
-
-           cstate->transition_tupconv_maps = (TupleConversionMap **)
-               palloc0(sizeof(TupleConversionMap *) * proute->num_partitions);
-           for (i = 0; i < proute->num_partitions; ++i)
-           {
-               cstate->transition_tupconv_maps[i] =
-                   convert_tuples_by_name(RelationGetDescr(proute->partitions[i]->ri_RelationDesc),
-                                          RelationGetDescr(cstate->rel),
-                                          gettext_noop("could not convert row type"));
-           }
-       }
+           ExecSetupChildParentMapForLeaf(proute);
    }
 
    /*
        if (cstate->partition_tuple_routing)
        {
            int         leaf_part_index;
-           TupleConversionMap *map;
            PartitionTupleRouting *proute = cstate->partition_tuple_routing;
 
            /*
                     */
                    cstate->transition_capture->tcs_original_insert_tuple = NULL;
                    cstate->transition_capture->tcs_map =
-                       cstate->transition_tupconv_maps[leaf_part_index];
+                       TupConvMapForLeaf(proute, saved_resultRelInfo,
+                                         leaf_part_index);
                }
                else
                {
             * We might need to convert from the parent rowtype to the
             * partition rowtype.
             */
-           map = proute->partition_tupconv_maps[leaf_part_index];
-           if (map)
-           {
-               Relation    partrel = resultRelInfo->ri_RelationDesc;
-
-               tuple = do_convert_tuple(tuple, map);
-
-               /*
-                * We must use the partition's tuple descriptor from this
-                * point on.  Use a dedicated slot from this point on until
-                * we're finished dealing with the partition.
-                */
-               slot = proute->partition_tuple_slot;
-               Assert(slot != NULL);
-               ExecSetSlotDescriptor(slot, RelationGetDescr(partrel));
-               ExecStoreTuple(tuple, slot, InvalidBuffer, true);
-           }
+           tuple = ConvertPartitionTupleSlot(proute->parent_child_tupconv_maps[leaf_part_index],
+                                             tuple,
+                                             proute->partition_tuple_slot,
+                                             &slot);
 
            tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
        }
 
    {
        HeapTuple   trigtuple;
 
-       Assert(HeapTupleIsValid(fdw_trigtuple) ^ ItemPointerIsValid(tupleid));
-       if (fdw_trigtuple == NULL)
+       /*
+        * Note: if the UPDATE is converted into a DELETE+INSERT as part of
+        * update-partition-key operation, then this function is also called
+        * separately for DELETE and INSERT to capture transition table rows.
+        * In such case, either old tuple or new tuple can be NULL.
+        */
+       if (fdw_trigtuple == NULL && ItemPointerIsValid(tupleid))
            trigtuple = GetTupleForTrigger(estate,
                                           NULL,
                                           relinfo,
  * triggers actually need to be queued.  It is also called after each row,
  * even if there are no triggers for that event, if there are any AFTER
  * STATEMENT triggers for the statement which use transition tables, so that
- * the transition tuplestores can be built.
+ * the transition tuplestores can be built.  Furthermore, if the transition
+ * capture is happening for UPDATEd rows being moved to another partition due
+ * to the partition-key being changed, then this function is called once when
+ * the row is deleted (to capture OLD row), and once when the row is inserted
+ * into another partition (to capture NEW row).  This is done separately because
+ * DELETE and INSERT happen on different tables.
  *
  * Transition tuplestores are built now, rather than when events are pulled
  * off of the queue because AFTER ROW triggers are allowed to select from the
        bool        update_new_table = transition_capture->tcs_update_new_table;
        bool        insert_new_table = transition_capture->tcs_insert_new_table;;
 
-       if ((event == TRIGGER_EVENT_DELETE && delete_old_table) ||
-           (event == TRIGGER_EVENT_UPDATE && update_old_table))
+       /*
+        * For INSERT events newtup should be non-NULL, for DELETE events
+        * oldtup should be non-NULL, whereas for UPDATE events normally both
+        * oldtup and newtup are non-NULL.  But for UPDATE events fired for
+        * capturing transition tuples during UPDATE partition-key row
+        * movement, oldtup is NULL when the event is for a row being inserted,
+        * whereas newtup is NULL when the event is for a row being deleted.
+        */
+       Assert(!(event == TRIGGER_EVENT_DELETE && delete_old_table &&
+                oldtup == NULL));
+       Assert(!(event == TRIGGER_EVENT_INSERT && insert_new_table &&
+                newtup == NULL));
+
+       if (oldtup != NULL &&
+           ((event == TRIGGER_EVENT_DELETE && delete_old_table) ||
+            (event == TRIGGER_EVENT_UPDATE && update_old_table)))
        {
            Tuplestorestate *old_tuplestore;
 
-           Assert(oldtup != NULL);
            old_tuplestore = transition_capture->tcs_private->old_tuplestore;
 
            if (map != NULL)
            else
                tuplestore_puttuple(old_tuplestore, oldtup);
        }
-       if ((event == TRIGGER_EVENT_INSERT && insert_new_table) ||
-           (event == TRIGGER_EVENT_UPDATE && update_new_table))
+       if (newtup != NULL &&
+           ((event == TRIGGER_EVENT_INSERT && insert_new_table) ||
+           (event == TRIGGER_EVENT_UPDATE && update_new_table)))
        {
            Tuplestorestate *new_tuplestore;
 
-           Assert(newtup != NULL);
            new_tuplestore = transition_capture->tcs_private->new_tuplestore;
 
            if (original_insert_tuple != NULL)
                tuplestore_puttuple(new_tuplestore, newtup);
        }
 
-       /* If transition tables are the only reason we're here, return. */
+       /*
+        * If transition tables are the only reason we're here, return. As
+        * mentioned above, we can also be here during update tuple routing in
+        * presence of transition tables, in which case this function is called
+        * separately for oldtup and newtup, so we expect exactly one of them
+        * to be NULL.
+        */
        if (trigdesc == NULL ||
            (event == TRIGGER_EVENT_DELETE && !trigdesc->trig_delete_after_row) ||
            (event == TRIGGER_EVENT_INSERT && !trigdesc->trig_insert_after_row) ||
-           (event == TRIGGER_EVENT_UPDATE && !trigdesc->trig_update_after_row))
+           (event == TRIGGER_EVENT_UPDATE && !trigdesc->trig_update_after_row) ||
+           (event == TRIGGER_EVENT_UPDATE && ((oldtup == NULL) ^ (newtup == NULL))))
            return;
    }
 
 
    List       *leaf_parts;
    ListCell   *cell;
    int         i;
-   ResultRelInfo *leaf_part_rri;
+   ResultRelInfo *leaf_part_arr = NULL,
+              *update_rri = NULL;
+   int         num_update_rri = 0,
+               update_rri_index = 0;
+   bool        is_update = false;
    PartitionTupleRouting *proute;
 
    /*
    proute->num_partitions = list_length(leaf_parts);
    proute->partitions = (ResultRelInfo **) palloc(proute->num_partitions *
                                                   sizeof(ResultRelInfo *));
-   proute->partition_tupconv_maps =
+   proute->parent_child_tupconv_maps =
        (TupleConversionMap **) palloc0(proute->num_partitions *
                                        sizeof(TupleConversionMap *));
 
+   /* Set up details specific to the type of tuple routing we are doing. */
+   if (mtstate && mtstate->operation == CMD_UPDATE)
+   {
+       ModifyTable *node = (ModifyTable *) mtstate->ps.plan;
+
+       is_update = true;
+       update_rri = mtstate->resultRelInfo;
+       num_update_rri = list_length(node->plans);
+       proute->subplan_partition_offsets =
+           palloc(num_update_rri * sizeof(int));
+
+       /*
+        * We need an additional tuple slot for storing transient tuples that
+        * are converted to the root table descriptor.
+        */
+       proute->root_tuple_slot = MakeTupleTableSlot();
+   }
+   else
+   {
+       /*
+        * Since we are inserting tuples, we need to create all new result
+        * rels. Avoid repeated pallocs by allocating memory for all the
+        * result rels in bulk.
+        */
+       leaf_part_arr = (ResultRelInfo *) palloc0(proute->num_partitions *
+                                                 sizeof(ResultRelInfo));
+   }
+
    /*
     * Initialize an empty slot that will be used to manipulate tuples of any
     * given partition's rowtype.  It is attached to the caller-specified node
     */
    proute->partition_tuple_slot = MakeTupleTableSlot();
 
-   leaf_part_rri = (ResultRelInfo *) palloc0(proute->num_partitions *
-                                             sizeof(ResultRelInfo));
    i = 0;
    foreach(cell, leaf_parts)
    {
-       Relation    partrel;
+       ResultRelInfo *leaf_part_rri;
+       Relation    partrel = NULL;
        TupleDesc   part_tupdesc;
+       Oid         leaf_oid = lfirst_oid(cell);
+
+       if (is_update)
+       {
+           /*
+            * If the leaf partition is already present in the per-subplan
+            * result rels, we re-use that rather than initialize a new result
+            * rel. The per-subplan resultrels and the resultrels of the leaf
+            * partitions are both in the same canonical order. So while going
+            * through the leaf partition oids, we need to keep track of the
+            * next per-subplan result rel to be looked for in the leaf
+            * partition resultrels.
+            */
+           if (update_rri_index < num_update_rri &&
+               RelationGetRelid(update_rri[update_rri_index].ri_RelationDesc) == leaf_oid)
+           {
+               leaf_part_rri = &update_rri[update_rri_index];
+               partrel = leaf_part_rri->ri_RelationDesc;
+
+               /*
+                * This is required in order to we convert the partition's
+                * tuple to be compatible with the root partitioned table's
+                * tuple descriptor.  When generating the per-subplan result
+                * rels, this was not set.
+                */
+               leaf_part_rri->ri_PartitionRoot = rel;
+
+               /* Remember the subplan offset for this ResultRelInfo */
+               proute->subplan_partition_offsets[update_rri_index] = i;
+
+               update_rri_index++;
+           }
+           else
+               leaf_part_rri = (ResultRelInfo *) palloc0(sizeof(ResultRelInfo));
+       }
+       else
+       {
+           /* For INSERTs, we already have an array of result rels allocated */
+           leaf_part_rri = &leaf_part_arr[i];
+       }
 
        /*
-        * We locked all the partitions above including the leaf partitions.
-        * Note that each of the relations in proute->partitions are
-        * eventually closed by the caller.
+        * If we didn't open the partition rel, it means we haven't
+        * initialized the result rel either.
         */
-       partrel = heap_open(lfirst_oid(cell), NoLock);
+       if (!partrel)
+       {
+           /*
+            * We locked all the partitions above including the leaf
+            * partitions. Note that each of the newly opened relations in
+            * proute->partitions are eventually closed by the caller.
+            */
+           partrel = heap_open(leaf_oid, NoLock);
+           InitResultRelInfo(leaf_part_rri,
+                             partrel,
+                             resultRTindex,
+                             rel,
+                             estate->es_instrument);
+       }
+
        part_tupdesc = RelationGetDescr(partrel);
 
        /*
         * Save a tuple conversion map to convert a tuple routed to this
         * partition from the parent's type to the partition's.
         */
-       proute->partition_tupconv_maps[i] =
+       proute->parent_child_tupconv_maps[i] =
            convert_tuples_by_name(tupDesc, part_tupdesc,
                                   gettext_noop("could not convert row type"));
 
-       InitResultRelInfo(leaf_part_rri,
-                         partrel,
-                         resultRTindex,
-                         rel,
-                         estate->es_instrument);
-
        /*
-        * Verify result relation is a valid target for INSERT.
+        * Verify result relation is a valid target for an INSERT.  An UPDATE
+        * of a partition-key becomes a DELETE+INSERT operation, so this check
+        * is still required when the operation is CMD_UPDATE.
         */
        CheckValidResultRel(leaf_part_rri, CMD_INSERT);
 
        estate->es_leaf_result_relations =
            lappend(estate->es_leaf_result_relations, leaf_part_rri);
 
-       proute->partitions[i] = leaf_part_rri++;
+       proute->partitions[i] = leaf_part_rri;
        i++;
    }
 
+   /*
+    * For UPDATE, we should have found all the per-subplan resultrels in the
+    * leaf partitions.
+    */
+   Assert(!is_update || update_rri_index == num_update_rri);
+
    return proute;
 }
 
    return result;
 }
 
+/*
+ * ExecSetupChildParentMapForLeaf -- Initialize the per-leaf-partition
+ * child-to-root tuple conversion map array.
+ *
+ * This map is required for capturing transition tuples when the target table
+ * is a partitioned table. For a tuple that is routed by an INSERT or UPDATE,
+ * we need to convert it from the leaf partition to the target table
+ * descriptor.
+ */
+void
+ExecSetupChildParentMapForLeaf(PartitionTupleRouting *proute)
+{
+   Assert(proute != NULL);
+
+   /*
+    * These array elements gets filled up with maps on an on-demand basis.
+    * Initially just set all of them to NULL.
+    */
+   proute->child_parent_tupconv_maps =
+       (TupleConversionMap **) palloc0(sizeof(TupleConversionMap *) *
+                                       proute->num_partitions);
+
+   /* Same is the case for this array. All the values are set to false */
+   proute->child_parent_map_not_required =
+       (bool *) palloc0(sizeof(bool) * proute->num_partitions);
+}
+
+/*
+ * TupConvMapForLeaf -- Get the tuple conversion map for a given leaf partition
+ * index.
+ */
+TupleConversionMap *
+TupConvMapForLeaf(PartitionTupleRouting *proute,
+                 ResultRelInfo *rootRelInfo, int leaf_index)
+{
+   ResultRelInfo **resultRelInfos = proute->partitions;
+   TupleConversionMap **map;
+   TupleDesc   tupdesc;
+
+   /* Don't call this if we're not supposed to be using this type of map. */
+   Assert(proute->child_parent_tupconv_maps != NULL);
+
+   /* If it's already known that we don't need a map, return NULL. */
+   if (proute->child_parent_map_not_required[leaf_index])
+       return NULL;
+
+   /* If we've already got a map, return it. */
+   map = &proute->child_parent_tupconv_maps[leaf_index];
+   if (*map != NULL)
+       return *map;
+
+   /* No map yet; try to create one. */
+   tupdesc = RelationGetDescr(resultRelInfos[leaf_index]->ri_RelationDesc);
+   *map =
+       convert_tuples_by_name(tupdesc,
+                              RelationGetDescr(rootRelInfo->ri_RelationDesc),
+                              gettext_noop("could not convert row type"));
+
+   /* If it turns out no map is needed, remember for next time. */
+   proute->child_parent_map_not_required[leaf_index] = (*map == NULL);
+
+   return *map;
+}
+
+/*
+ * ConvertPartitionTupleSlot -- convenience function for tuple conversion.
+ * The tuple, if converted, is stored in new_slot, and *p_my_slot is
+ * updated to point to it.  new_slot typically should be one of the
+ * dedicated partition tuple slots. If map is NULL, *p_my_slot is not changed.
+ *
+ * Returns the converted tuple, unless map is NULL, in which case original
+ * tuple is returned unmodified.
+ */
+HeapTuple
+ConvertPartitionTupleSlot(TupleConversionMap *map,
+                         HeapTuple tuple,
+                         TupleTableSlot *new_slot,
+                         TupleTableSlot **p_my_slot)
+{
+   if (!map)
+       return tuple;
+
+   tuple = do_convert_tuple(tuple, map);
+
+   /*
+    * Change the partition tuple slot descriptor, as per converted tuple.
+    */
+   *p_my_slot = new_slot;
+   Assert(new_slot != NULL);
+   ExecSetSlotDescriptor(new_slot, map->outdesc);
+   ExecStoreTuple(tuple, new_slot, InvalidBuffer, true);
+
+   return tuple;
+}
+
 /*
  * ExecCleanupTupleRouting -- Clean up objects allocated for partition tuple
  * routing.
  * Close all the partitioned tables, leaf partitions, and their indices.
  */
 void
-ExecCleanupTupleRouting(PartitionTupleRouting * proute)
+ExecCleanupTupleRouting(PartitionTupleRouting *proute)
 {
    int         i;
+   int         subplan_index = 0;
 
    /*
     * Remember, proute->partition_dispatch_info[0] corresponds to the root
    {
        ResultRelInfo *resultRelInfo = proute->partitions[i];
 
+       /*
+        * If this result rel is one of the UPDATE subplan result rels, let
+        * ExecEndPlan() close it. For INSERT or COPY,
+        * proute->subplan_partition_offsets will always be NULL. Note that
+        * the subplan_partition_offsets array and the partitions array have
+        * the partitions in the same order. So, while we iterate over
+        * partitions array, we also iterate over the
+        * subplan_partition_offsets array in order to figure out which of the
+        * result rels are present in the UPDATE subplans.
+        */
+       if (proute->subplan_partition_offsets &&
+           proute->subplan_partition_offsets[subplan_index] == i)
+       {
+           subplan_index++;
+           continue;
+       }
+
        ExecCloseIndices(resultRelInfo);
        heap_close(resultRelInfo->ri_RelationDesc, NoLock);
    }
 
-   /* Release the standalone partition tuple descriptor, if any */
+   /* Release the standalone partition tuple descriptors, if any */
+   if (proute->root_tuple_slot)
+       ExecDropSingleTupleTableSlot(proute->root_tuple_slot);
    if (proute->partition_tuple_slot)
        ExecDropSingleTupleTableSlot(proute->partition_tuple_slot);
 }
 
                     EState *estate,
                     bool canSetTag,
                     TupleTableSlot **returning);
+static ResultRelInfo *getTargetResultRelInfo(ModifyTableState *node);
+static void ExecSetupChildParentMapForTcs(ModifyTableState *mtstate);
+static void ExecSetupChildParentMapForSubplan(ModifyTableState *mtstate);
+static TupleConversionMap *tupconv_map_for_subplan(ModifyTableState *node,
+                       int whichplan);
 
 /*
  * Verify that the tuples to be produced by INSERT or UPDATE match the
    Oid         newId;
    List       *recheckIndexes = NIL;
    TupleTableSlot *result = NULL;
+   TransitionCaptureState *ar_insert_trig_tcs;
 
    /*
     * get the heap tuple out of the tuple table slot, making sure we have a
    {
        int         leaf_part_index;
        PartitionTupleRouting *proute = mtstate->mt_partition_tuple_routing;
-       TupleConversionMap *map;
 
        /*
         * Away we go ... If we end up not finding a partition after all,
                 * back to tuplestore format.
                 */
                mtstate->mt_transition_capture->tcs_original_insert_tuple = NULL;
+
                mtstate->mt_transition_capture->tcs_map =
-                   mtstate->mt_transition_tupconv_maps[leaf_part_index];
+                   TupConvMapForLeaf(proute, saved_resultRelInfo,
+                                     leaf_part_index);
            }
            else
            {
            }
        }
        if (mtstate->mt_oc_transition_capture != NULL)
+       {
            mtstate->mt_oc_transition_capture->tcs_map =
-               mtstate->mt_transition_tupconv_maps[leaf_part_index];
+               TupConvMapForLeaf(proute, saved_resultRelInfo,
+                                 leaf_part_index);
+       }
 
        /*
         * We might need to convert from the parent rowtype to the partition
         * rowtype.
         */
-       map = proute->partition_tupconv_maps[leaf_part_index];
-       if (map)
-       {
-           Relation    partrel = resultRelInfo->ri_RelationDesc;
-
-           tuple = do_convert_tuple(tuple, map);
-
-           /*
-            * We must use the partition's tuple descriptor from this point
-            * on, until we're finished dealing with the partition. Use the
-            * dedicated slot for that.
-            */
-           slot = proute->partition_tuple_slot;
-           Assert(slot != NULL);
-           ExecSetSlotDescriptor(slot, RelationGetDescr(partrel));
-           ExecStoreTuple(tuple, slot, InvalidBuffer, true);
-       }
+       tuple = ConvertPartitionTupleSlot(proute->parent_child_tupconv_maps[leaf_part_index],
+                                         tuple,
+                                         proute->partition_tuple_slot,
+                                         &slot);
    }
 
    resultRelationDesc = resultRelInfo->ri_RelationDesc;
    }
    else
    {
+       WCOKind     wco_kind;
+
        /*
         * We always check the partition constraint, including when the tuple
         * got here via tuple-routing.  However we don't need to in the latter
        tuple->t_tableOid = RelationGetRelid(resultRelationDesc);
 
        /*
-        * Check any RLS INSERT WITH CHECK policies
+        * Check any RLS WITH CHECK policies.
         *
+        * Normally we should check INSERT policies. But if the insert is the
+        * result of a partition key update that moved the tuple to a new
+        * partition, we should instead check UPDATE policies, because we are
+        * executing policies defined on the target table, and not those
+        * defined on the child partitions.
+        */
+       wco_kind = (mtstate->operation == CMD_UPDATE) ?
+           WCO_RLS_UPDATE_CHECK : WCO_RLS_INSERT_CHECK;
+
+       /*
         * ExecWithCheckOptions() will skip any WCOs which are not of the kind
         * we are looking for at this point.
         */
        if (resultRelInfo->ri_WithCheckOptions != NIL)
-           ExecWithCheckOptions(WCO_RLS_INSERT_CHECK,
-                                resultRelInfo, slot, estate);
+           ExecWithCheckOptions(wco_kind, resultRelInfo, slot, estate);
 
        /*
         * No need though if the tuple has been routed, and a BR trigger
        setLastTid(&(tuple->t_self));
    }
 
+   /*
+    * If this insert is the result of a partition key update that moved the
+    * tuple to a new partition, put this row into the transition NEW TABLE,
+    * if there is one. We need to do this separately for DELETE and INSERT
+    * because they happen on different tables.
+    */
+   ar_insert_trig_tcs = mtstate->mt_transition_capture;
+   if (mtstate->operation == CMD_UPDATE && mtstate->mt_transition_capture
+       && mtstate->mt_transition_capture->tcs_update_new_table)
+   {
+       ExecARUpdateTriggers(estate, resultRelInfo, NULL,
+                            NULL,
+                            tuple,
+                            NULL,
+                            mtstate->mt_transition_capture);
+
+       /*
+        * We've already captured the NEW TABLE row, so make sure any AR
+        * INSERT trigger fired below doesn't capture it again.
+        */
+       ar_insert_trig_tcs = NULL;
+   }
+
    /* AFTER ROW INSERT Triggers */
    ExecARInsertTriggers(estate, resultRelInfo, tuple, recheckIndexes,
-                        mtstate->mt_transition_capture);
+                        ar_insert_trig_tcs);
 
    list_free(recheckIndexes);
 
           TupleTableSlot *planSlot,
           EPQState *epqstate,
           EState *estate,
+          bool *tupleDeleted,
+          bool processReturning,
           bool canSetTag)
 {
    ResultRelInfo *resultRelInfo;
    HTSU_Result result;
    HeapUpdateFailureData hufd;
    TupleTableSlot *slot = NULL;
+   TransitionCaptureState *ar_delete_trig_tcs;
+
+   if (tupleDeleted)
+       *tupleDeleted = false;
 
    /*
     * get information on the (current) result relation
    if (canSetTag)
        (estate->es_processed)++;
 
+   /* Tell caller that the delete actually happened. */
+   if (tupleDeleted)
+       *tupleDeleted = true;
+
+   /*
+    * If this delete is the result of a partition key update that moved the
+    * tuple to a new partition, put this row into the transition OLD TABLE,
+    * if there is one. We need to do this separately for DELETE and INSERT
+    * because they happen on different tables.
+    */
+   ar_delete_trig_tcs = mtstate->mt_transition_capture;
+   if (mtstate->operation == CMD_UPDATE && mtstate->mt_transition_capture
+       && mtstate->mt_transition_capture->tcs_update_old_table)
+   {
+       ExecARUpdateTriggers(estate, resultRelInfo,
+                            tupleid,
+                            oldtuple,
+                            NULL,
+                            NULL,
+                            mtstate->mt_transition_capture);
+
+       /*
+        * We've already captured the NEW TABLE row, so make sure any AR
+        * DELETE trigger fired below doesn't capture it again.
+        */
+       ar_delete_trig_tcs = NULL;
+   }
+
    /* AFTER ROW DELETE Triggers */
    ExecARDeleteTriggers(estate, resultRelInfo, tupleid, oldtuple,
-                        mtstate->mt_transition_capture);
+                        ar_delete_trig_tcs);
 
-   /* Process RETURNING if present */
-   if (resultRelInfo->ri_projectReturning)
+   /* Process RETURNING if present and if requested */
+   if (processReturning && resultRelInfo->ri_projectReturning)
    {
        /*
         * We have to put the target tuple into a slot, which means first we
    HTSU_Result result;
    HeapUpdateFailureData hufd;
    List       *recheckIndexes = NIL;
+   TupleConversionMap *saved_tcs_map = NULL;
 
    /*
     * abort the operation if not running transactions
    else
    {
        LockTupleMode lockmode;
+       bool        partition_constraint_failed;
 
        /*
         * Constraints might reference the tableoid column, so initialize
         * (We don't need to redo triggers, however.  If there are any BEFORE
         * triggers then trigger.c will have done heap_lock_tuple to lock the
         * correct tuple, so there's no need to do them again.)
-        *
-        * ExecWithCheckOptions() will skip any WCOs which are not of the kind
-        * we are looking for at this point.
         */
 lreplace:;
-       if (resultRelInfo->ri_WithCheckOptions != NIL)
+
+       /*
+        * If partition constraint fails, this row might get moved to another
+        * partition, in which case we should check the RLS CHECK policy just
+        * before inserting into the new partition, rather than doing it here.
+        * This is because a trigger on that partition might again change the
+        * row.  So skip the WCO checks if the partition constraint fails.
+        */
+       partition_constraint_failed =
+           resultRelInfo->ri_PartitionCheck &&
+           !ExecPartitionCheck(resultRelInfo, slot, estate);
+
+       if (!partition_constraint_failed &&
+           resultRelInfo->ri_WithCheckOptions != NIL)
+       {
+           /*
+            * ExecWithCheckOptions() will skip any WCOs which are not of the
+            * kind we are looking for at this point.
+            */
            ExecWithCheckOptions(WCO_RLS_UPDATE_CHECK,
                                 resultRelInfo, slot, estate);
+       }
+
+       /*
+        * If a partition check failed, try to move the row into the right
+        * partition.
+        */
+       if (partition_constraint_failed)
+       {
+           bool        tuple_deleted;
+           TupleTableSlot *ret_slot;
+           PartitionTupleRouting *proute = mtstate->mt_partition_tuple_routing;
+           int         map_index;
+           TupleConversionMap *tupconv_map;
+
+           /*
+            * When an UPDATE is run on a leaf partition, we will not have
+            * partition tuple routing set up. In that case, fail with
+            * partition constraint violation error.
+            */
+           if (proute == NULL)
+               ExecPartitionCheckEmitError(resultRelInfo, slot, estate);
+
+           /*
+            * Row movement, part 1.  Delete the tuple, but skip RETURNING
+            * processing. We want to return rows from INSERT.
+            */
+           ExecDelete(mtstate, tupleid, oldtuple, planSlot, epqstate, estate,
+                      &tuple_deleted, false, false);
+
+           /*
+            * For some reason if DELETE didn't happen (e.g. trigger prevented
+            * it, or it was already deleted by self, or it was concurrently
+            * deleted by another transaction), then we should skip the insert
+            * as well; otherwise, an UPDATE could cause an increase in the
+            * total number of rows across all partitions, which is clearly
+            * wrong.
+            *
+            * For a normal UPDATE, the case where the tuple has been the
+            * subject of a concurrent UPDATE or DELETE would be handled by
+            * the EvalPlanQual machinery, but for an UPDATE that we've
+            * translated into a DELETE from this partition and an INSERT into
+            * some other partition, that's not available, because CTID chains
+            * can't span relation boundaries.  We mimic the semantics to a
+            * limited extent by skipping the INSERT if the DELETE fails to
+            * find a tuple. This ensures that two concurrent attempts to
+            * UPDATE the same tuple at the same time can't turn one tuple
+            * into two, and that an UPDATE of a just-deleted tuple can't
+            * resurrect it.
+            */
+           if (!tuple_deleted)
+               return NULL;
+
+           /*
+            * Updates set the transition capture map only when a new subplan
+            * is chosen.  But for inserts, it is set for each row. So after
+            * INSERT, we need to revert back to the map created for UPDATE;
+            * otherwise the next UPDATE will incorrectly use the one created
+            * for INSERT.  So first save the one created for UPDATE.
+            */
+           if (mtstate->mt_transition_capture)
+               saved_tcs_map = mtstate->mt_transition_capture->tcs_map;
+
+           /*
+            * resultRelInfo is one of the per-subplan resultRelInfos.  So we
+            * should convert the tuple into root's tuple descriptor, since
+            * ExecInsert() starts the search from root.  The tuple conversion
+            * map list is in the order of mtstate->resultRelInfo[], so to
+            * retrieve the one for this resultRel, we need to know the
+            * position of the resultRel in mtstate->resultRelInfo[].
+            */
+           map_index = resultRelInfo - mtstate->resultRelInfo;
+           Assert(map_index >= 0 && map_index < mtstate->mt_nplans);
+           tupconv_map = tupconv_map_for_subplan(mtstate, map_index);
+           tuple = ConvertPartitionTupleSlot(tupconv_map,
+                                             tuple,
+                                             proute->root_tuple_slot,
+                                             &slot);
+
+
+           /*
+            * For ExecInsert(), make it look like we are inserting into the
+            * root.
+            */
+           Assert(mtstate->rootResultRelInfo != NULL);
+           estate->es_result_relation_info = mtstate->rootResultRelInfo;
+
+           ret_slot = ExecInsert(mtstate, slot, planSlot, NULL,
+                                 ONCONFLICT_NONE, estate, canSetTag);
+
+           /*
+            * Revert back the active result relation and the active
+            * transition capture map that we changed above.
+            */
+           estate->es_result_relation_info = resultRelInfo;
+           if (mtstate->mt_transition_capture)
+           {
+               mtstate->mt_transition_capture->tcs_original_insert_tuple = NULL;
+               mtstate->mt_transition_capture->tcs_map = saved_tcs_map;
+           }
+           return ret_slot;
+       }
 
        /*
         * Check the constraints of the tuple.  Note that we pass the same
         * slot for the orig_slot argument, because unlike ExecInsert(), no
         * tuple-routing is performed here, hence the slot remains unchanged.
+        * We've already checked the partition constraint above; however, we
+        * must still ensure the tuple passes all other constraints, so we
+        * will call ExecConstraints() and have it validate all remaining
+        * checks.
         */
-       if (resultRelationDesc->rd_att->constr || resultRelInfo->ri_PartitionCheck)
-           ExecConstraints(resultRelInfo, slot, estate, true);
+       if (resultRelationDesc->rd_att->constr)
+           ExecConstraints(resultRelInfo, slot, estate, false);
 
        /*
         * replace the heap tuple
 }
 
 /*
- * Return the ResultRelInfo for which we will fire AFTER STATEMENT triggers.
- * This is also the relation into whose tuple format all captured transition
- * tuples must be converted.
+ * Return the target rel ResultRelInfo.
+ *
+ * This relation is the same as :
+ * - the relation for which we will fire AFTER STATEMENT triggers.
+ * - the relation into whose tuple format all captured transition tuples must
+ *   be converted.
+ * - the root partitioned table.
  */
 static ResultRelInfo *
-getASTriggerResultRelInfo(ModifyTableState *node)
+getTargetResultRelInfo(ModifyTableState *node)
 {
    /*
-    * If the node modifies a partitioned table, we must fire its triggers.
-    * Note that in that case, node->resultRelInfo points to the first leaf
-    * partition, not the root table.
+    * Note that if the node modifies a partitioned table, node->resultRelInfo
+    * points to the first leaf partition, not the root table.
     */
    if (node->rootResultRelInfo != NULL)
        return node->rootResultRelInfo;
 static void
 fireASTriggers(ModifyTableState *node)
 {
-   ResultRelInfo *resultRelInfo = getASTriggerResultRelInfo(node);
+   ResultRelInfo *resultRelInfo = getTargetResultRelInfo(node);
 
    switch (node->operation)
    {
 static void
 ExecSetupTransitionCaptureState(ModifyTableState *mtstate, EState *estate)
 {
-   ResultRelInfo *targetRelInfo = getASTriggerResultRelInfo(mtstate);
-   int         i;
+   ResultRelInfo *targetRelInfo = getTargetResultRelInfo(mtstate);
 
    /* Check for transition tables on the directly targeted relation. */
    mtstate->mt_transition_capture =
    if (mtstate->mt_transition_capture != NULL ||
        mtstate->mt_oc_transition_capture != NULL)
    {
-       int         numResultRelInfos;
-       PartitionTupleRouting *proute = mtstate->mt_partition_tuple_routing;
-
-       numResultRelInfos = (proute != NULL ?
-                            proute->num_partitions :
-                            mtstate->mt_nplans);
+       ExecSetupChildParentMapForTcs(mtstate);
 
        /*
-        * Build array of conversion maps from each child's TupleDesc to the
-        * one used in the tuplestore.  The map pointers may be NULL when no
-        * conversion is necessary, which is hopefully a common case for
-        * partitions.
+        * Install the conversion map for the first plan for UPDATE and DELETE
+        * operations.  It will be advanced each time we switch to the next
+        * plan.  (INSERT operations set it every time, so we need not update
+        * mtstate->mt_oc_transition_capture here.)
         */
-       mtstate->mt_transition_tupconv_maps = (TupleConversionMap **)
-           palloc0(sizeof(TupleConversionMap *) * numResultRelInfos);
+       if (mtstate->mt_transition_capture && mtstate->operation != CMD_INSERT)
+           mtstate->mt_transition_capture->tcs_map =
+               tupconv_map_for_subplan(mtstate, 0);
+   }
+}
 
-       /* Choose the right set of partitions */
-       if (proute != NULL)
-       {
-           /*
-            * For tuple routing among partitions, we need TupleDescs based on
-            * the partition routing table.
-            */
-           ResultRelInfo **resultRelInfos = proute->partitions;
+/*
+ * Initialize the child-to-root tuple conversion map array for UPDATE subplans.
+ *
+ * This map array is required to convert the tuple from the subplan result rel
+ * to the target table descriptor. This requirement arises for two independent
+ * scenarios:
+ * 1. For update-tuple-routing.
+ * 2. For capturing tuples in transition tables.
+ */
+void
+ExecSetupChildParentMapForSubplan(ModifyTableState *mtstate)
+{
+   ResultRelInfo *targetRelInfo = getTargetResultRelInfo(mtstate);
+   ResultRelInfo *resultRelInfos = mtstate->resultRelInfo;
+   TupleDesc   outdesc;
+   int         numResultRelInfos = mtstate->mt_nplans;
+   int         i;
 
-           for (i = 0; i < numResultRelInfos; ++i)
-           {
-               mtstate->mt_transition_tupconv_maps[i] =
-                   convert_tuples_by_name(RelationGetDescr(resultRelInfos[i]->ri_RelationDesc),
-                                          RelationGetDescr(targetRelInfo->ri_RelationDesc),
-                                          gettext_noop("could not convert row type"));
-           }
-       }
-       else
-       {
-           /* Otherwise we need the ResultRelInfo for each subplan. */
-           ResultRelInfo *resultRelInfos = mtstate->resultRelInfo;
+   /*
+    * First check if there is already a per-subplan array allocated. Even if
+    * there is already a per-leaf map array, we won't require a per-subplan
+    * one, since we will use the subplan offset array to convert the subplan
+    * index to per-leaf index.
+    */
+   if (mtstate->mt_per_subplan_tupconv_maps ||
+       (mtstate->mt_partition_tuple_routing &&
+        mtstate->mt_partition_tuple_routing->child_parent_tupconv_maps))
+       return;
 
-           for (i = 0; i < numResultRelInfos; ++i)
-           {
-               mtstate->mt_transition_tupconv_maps[i] =
-                   convert_tuples_by_name(RelationGetDescr(resultRelInfos[i].ri_RelationDesc),
-                                          RelationGetDescr(targetRelInfo->ri_RelationDesc),
-                                          gettext_noop("could not convert row type"));
-           }
-       }
+   /*
+    * Build array of conversion maps from each child's TupleDesc to the one
+    * used in the target relation.  The map pointers may be NULL when no
+    * conversion is necessary, which is hopefully a common case.
+    */
 
+   /* Get tuple descriptor of the target rel. */
+   outdesc = RelationGetDescr(targetRelInfo->ri_RelationDesc);
+
+   mtstate->mt_per_subplan_tupconv_maps = (TupleConversionMap **)
+       palloc(sizeof(TupleConversionMap *) * numResultRelInfos);
+
+   for (i = 0; i < numResultRelInfos; ++i)
+   {
+       mtstate->mt_per_subplan_tupconv_maps[i] =
+           convert_tuples_by_name(RelationGetDescr(resultRelInfos[i].ri_RelationDesc),
+                                  outdesc,
+                                  gettext_noop("could not convert row type"));
+   }
+}
+
+/*
+ * Initialize the child-to-root tuple conversion map array required for
+ * capturing transition tuples.
+ *
+ * The map array can be indexed either by subplan index or by leaf-partition
+ * index.  For transition tables, we need a subplan-indexed access to the map,
+ * and where tuple-routing is present, we also require a leaf-indexed access.
+ */
+static void
+ExecSetupChildParentMapForTcs(ModifyTableState *mtstate)
+{
+   PartitionTupleRouting *proute = mtstate->mt_partition_tuple_routing;
+
+   /*
+    * If partition tuple routing is set up, we will require partition-indexed
+    * access. In that case, create the map array indexed by partition; we
+    * will still be able to access the maps using a subplan index by
+    * converting the subplan index to a partition index using
+    * subplan_partition_offsets. If tuple routing is not set up, it means we
+    * don't require partition-indexed access. In that case, create just a
+    * subplan-indexed map.
+    */
+   if (proute)
+   {
        /*
-        * Install the conversion map for the first plan for UPDATE and DELETE
-        * operations.  It will be advanced each time we switch to the next
-        * plan.  (INSERT operations set it every time, so we need not update
-        * mtstate->mt_oc_transition_capture here.)
+        * If a partition-indexed map array is to be created, the subplan map
+        * array has to be NULL.  If the subplan map array is already created,
+        * we won't be able to access the map using a partition index.
         */
-       if (mtstate->mt_transition_capture)
-           mtstate->mt_transition_capture->tcs_map =
-               mtstate->mt_transition_tupconv_maps[0];
+       Assert(mtstate->mt_per_subplan_tupconv_maps == NULL);
+
+       ExecSetupChildParentMapForLeaf(proute);
+   }
+   else
+       ExecSetupChildParentMapForSubplan(mtstate);
+}
+
+/*
+ * For a given subplan index, get the tuple conversion map.
+ */
+static TupleConversionMap *
+tupconv_map_for_subplan(ModifyTableState *mtstate, int whichplan)
+{
+   /*
+    * If a partition-index tuple conversion map array is allocated, we need
+    * to first get the index into the partition array. Exactly *one* of the
+    * two arrays is allocated. This is because if there is a partition array
+    * required, we don't require subplan-indexed array since we can translate
+    * subplan index into partition index. And, we create a subplan-indexed
+    * array *only* if partition-indexed array is not required.
+    */
+   if (mtstate->mt_per_subplan_tupconv_maps == NULL)
+   {
+       int         leaf_index;
+       PartitionTupleRouting *proute = mtstate->mt_partition_tuple_routing;
+
+       /*
+        * If subplan-indexed array is NULL, things should have been arranged
+        * to convert the subplan index to partition index.
+        */
+       Assert(proute && proute->subplan_partition_offsets != NULL);
+
+       leaf_index = proute->subplan_partition_offsets[whichplan];
+
+       return TupConvMapForLeaf(proute, getTargetResultRelInfo(mtstate),
+                                leaf_index);
+   }
+   else
+   {
+       Assert(whichplan >= 0 && whichplan < mtstate->mt_nplans);
+       return mtstate->mt_per_subplan_tupconv_maps[whichplan];
    }
 }
 
                /* Prepare to convert transition tuples from this child. */
                if (node->mt_transition_capture != NULL)
                {
-                   Assert(node->mt_transition_tupconv_maps != NULL);
                    node->mt_transition_capture->tcs_map =
-                       node->mt_transition_tupconv_maps[node->mt_whichplan];
+                       tupconv_map_for_subplan(node, node->mt_whichplan);
                }
                if (node->mt_oc_transition_capture != NULL)
                {
-                   Assert(node->mt_transition_tupconv_maps != NULL);
                    node->mt_oc_transition_capture->tcs_map =
-                       node->mt_transition_tupconv_maps[node->mt_whichplan];
+                       tupconv_map_for_subplan(node, node->mt_whichplan);
                }
                continue;
            }
                break;
            case CMD_DELETE:
                slot = ExecDelete(node, tupleid, oldtuple, planSlot,
-                                 &node->mt_epqstate, estate, node->canSetTag);
+                                 &node->mt_epqstate, estate,
+                                 NULL, true, node->canSetTag);
                break;
            default:
                elog(ERROR, "unknown operation");
    ResultRelInfo *saved_resultRelInfo;
    ResultRelInfo *resultRelInfo;
    Plan       *subplan;
+   int         firstVarno = 0;
+   Relation    firstResultRel = NULL;
    ListCell   *l;
    int         i;
    Relation    rel;
+   bool        update_tuple_routing_needed = node->partColsUpdated;
    PartitionTupleRouting *proute = NULL;
    int         num_partitions = 0;
 
            resultRelInfo->ri_IndexRelationDescs == NULL)
            ExecOpenIndices(resultRelInfo, mtstate->mt_onconflict != ONCONFLICT_NONE);
 
+       /*
+        * If this is an UPDATE and a BEFORE UPDATE trigger is present, the
+        * trigger itself might modify the partition-key values. So arrange
+        * for tuple routing.
+        */
+       if (resultRelInfo->ri_TrigDesc &&
+           resultRelInfo->ri_TrigDesc->trig_update_before_row &&
+           operation == CMD_UPDATE)
+           update_tuple_routing_needed = true;
+
        /* Now init the plan for this result rel */
        estate->es_result_relation_info = resultRelInfo;
        mtstate->mt_plans[i] = ExecInitNode(subplan, estate, eflags);
 
    estate->es_result_relation_info = saved_resultRelInfo;
 
-   /* Build state for INSERT tuple routing */
-   rel = mtstate->resultRelInfo->ri_RelationDesc;
-   if (operation == CMD_INSERT &&
-       rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+   /* Get the target relation */
+   rel = (getTargetResultRelInfo(mtstate))->ri_RelationDesc;
+
+   /*
+    * If it's not a partitioned table after all, UPDATE tuple routing should
+    * not be attempted.
+    */
+   if (rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
+       update_tuple_routing_needed = false;
+
+   /*
+    * Build state for tuple routing if it's an INSERT or if it's an UPDATE of
+    * partition key.
+    */
+   if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE &&
+       (operation == CMD_INSERT || update_tuple_routing_needed))
    {
        proute = mtstate->mt_partition_tuple_routing =
            ExecSetupPartitionTupleRouting(mtstate,
                                           rel, node->nominalRelation,
                                           estate);
        num_partitions = proute->num_partitions;
+
+       /*
+        * Below are required as reference objects for mapping partition
+        * attno's in expressions such as WithCheckOptions and RETURNING.
+        */
+       firstVarno = mtstate->resultRelInfo[0].ri_RangeTableIndex;
+       firstResultRel = mtstate->resultRelInfo[0].ri_RelationDesc;
    }
 
    /*
    if (!(eflags & EXEC_FLAG_EXPLAIN_ONLY))
        ExecSetupTransitionCaptureState(mtstate, estate);
 
+   /*
+    * Construct mapping from each of the per-subplan partition attnos to the
+    * root attno.  This is required when during update row movement the tuple
+    * descriptor of a source partition does not match the root partitioned
+    * table descriptor.  In such a case we need to convert tuples to the root
+    * tuple descriptor, because the search for destination partition starts
+    * from the root.  Skip this setup if it's not a partition key update.
+    */
+   if (update_tuple_routing_needed)
+       ExecSetupChildParentMapForSubplan(mtstate);
+
    /*
     * Initialize any WITH CHECK OPTION constraints if needed.
     */
     * Build WITH CHECK OPTION constraints for each leaf partition rel. Note
     * that we didn't build the withCheckOptionList for each partition within
     * the planner, but simple translation of the varattnos for each partition
-    * will suffice.  This only occurs for the INSERT case; UPDATE/DELETE
-    * cases are handled above.
+    * will suffice.  This only occurs for the INSERT case or for UPDATE row
+    * movement. DELETEs and local UPDATEs are handled above.
     */
    if (node->withCheckOptionLists != NIL && num_partitions > 0)
    {
-       List       *wcoList;
-       PlanState  *plan;
+       List       *first_wcoList;
 
        /*
         * In case of INSERT on partitioned tables, there is only one plan.
         * Likewise, there is only one WITH CHECK OPTIONS list, not one per
-        * partition.  We make a copy of the WCO qual for each partition; note
-        * that, if there are SubPlans in there, they all end up attached to
-        * the one parent Plan node.
+        * partition. Whereas for UPDATE, there are as many WCOs as there are
+        * plans. So in either case, use the WCO expression of the first
+        * resultRelInfo as a reference to calculate attno's for the WCO
+        * expression of each of the partitions. We make a copy of the WCO
+        * qual for each partition. Note that, if there are SubPlans in there,
+        * they all end up attached to the one parent Plan node.
         */
-       Assert(operation == CMD_INSERT &&
-              list_length(node->withCheckOptionLists) == 1 &&
-              mtstate->mt_nplans == 1);
-       wcoList = linitial(node->withCheckOptionLists);
-       plan = mtstate->mt_plans[0];
+       Assert(update_tuple_routing_needed ||
+              (operation == CMD_INSERT &&
+               list_length(node->withCheckOptionLists) == 1 &&
+               mtstate->mt_nplans == 1));
+
+       first_wcoList = linitial(node->withCheckOptionLists);
        for (i = 0; i < num_partitions; i++)
        {
            Relation    partrel;
            ListCell   *ll;
 
            resultRelInfo = proute->partitions[i];
+
+           /*
+            * If we are referring to a resultRelInfo from one of the update
+            * result rels, that result rel would already have
+            * WithCheckOptions initialized.
+            */
+           if (resultRelInfo->ri_WithCheckOptions)
+               continue;
+
            partrel = resultRelInfo->ri_RelationDesc;
 
-           /* varno = node->nominalRelation */
-           mapped_wcoList = map_partition_varattnos(wcoList,
-                                                    node->nominalRelation,
-                                                    partrel, rel, NULL);
+           mapped_wcoList = map_partition_varattnos(first_wcoList,
+                                                    firstVarno,
+                                                    partrel, firstResultRel,
+                                                    NULL);
            foreach(ll, mapped_wcoList)
            {
                WithCheckOption *wco = castNode(WithCheckOption, lfirst(ll));
                ExprState  *wcoExpr = ExecInitQual(castNode(List, wco->qual),
-                                                  plan);
+                                                  &mtstate->ps);
 
                wcoExprs = lappend(wcoExprs, wcoExpr);
            }
    {
        TupleTableSlot *slot;
        ExprContext *econtext;
-       List       *returningList;
+       List       *firstReturningList;
 
        /*
         * Initialize result tuple slot and assign its rowtype using the first
         * Build a projection for each leaf partition rel.  Note that we
         * didn't build the returningList for each partition within the
         * planner, but simple translation of the varattnos for each partition
-        * will suffice.  This only occurs for the INSERT case; UPDATE/DELETE
-        * are handled above.
+        * will suffice.  This only occurs for the INSERT case or for UPDATE
+        * row movement. DELETEs and local UPDATEs are handled above.
         */
-       returningList = linitial(node->returningLists);
+       firstReturningList = linitial(node->returningLists);
        for (i = 0; i < num_partitions; i++)
        {
            Relation    partrel;
            List       *rlist;
 
            resultRelInfo = proute->partitions[i];
+
+           /*
+            * If we are referring to a resultRelInfo from one of the update
+            * result rels, that result rel would already have a returningList
+            * built.
+            */
+           if (resultRelInfo->ri_projectReturning)
+               continue;
+
            partrel = resultRelInfo->ri_RelationDesc;
 
-           /* varno = node->nominalRelation */
-           rlist = map_partition_varattnos(returningList,
-                                           node->nominalRelation,
-                                           partrel, rel, NULL);
+           /*
+            * Use the returning expression of the first resultRelInfo as a
+            * reference to calculate attno's for the returning expression of
+            * each of the partitions.
+            */
+           rlist = map_partition_varattnos(firstReturningList,
+                                           firstVarno,
+                                           partrel, firstResultRel, NULL);
            resultRelInfo->ri_projectReturning =
                ExecBuildProjectionInfo(rlist, econtext, slot, &mtstate->ps,
                                        resultRelInfo->ri_RelationDesc->rd_att);
 
    COPY_SCALAR_FIELD(canSetTag);
    COPY_SCALAR_FIELD(nominalRelation);
    COPY_NODE_FIELD(partitioned_rels);
+   COPY_SCALAR_FIELD(partColsUpdated);
    COPY_NODE_FIELD(resultRelations);
    COPY_SCALAR_FIELD(resultRelIndex);
    COPY_SCALAR_FIELD(rootResultRelIndex);
 
    COPY_SCALAR_FIELD(parent_relid);
    COPY_NODE_FIELD(child_rels);
+   COPY_SCALAR_FIELD(part_cols_updated);
 
    return newnode;
 }
 
 {
    COMPARE_SCALAR_FIELD(parent_relid);
    COMPARE_NODE_FIELD(child_rels);
+   COMPARE_SCALAR_FIELD(part_cols_updated);
 
    return true;
 }
 
    WRITE_BOOL_FIELD(canSetTag);
    WRITE_UINT_FIELD(nominalRelation);
    WRITE_NODE_FIELD(partitioned_rels);
+   WRITE_BOOL_FIELD(partColsUpdated);
    WRITE_NODE_FIELD(resultRelations);
    WRITE_INT_FIELD(resultRelIndex);
    WRITE_INT_FIELD(rootResultRelIndex);
    WRITE_BOOL_FIELD(canSetTag);
    WRITE_UINT_FIELD(nominalRelation);
    WRITE_NODE_FIELD(partitioned_rels);
+   WRITE_BOOL_FIELD(partColsUpdated);
    WRITE_NODE_FIELD(resultRelations);
    WRITE_NODE_FIELD(subpaths);
    WRITE_NODE_FIELD(subroots);
 
    WRITE_UINT_FIELD(parent_relid);
    WRITE_NODE_FIELD(child_rels);
+   WRITE_BOOL_FIELD(part_cols_updated);
 }
 
 static void
 
    READ_BOOL_FIELD(canSetTag);
    READ_UINT_FIELD(nominalRelation);
    READ_NODE_FIELD(partitioned_rels);
+   READ_BOOL_FIELD(partColsUpdated);
    READ_NODE_FIELD(resultRelations);
    READ_INT_FIELD(resultRelIndex);
    READ_INT_FIELD(rootResultRelIndex);
 
            case RTE_RELATION:
                if (rte->relkind == RELKIND_PARTITIONED_TABLE)
                    partitioned_rels =
-                       get_partitioned_child_rels(root, rel->relid);
+                       get_partitioned_child_rels(root, rel->relid, NULL);
                break;
            case RTE_SUBQUERY:
                build_partitioned_rels = true;
        {
            List       *cprels;
 
-           cprels = get_partitioned_child_rels(root, childrel->relid);
+           cprels = get_partitioned_child_rels(root, childrel->relid, NULL);
            partitioned_rels = list_concat(partitioned_rels,
                                           list_copy(cprels));
        }
 
 static ModifyTable *make_modifytable(PlannerInfo *root,
                 CmdType operation, bool canSetTag,
                 Index nominalRelation, List *partitioned_rels,
+                bool partColsUpdated,
                 List *resultRelations, List *subplans,
                 List *withCheckOptionLists, List *returningLists,
                 List *rowMarks, OnConflictExpr *onconflict, int epqParam);
                            best_path->canSetTag,
                            best_path->nominalRelation,
                            best_path->partitioned_rels,
+                           best_path->partColsUpdated,
                            best_path->resultRelations,
                            subplans,
                            best_path->withCheckOptionLists,
 make_modifytable(PlannerInfo *root,
                 CmdType operation, bool canSetTag,
                 Index nominalRelation, List *partitioned_rels,
+                bool partColsUpdated,
                 List *resultRelations, List *subplans,
                 List *withCheckOptionLists, List *returningLists,
                 List *rowMarks, OnConflictExpr *onconflict, int epqParam)
    node->canSetTag = canSetTag;
    node->nominalRelation = nominalRelation;
    node->partitioned_rels = partitioned_rels;
+   node->partColsUpdated = partColsUpdated;
    node->resultRelations = resultRelations;
    node->resultRelIndex = -1;  /* will be set correctly in setrefs.c */
    node->rootResultRelIndex = -1;  /* will be set correctly in setrefs.c */
 
    Query      *parent_parse;
    Bitmapset  *parent_relids = bms_make_singleton(top_parentRTindex);
    PlannerInfo **parent_roots = NULL;
+   bool        partColsUpdated = false;
 
    Assert(parse->commandType != CMD_INSERT);
 
    if (parent_rte->relkind == RELKIND_PARTITIONED_TABLE)
    {
        nominalRelation = top_parentRTindex;
-       partitioned_rels = get_partitioned_child_rels(root, top_parentRTindex);
+       partitioned_rels = get_partitioned_child_rels(root, top_parentRTindex,
+                                                     &partColsUpdated);
        /* The root partitioned table is included as a child rel */
        Assert(list_length(partitioned_rels) >= 1);
    }
                                     parse->canSetTag,
                                     nominalRelation,
                                     partitioned_rels,
+                                    partColsUpdated,
                                     resultRelations,
                                     subpaths,
                                     subroots,
                                        parse->canSetTag,
                                        parse->resultRelation,
                                        NIL,
+                                       false,
                                        list_make1_int(parse->resultRelation),
                                        list_make1(path),
                                        list_make1(root),
 /*
  * get_partitioned_child_rels
  *     Returns a list of the RT indexes of the partitioned child relations
- *     with rti as the root parent RT index.
+ *     with rti as the root parent RT index. Also sets
+ *     *part_cols_updated to true if any of the root rte's updated
+ *     columns is used in the partition key either of the relation whose RTI
+ *     is specified or of any child relation.
  *
  * Note: This function might get called even for range table entries that
  * are not partitioned tables; in such a case, it will simply return NIL.
  */
 List *
-get_partitioned_child_rels(PlannerInfo *root, Index rti)
+get_partitioned_child_rels(PlannerInfo *root, Index rti,
+                          bool *part_cols_updated)
 {
    List       *result = NIL;
    ListCell   *l;
 
+   if (part_cols_updated)
+       *part_cols_updated = false;
+
    foreach(l, root->pcinfo_list)
    {
        PartitionedChildRelInfo *pc = lfirst_node(PartitionedChildRelInfo, l);
        if (pc->parent_relid == rti)
        {
            result = pc->child_rels;
+           if (part_cols_updated)
+               *part_cols_updated = pc->part_cols_updated;
            break;
        }
    }
 
                           RangeTblEntry *parentrte,
                           Index parentRTindex, Relation parentrel,
                           PlanRowMark *top_parentrc, LOCKMODE lockmode,
-                          List **appinfos, List **partitioned_child_rels);
+                          List **appinfos, List **partitioned_child_rels,
+                          bool *part_cols_updated);
 static void expand_single_inheritance_child(PlannerInfo *root,
                                RangeTblEntry *parentrte,
                                Index parentRTindex, Relation parentrel,
    if (RelationGetPartitionDesc(oldrelation) != NULL)
    {
        List       *partitioned_child_rels = NIL;
+       bool        part_cols_updated = false;
 
        Assert(rte->relkind == RELKIND_PARTITIONED_TABLE);
 
        /*
         * If this table has partitions, recursively expand them in the order
-        * in which they appear in the PartitionDesc.
+        * in which they appear in the PartitionDesc.  While at it, also
+        * extract the partition key columns of all the partitioned tables.
         */
        expand_partitioned_rtentry(root, rte, rti, oldrelation, oldrc,
                                   lockmode, &root->append_rel_list,
-                                  &partitioned_child_rels);
+                                  &partitioned_child_rels,
+                                  &part_cols_updated);
 
        /*
         * We keep a list of objects in root, each of which maps a root
            pcinfo = makeNode(PartitionedChildRelInfo);
            pcinfo->parent_relid = rti;
            pcinfo->child_rels = partitioned_child_rels;
+           pcinfo->part_cols_updated = part_cols_updated;
            root->pcinfo_list = lappend(root->pcinfo_list, pcinfo);
        }
    }
 expand_partitioned_rtentry(PlannerInfo *root, RangeTblEntry *parentrte,
                           Index parentRTindex, Relation parentrel,
                           PlanRowMark *top_parentrc, LOCKMODE lockmode,
-                          List **appinfos, List **partitioned_child_rels)
+                          List **appinfos, List **partitioned_child_rels,
+                          bool *part_cols_updated)
 {
    int         i;
    RangeTblEntry *childrte;
 
    Assert(parentrte->inh);
 
+   /*
+    * Note down whether any partition key cols are being updated. Though it's
+    * the root partitioned table's updatedCols we are interested in, we
+    * instead use parentrte to get the updatedCols. This is convenient because
+    * parentrte already has the root partrel's updatedCols translated to match
+    * the attribute ordering of parentrel.
+    */
+   if (!*part_cols_updated)
+       *part_cols_updated =
+           has_partition_attrs(parentrel, parentrte->updatedCols, NULL);
+
    /* First expand the partitioned table itself. */
    expand_single_inheritance_child(root, parentrte, parentRTindex, parentrel,
                                    top_parentrc, parentrel,
        if (childrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
            expand_partitioned_rtentry(root, childrte, childRTindex,
                                       childrel, top_parentrc, lockmode,
-                                      appinfos, partitioned_child_rels);
+                                      appinfos, partitioned_child_rels,
+                                      part_cols_updated);
 
        /* Close child relation, but keep locks */
        heap_close(childrel, NoLock);
 
  * 'partitioned_rels' is an integer list of RT indexes of non-leaf tables in
  *     the partition tree, if this is an UPDATE/DELETE to a partitioned table.
  *     Otherwise NIL.
+ * 'partColsUpdated' is true if any partitioning columns are being updated,
+ *     either from the target relation or a descendent partitioned table.
  * 'resultRelations' is an integer list of actual RT indexes of target rel(s)
  * 'subpaths' is a list of Path(s) producing source data (one per rel)
  * 'subroots' is a list of PlannerInfo structs (one per rel)
 create_modifytable_path(PlannerInfo *root, RelOptInfo *rel,
                        CmdType operation, bool canSetTag,
                        Index nominalRelation, List *partitioned_rels,
+                       bool partColsUpdated,
                        List *resultRelations, List *subpaths,
                        List *subroots,
                        List *withCheckOptionLists, List *returningLists,
    pathnode->canSetTag = canSetTag;
    pathnode->nominalRelation = nominalRelation;
    pathnode->partitioned_rels = list_copy(partitioned_rels);
+   pathnode->partColsUpdated = partColsUpdated;
    pathnode->resultRelations = resultRelations;
    pathnode->subpaths = subpaths;
    pathnode->subroots = subroots;
 
  *                             for every leaf partition in the partition tree.
  * num_partitions              Number of leaf partitions in the partition tree
  *                             (= 'partitions' array length)
- * partition_tupconv_maps      Array of TupleConversionMap objects with one
+ * parent_child_tupconv_maps   Array of TupleConversionMap objects with one
  *                             entry for every leaf partition (required to
- *                             convert input tuple based on the root table's
- *                             rowtype to a leaf partition's rowtype after
- *                             tuple routing is done)
+ *                             convert tuple from the root table's rowtype to
+ *                             a leaf partition's rowtype after tuple routing
+ *                             is done)
+ * child_parent_tupconv_maps   Array of TupleConversionMap objects with one
+ *                             entry for every leaf partition (required to
+ *                             convert an updated tuple from the leaf
+ *                             partition's rowtype to the root table's rowtype
+ *                             so that tuple routing can be done)
+ * child_parent_map_not_required  Array of bool. True value means that a map is
+ *                             determined to be not required for the given
+ *                             partition. False means either we haven't yet
+ *                             checked if a map is required, or it was
+ *                             determined to be required.
+ * subplan_partition_offsets   Integer array ordered by UPDATE subplans. Each
+ *                             element of this array has the index into the
+ *                             corresponding partition in partitions array.
  * partition_tuple_slot            TupleTableSlot to be used to manipulate any
  *                             given leaf partition's rowtype after that
  *                             partition is chosen for insertion by
    int         num_dispatch;
    ResultRelInfo **partitions;
    int         num_partitions;
-   TupleConversionMap **partition_tupconv_maps;
+   TupleConversionMap **parent_child_tupconv_maps;
+   TupleConversionMap **child_parent_tupconv_maps;
+   bool       *child_parent_map_not_required;
+   int        *subplan_partition_offsets;
    TupleTableSlot *partition_tuple_slot;
+   TupleTableSlot *root_tuple_slot;
 } PartitionTupleRouting;
 
 extern PartitionTupleRouting *ExecSetupPartitionTupleRouting(ModifyTableState *mtstate,
                  PartitionDispatch *pd,
                  TupleTableSlot *slot,
                  EState *estate);
+extern void ExecSetupChildParentMapForLeaf(PartitionTupleRouting *proute);
+extern TupleConversionMap *TupConvMapForLeaf(PartitionTupleRouting *proute,
+                 ResultRelInfo *rootRelInfo, int leaf_index);
+extern HeapTuple ConvertPartitionTupleSlot(TupleConversionMap *map,
+                         HeapTuple tuple,
+                         TupleTableSlot *new_slot,
+                         TupleTableSlot **p_my_slot);
 extern void ExecCleanupTupleRouting(PartitionTupleRouting *proute);
 
 #endif                         /* EXECPARTITION_H */
 
    /* controls transition table population for specified operation */
    struct TransitionCaptureState *mt_oc_transition_capture;
    /* controls transition table population for INSERT...ON CONFLICT UPDATE */
-   TupleConversionMap **mt_transition_tupconv_maps;
-   /* Per plan/partition tuple conversion */
+   TupleConversionMap **mt_per_subplan_tupconv_maps;
+   /* Per plan map for tuple conversion from child to root */
 } ModifyTableState;
 
 /* ----------------
 
    Index       nominalRelation;    /* Parent RT index for use of EXPLAIN */
    /* RT indexes of non-leaf tables in a partition tree */
    List       *partitioned_rels;
+   bool        partColsUpdated;    /* some part key in hierarchy updated */
    List       *resultRelations;    /* integer list of RT indexes */
    int         resultRelIndex; /* index of first resultRel in plan's list */
    int         rootResultRelIndex; /* index of the partitioned table root */
 
    Index       nominalRelation;    /* Parent RT index for use of EXPLAIN */
    /* RT indexes of non-leaf tables in a partition tree */
    List       *partitioned_rels;
+   bool        partColsUpdated;    /* some part key in hierarchy updated */
    List       *resultRelations;    /* integer list of RT indexes */
    List       *subpaths;       /* Path(s) producing source data */
    List       *subroots;       /* per-target-table PlannerInfos */
 
    Index       parent_relid;
    List       *child_rels;
+   bool        part_cols_updated;  /* is the partition key of any of
+                                    * the partitioned tables updated? */
 } PartitionedChildRelInfo;
 
 /*
 
                        RelOptInfo *rel,
                        CmdType operation, bool canSetTag,
                        Index nominalRelation, List *partitioned_rels,
+                       bool partColsUpdated,
                        List *resultRelations, List *subpaths,
                        List *subroots,
                        List *withCheckOptionLists, List *returningLists,
 
 
 extern bool plan_cluster_use_sort(Oid tableOid, Oid indexOid);
 
-extern List *get_partitioned_child_rels(PlannerInfo *root, Index rti);
+extern List *get_partitioned_child_rels(PlannerInfo *root, Index rti,
+                          bool *part_cols_updated);
 extern List *get_partitioned_child_rels_for_join(PlannerInfo *root,
                                    Relids join_relids);
 
 
 
 DROP TABLE update_test;
 DROP TABLE upsert_test;
--- update to a partition should check partition bound constraint for the new tuple
-create table range_parted (
+---------------------------
+-- UPDATE with row movement
+---------------------------
+-- When a partitioned table receives an UPDATE to the partitioned key and the
+-- new values no longer meet the partition's bound, the row must be moved to
+-- the correct partition for the new partition key (if one exists). We must
+-- also ensure that updatable views on partitioned tables properly enforce any
+-- WITH CHECK OPTION that is defined. The situation with triggers in this case
+-- also requires thorough testing as partition key updates causing row
+-- movement convert UPDATEs into DELETE+INSERT.
+CREATE TABLE range_parted (
    a text,
-   b int
-) partition by range (a, b);
-create table part_a_1_a_10 partition of range_parted for values from ('a', 1) to ('a', 10);
-create table part_a_10_a_20 partition of range_parted for values from ('a', 10) to ('a', 20);
-create table part_b_1_b_10 partition of range_parted for values from ('b', 1) to ('b', 10);
-create table part_b_10_b_20 partition of range_parted for values from ('b', 10) to ('b', 20);
-insert into part_a_1_a_10 values ('a', 1);
-insert into part_b_10_b_20 values ('b', 10);
--- fail
-update part_a_1_a_10 set a = 'b' where a = 'a';
-ERROR:  new row for relation "part_a_1_a_10" violates partition constraint
-DETAIL:  Failing row contains (b, 1).
-update range_parted set b = b - 1 where b = 10;
-ERROR:  new row for relation "part_b_10_b_20" violates partition constraint
-DETAIL:  Failing row contains (b, 9).
+   b bigint,
+   c numeric,
+   d int,
+   e varchar
+) PARTITION BY RANGE (a, b);
+-- Create partitions intentionally in descending bound order, so as to test
+-- that update-row-movement works with the leaf partitions not in bound order.
+CREATE TABLE part_b_20_b_30 (e varchar, c numeric, a text, b bigint, d int);
+ALTER TABLE range_parted ATTACH PARTITION part_b_20_b_30 FOR VALUES FROM ('b', 20) TO ('b', 30);
+CREATE TABLE part_b_10_b_20 (e varchar, c numeric, a text, b bigint, d int) PARTITION BY RANGE (c);
+CREATE TABLE part_b_1_b_10 PARTITION OF range_parted FOR VALUES FROM ('b', 1) TO ('b', 10);
+ALTER TABLE range_parted ATTACH PARTITION part_b_10_b_20 FOR VALUES FROM ('b', 10) TO ('b', 20);
+CREATE TABLE part_a_10_a_20 PARTITION OF range_parted FOR VALUES FROM ('a', 10) TO ('a', 20);
+CREATE TABLE part_a_1_a_10 PARTITION OF range_parted FOR VALUES FROM ('a', 1) TO ('a', 10);
+-- Check that partition-key UPDATE works sanely on a partitioned table that
+-- does not have any child partitions.
+UPDATE part_b_10_b_20 set b = b - 6;
+-- Create some more partitions following the above pattern of descending bound
+-- order, but let's make the situation a bit more complex by having the
+-- attribute numbers of the columns vary from their parent partition.
+CREATE TABLE part_c_100_200 (e varchar, c numeric, a text, b bigint, d int) PARTITION BY range (abs(d));
+ALTER TABLE part_c_100_200 DROP COLUMN e, DROP COLUMN c, DROP COLUMN a;
+ALTER TABLE part_c_100_200 ADD COLUMN c numeric, ADD COLUMN e varchar, ADD COLUMN a text;
+ALTER TABLE part_c_100_200 DROP COLUMN b;
+ALTER TABLE part_c_100_200 ADD COLUMN b bigint;
+CREATE TABLE part_d_1_15 PARTITION OF part_c_100_200 FOR VALUES FROM (1) TO (15);
+CREATE TABLE part_d_15_20 PARTITION OF part_c_100_200 FOR VALUES FROM (15) TO (20);
+ALTER TABLE part_b_10_b_20 ATTACH PARTITION part_c_100_200 FOR VALUES FROM (100) TO (200);
+CREATE TABLE part_c_1_100 (e varchar, d int, c numeric, b bigint, a text);
+ALTER TABLE part_b_10_b_20 ATTACH PARTITION part_c_1_100 FOR VALUES FROM (1) TO (100);
+\set init_range_parted 'truncate range_parted; insert into range_parted VALUES (''a'', 1, 1, 1), (''a'', 10, 200, 1), (''b'', 12, 96, 1), (''b'', 13, 97, 2), (''b'', 15, 105, 16), (''b'', 17, 105, 19)'
+\set show_data 'select tableoid::regclass::text COLLATE "C" partname, * from range_parted ORDER BY 1, 2, 3, 4, 5, 6'
+:init_range_parted;
+:show_data;
+    partname    | a | b  |  c  | d  | e 
+----------------+---+----+-----+----+---
+ part_a_10_a_20 | a | 10 | 200 |  1 | 
+ part_a_1_a_10  | a |  1 |   1 |  1 | 
+ part_c_1_100   | b | 12 |  96 |  1 | 
+ part_c_1_100   | b | 13 |  97 |  2 | 
+ part_d_15_20   | b | 15 | 105 | 16 | 
+ part_d_15_20   | b | 17 | 105 | 19 | 
+(6 rows)
+
+-- The order of subplans should be in bound order
+EXPLAIN (costs off) UPDATE range_parted set c = c - 50 WHERE c > 97;
+             QUERY PLAN              
+-------------------------------------
+ Update on range_parted
+   Update on part_a_1_a_10
+   Update on part_a_10_a_20
+   Update on part_b_1_b_10
+   Update on part_c_1_100
+   Update on part_d_1_15
+   Update on part_d_15_20
+   Update on part_b_20_b_30
+   ->  Seq Scan on part_a_1_a_10
+         Filter: (c > '97'::numeric)
+   ->  Seq Scan on part_a_10_a_20
+         Filter: (c > '97'::numeric)
+   ->  Seq Scan on part_b_1_b_10
+         Filter: (c > '97'::numeric)
+   ->  Seq Scan on part_c_1_100
+         Filter: (c > '97'::numeric)
+   ->  Seq Scan on part_d_1_15
+         Filter: (c > '97'::numeric)
+   ->  Seq Scan on part_d_15_20
+         Filter: (c > '97'::numeric)
+   ->  Seq Scan on part_b_20_b_30
+         Filter: (c > '97'::numeric)
+(22 rows)
+
+-- fail, row movement happens only within the partition subtree.
+UPDATE part_c_100_200 set c = c - 20, d = c WHERE c = 105;
+ERROR:  new row for relation "part_c_100_200" violates partition constraint
+DETAIL:  Failing row contains (105, 85, null, b, 15).
+-- fail, no partition key update, so no attempt to move tuple,
+-- but "a = 'a'" violates partition constraint enforced by root partition)
+UPDATE part_b_10_b_20 set a = 'a';
+ERROR:  new row for relation "part_c_1_100" violates partition constraint
+DETAIL:  Failing row contains (null, 1, 96, 12, a).
+-- ok, partition key update, no constraint violation
+UPDATE range_parted set d = d - 10 WHERE d > 10;
+-- ok, no partition key update, no constraint violation
+UPDATE range_parted set e = d;
+-- No row found
+UPDATE part_c_1_100 set c = c + 20 WHERE c = 98;
+-- ok, row movement
+UPDATE part_b_10_b_20 set c = c + 20 returning c, b, a;
+  c  | b  | a 
+-----+----+---
+ 116 | 12 | b
+ 117 | 13 | b
+ 125 | 15 | b
+ 125 | 17 | b
+(4 rows)
+
+:show_data;
+    partname    | a | b  |  c  | d | e 
+----------------+---+----+-----+---+---
+ part_a_10_a_20 | a | 10 | 200 | 1 | 1
+ part_a_1_a_10  | a |  1 |   1 | 1 | 1
+ part_d_1_15    | b | 12 | 116 | 1 | 1
+ part_d_1_15    | b | 13 | 117 | 2 | 2
+ part_d_1_15    | b | 15 | 125 | 6 | 6
+ part_d_1_15    | b | 17 | 125 | 9 | 9
+(6 rows)
+
+-- fail, row movement happens only within the partition subtree.
+UPDATE part_b_10_b_20 set b = b - 6 WHERE c > 116 returning *;
+ERROR:  new row for relation "part_d_1_15" violates partition constraint
+DETAIL:  Failing row contains (2, 117, 2, b, 7).
+-- ok, row movement, with subset of rows moved into different partition.
+UPDATE range_parted set b = b - 6 WHERE c > 116 returning a, b + c;
+ a | ?column? 
+---+----------
+ a |      204
+ b |      124
+ b |      134
+ b |      136
+(4 rows)
+
+:show_data;
+   partname    | a | b  |  c  | d | e 
+---------------+---+----+-----+---+---
+ part_a_1_a_10 | a |  1 |   1 | 1 | 1
+ part_a_1_a_10 | a |  4 | 200 | 1 | 1
+ part_b_1_b_10 | b |  7 | 117 | 2 | 2
+ part_b_1_b_10 | b |  9 | 125 | 6 | 6
+ part_d_1_15   | b | 11 | 125 | 9 | 9
+ part_d_1_15   | b | 12 | 116 | 1 | 1
+(6 rows)
+
+-- Common table needed for multiple test scenarios.
+CREATE TABLE mintab(c1 int);
+INSERT into mintab VALUES (120);
+-- update partition key using updatable view.
+CREATE VIEW upview AS SELECT * FROM range_parted WHERE (select c > c1 FROM mintab) WITH CHECK OPTION;
+-- ok
+UPDATE upview set c = 199 WHERE b = 4;
+-- fail, check option violation
+UPDATE upview set c = 120 WHERE b = 4;
+ERROR:  new row violates check option for view "upview"
+DETAIL:  Failing row contains (a, 4, 120, 1, 1).
+-- fail, row movement with check option violation
+UPDATE upview set a = 'b', b = 15, c = 120 WHERE b = 4;
+ERROR:  new row violates check option for view "upview"
+DETAIL:  Failing row contains (b, 15, 120, 1, 1).
+-- ok, row movement, check option passes
+UPDATE upview set a = 'b', b = 15 WHERE b = 4;
+:show_data;
+   partname    | a | b  |  c  | d | e 
+---------------+---+----+-----+---+---
+ part_a_1_a_10 | a |  1 |   1 | 1 | 1
+ part_b_1_b_10 | b |  7 | 117 | 2 | 2
+ part_b_1_b_10 | b |  9 | 125 | 6 | 6
+ part_d_1_15   | b | 11 | 125 | 9 | 9
+ part_d_1_15   | b | 12 | 116 | 1 | 1
+ part_d_1_15   | b | 15 | 199 | 1 | 1
+(6 rows)
+
+-- cleanup
+DROP VIEW upview;
+-- RETURNING having whole-row vars.
+:init_range_parted;
+UPDATE range_parted set c = 95 WHERE a = 'b' and b > 10 and c > 100 returning (range_parted), *;
+ range_parted  | a | b  | c  | d  | e 
+---------------+---+----+----+----+---
+ (b,15,95,16,) | b | 15 | 95 | 16 | 
+ (b,17,95,19,) | b | 17 | 95 | 19 | 
+(2 rows)
+
+:show_data;
+    partname    | a | b  |  c  | d  | e 
+----------------+---+----+-----+----+---
+ part_a_10_a_20 | a | 10 | 200 |  1 | 
+ part_a_1_a_10  | a |  1 |   1 |  1 | 
+ part_c_1_100   | b | 12 |  96 |  1 | 
+ part_c_1_100   | b | 13 |  97 |  2 | 
+ part_c_1_100   | b | 15 |  95 | 16 | 
+ part_c_1_100   | b | 17 |  95 | 19 | 
+(6 rows)
+
+-- Transition tables with update row movement
+:init_range_parted;
+CREATE FUNCTION trans_updatetrigfunc() RETURNS trigger LANGUAGE plpgsql AS
+$$
+  begin
+    raise notice 'trigger = %, old table = %, new table = %',
+                 TG_NAME,
+                 (select string_agg(old_table::text, ', ' ORDER BY a) FROM old_table),
+                 (select string_agg(new_table::text, ', ' ORDER BY a) FROM new_table);
+    return null;
+  end;
+$$;
+CREATE TRIGGER trans_updatetrig
+  AFTER UPDATE ON range_parted REFERENCING OLD TABLE AS old_table NEW TABLE AS new_table
+  FOR EACH STATEMENT EXECUTE PROCEDURE trans_updatetrigfunc();
+UPDATE range_parted set c = (case when c = 96 then 110 else c + 1 end ) WHERE a = 'b' and b > 10 and c >= 96;
+NOTICE:  trigger = trans_updatetrig, old table = (b,12,96,1,), (b,13,97,2,), (b,15,105,16,), (b,17,105,19,), new table = (b,12,110,1,), (b,13,98,2,), (b,15,106,16,), (b,17,106,19,)
+:show_data;
+    partname    | a | b  |  c  | d  | e 
+----------------+---+----+-----+----+---
+ part_a_10_a_20 | a | 10 | 200 |  1 | 
+ part_a_1_a_10  | a |  1 |   1 |  1 | 
+ part_c_1_100   | b | 13 |  98 |  2 | 
+ part_d_15_20   | b | 15 | 106 | 16 | 
+ part_d_15_20   | b | 17 | 106 | 19 | 
+ part_d_1_15    | b | 12 | 110 |  1 | 
+(6 rows)
+
+:init_range_parted;
+-- Enabling OLD TABLE capture for both DELETE as well as UPDATE stmt triggers
+-- should not cause DELETEd rows to be captured twice. Similar thing for
+-- INSERT triggers and inserted rows.
+CREATE TRIGGER trans_deletetrig
+  AFTER DELETE ON range_parted REFERENCING OLD TABLE AS old_table
+  FOR EACH STATEMENT EXECUTE PROCEDURE trans_updatetrigfunc();
+CREATE TRIGGER trans_inserttrig
+  AFTER INSERT ON range_parted REFERENCING NEW TABLE AS new_table
+  FOR EACH STATEMENT EXECUTE PROCEDURE trans_updatetrigfunc();
+UPDATE range_parted set c = c + 50 WHERE a = 'b' and b > 10 and c >= 96;
+NOTICE:  trigger = trans_updatetrig, old table = (b,12,96,1,), (b,13,97,2,), (b,15,105,16,), (b,17,105,19,), new table = (b,12,146,1,), (b,13,147,2,), (b,15,155,16,), (b,17,155,19,)
+:show_data;
+    partname    | a | b  |  c  | d  | e 
+----------------+---+----+-----+----+---
+ part_a_10_a_20 | a | 10 | 200 |  1 | 
+ part_a_1_a_10  | a |  1 |   1 |  1 | 
+ part_d_15_20   | b | 15 | 155 | 16 | 
+ part_d_15_20   | b | 17 | 155 | 19 | 
+ part_d_1_15    | b | 12 | 146 |  1 | 
+ part_d_1_15    | b | 13 | 147 |  2 | 
+(6 rows)
+
+DROP TRIGGER trans_deletetrig ON range_parted;
+DROP TRIGGER trans_inserttrig ON range_parted;
+-- Don't drop trans_updatetrig yet. It is required below.
+-- Test with transition tuple conversion happening for rows moved into the
+-- new partition. This requires a trigger that references transition table
+-- (we already have trans_updatetrig). For inserted rows, the conversion
+-- is not usually needed, because the original tuple is already compatible with
+-- the desired transition tuple format. But conversion happens when there is a
+-- BR trigger because the trigger can change the inserted row. So install a
+-- BR triggers on those child partitions where the rows will be moved.
+CREATE FUNCTION func_parted_mod_b() RETURNS trigger AS $$
+BEGIN
+   NEW.b = NEW.b + 1;
+   return NEW;
+END $$ language plpgsql;
+CREATE TRIGGER trig_c1_100 BEFORE UPDATE OR INSERT ON part_c_1_100
+   FOR EACH ROW EXECUTE PROCEDURE func_parted_mod_b();
+CREATE TRIGGER trig_d1_15 BEFORE UPDATE OR INSERT ON part_d_1_15
+   FOR EACH ROW EXECUTE PROCEDURE func_parted_mod_b();
+CREATE TRIGGER trig_d15_20 BEFORE UPDATE OR INSERT ON part_d_15_20
+   FOR EACH ROW EXECUTE PROCEDURE func_parted_mod_b();
+:init_range_parted;
+UPDATE range_parted set c = (case when c = 96 then 110 else c + 1 end) WHERE a = 'b' and b > 10 and c >= 96;
+NOTICE:  trigger = trans_updatetrig, old table = (b,13,96,1,), (b,14,97,2,), (b,16,105,16,), (b,18,105,19,), new table = (b,15,110,1,), (b,15,98,2,), (b,17,106,16,), (b,19,106,19,)
+:show_data;
+    partname    | a | b  |  c  | d  | e 
+----------------+---+----+-----+----+---
+ part_a_10_a_20 | a | 10 | 200 |  1 | 
+ part_a_1_a_10  | a |  1 |   1 |  1 | 
+ part_c_1_100   | b | 15 |  98 |  2 | 
+ part_d_15_20   | b | 17 | 106 | 16 | 
+ part_d_15_20   | b | 19 | 106 | 19 | 
+ part_d_1_15    | b | 15 | 110 |  1 | 
+(6 rows)
+
+:init_range_parted;
+UPDATE range_parted set c = c + 50 WHERE a = 'b' and b > 10 and c >= 96;
+NOTICE:  trigger = trans_updatetrig, old table = (b,13,96,1,), (b,14,97,2,), (b,16,105,16,), (b,18,105,19,), new table = (b,15,146,1,), (b,16,147,2,), (b,17,155,16,), (b,19,155,19,)
+:show_data;
+    partname    | a | b  |  c  | d  | e 
+----------------+---+----+-----+----+---
+ part_a_10_a_20 | a | 10 | 200 |  1 | 
+ part_a_1_a_10  | a |  1 |   1 |  1 | 
+ part_d_15_20   | b | 17 | 155 | 16 | 
+ part_d_15_20   | b | 19 | 155 | 19 | 
+ part_d_1_15    | b | 15 | 146 |  1 | 
+ part_d_1_15    | b | 16 | 147 |  2 | 
+(6 rows)
+
+-- Case where per-partition tuple conversion map array is allocated, but the
+-- map is not required for the particular tuple that is routed, thanks to
+-- matching table attributes of the partition and the target table.
+:init_range_parted;
+UPDATE range_parted set b = 15 WHERE b = 1;
+NOTICE:  trigger = trans_updatetrig, old table = (a,1,1,1,), new table = (a,15,1,1,)
+:show_data;
+    partname    | a | b  |  c  | d  | e 
+----------------+---+----+-----+----+---
+ part_a_10_a_20 | a | 10 | 200 |  1 | 
+ part_a_10_a_20 | a | 15 |   1 |  1 | 
+ part_c_1_100   | b | 13 |  96 |  1 | 
+ part_c_1_100   | b | 14 |  97 |  2 | 
+ part_d_15_20   | b | 16 | 105 | 16 | 
+ part_d_15_20   | b | 18 | 105 | 19 | 
+(6 rows)
+
+DROP TRIGGER trans_updatetrig ON range_parted;
+DROP TRIGGER trig_c1_100 ON part_c_1_100;
+DROP TRIGGER trig_d1_15 ON part_d_1_15;
+DROP TRIGGER trig_d15_20 ON part_d_15_20;
+DROP FUNCTION func_parted_mod_b();
+-- RLS policies with update-row-movement
+-----------------------------------------
+ALTER TABLE range_parted ENABLE ROW LEVEL SECURITY;
+CREATE USER regress_range_parted_user;
+GRANT ALL ON range_parted, mintab TO regress_range_parted_user;
+CREATE POLICY seeall ON range_parted AS PERMISSIVE FOR SELECT USING (true);
+CREATE POLICY policy_range_parted ON range_parted for UPDATE USING (true) WITH CHECK (c % 2 = 0);
+:init_range_parted;
+SET SESSION AUTHORIZATION regress_range_parted_user;
+-- This should fail with RLS violation error while moving row from
+-- part_a_10_a_20 to part_d_1_15, because we are setting 'c' to an odd number.
+UPDATE range_parted set a = 'b', c = 151 WHERE a = 'a' and c = 200;
+ERROR:  new row violates row-level security policy for table "range_parted"
+RESET SESSION AUTHORIZATION;
+-- Create a trigger on part_d_1_15
+CREATE FUNCTION func_d_1_15() RETURNS trigger AS $$
+BEGIN
+   NEW.c = NEW.c + 1; -- Make even numbers odd, or vice versa
+   return NEW;
+END $$ LANGUAGE plpgsql;
+CREATE TRIGGER trig_d_1_15 BEFORE INSERT ON part_d_1_15
+   FOR EACH ROW EXECUTE PROCEDURE func_d_1_15();
+:init_range_parted;
+SET SESSION AUTHORIZATION regress_range_parted_user;
+-- Here, RLS checks should succeed while moving row from part_a_10_a_20 to
+-- part_d_1_15. Even though the UPDATE is setting 'c' to an odd number, the
+-- trigger at the destination partition again makes it an even number.
+UPDATE range_parted set a = 'b', c = 151 WHERE a = 'a' and c = 200;
+RESET SESSION AUTHORIZATION;
+:init_range_parted;
+SET SESSION AUTHORIZATION regress_range_parted_user;
+-- This should fail with RLS violation error. Even though the UPDATE is setting
+-- 'c' to an even number, the trigger at the destination partition again makes
+-- it an odd number.
+UPDATE range_parted set a = 'b', c = 150 WHERE a = 'a' and c = 200;
+ERROR:  new row violates row-level security policy for table "range_parted"
+-- Cleanup
+RESET SESSION AUTHORIZATION;
+DROP TRIGGER trig_d_1_15 ON part_d_1_15;
+DROP FUNCTION func_d_1_15();
+-- Policy expression contains SubPlan
+RESET SESSION AUTHORIZATION;
+:init_range_parted;
+CREATE POLICY policy_range_parted_subplan on range_parted
+    AS RESTRICTIVE for UPDATE USING (true)
+    WITH CHECK ((SELECT range_parted.c <= c1 FROM mintab));
+SET SESSION AUTHORIZATION regress_range_parted_user;
+-- fail, mintab has row with c1 = 120
+UPDATE range_parted set a = 'b', c = 122 WHERE a = 'a' and c = 200;
+ERROR:  new row violates row-level security policy "policy_range_parted_subplan" for table "range_parted"
 -- ok
-update range_parted set b = b + 1 where b = 10;
+UPDATE range_parted set a = 'b', c = 120 WHERE a = 'a' and c = 200;
+-- RLS policy expression contains whole row.
+RESET SESSION AUTHORIZATION;
+:init_range_parted;
+CREATE POLICY policy_range_parted_wholerow on range_parted AS RESTRICTIVE for UPDATE USING (true)
+   WITH CHECK (range_parted = row('b', 10, 112, 1, NULL)::range_parted);
+SET SESSION AUTHORIZATION regress_range_parted_user;
+-- ok, should pass the RLS check
+UPDATE range_parted set a = 'b', c = 112 WHERE a = 'a' and c = 200;
+RESET SESSION AUTHORIZATION;
+:init_range_parted;
+SET SESSION AUTHORIZATION regress_range_parted_user;
+-- fail, the whole row RLS check should fail
+UPDATE range_parted set a = 'b', c = 116 WHERE a = 'a' and c = 200;
+ERROR:  new row violates row-level security policy "policy_range_parted_wholerow" for table "range_parted"
+-- Cleanup
+RESET SESSION AUTHORIZATION;
+DROP POLICY policy_range_parted ON range_parted;
+DROP POLICY policy_range_parted_subplan ON range_parted;
+DROP POLICY policy_range_parted_wholerow ON range_parted;
+REVOKE ALL ON range_parted, mintab FROM regress_range_parted_user;
+DROP USER regress_range_parted_user;
+DROP TABLE mintab;
+-- statement triggers with update row movement
+---------------------------------------------------
+:init_range_parted;
+CREATE FUNCTION trigfunc() returns trigger language plpgsql as
+$$
+  begin
+    raise notice 'trigger = % fired on table % during %',
+                 TG_NAME, TG_TABLE_NAME, TG_OP;
+    return null;
+  end;
+$$;
+-- Triggers on root partition
+CREATE TRIGGER parent_delete_trig
+  AFTER DELETE ON range_parted for each statement execute procedure trigfunc();
+CREATE TRIGGER parent_update_trig
+  AFTER UPDATE ON range_parted for each statement execute procedure trigfunc();
+CREATE TRIGGER parent_insert_trig
+  AFTER INSERT ON range_parted for each statement execute procedure trigfunc();
+-- Triggers on leaf partition part_c_1_100
+CREATE TRIGGER c1_delete_trig
+  AFTER DELETE ON part_c_1_100 for each statement execute procedure trigfunc();
+CREATE TRIGGER c1_update_trig
+  AFTER UPDATE ON part_c_1_100 for each statement execute procedure trigfunc();
+CREATE TRIGGER c1_insert_trig
+  AFTER INSERT ON part_c_1_100 for each statement execute procedure trigfunc();
+-- Triggers on leaf partition part_d_1_15
+CREATE TRIGGER d1_delete_trig
+  AFTER DELETE ON part_d_1_15 for each statement execute procedure trigfunc();
+CREATE TRIGGER d1_update_trig
+  AFTER UPDATE ON part_d_1_15 for each statement execute procedure trigfunc();
+CREATE TRIGGER d1_insert_trig
+  AFTER INSERT ON part_d_1_15 for each statement execute procedure trigfunc();
+-- Triggers on leaf partition part_d_15_20
+CREATE TRIGGER d15_delete_trig
+  AFTER DELETE ON part_d_15_20 for each statement execute procedure trigfunc();
+CREATE TRIGGER d15_update_trig
+  AFTER UPDATE ON part_d_15_20 for each statement execute procedure trigfunc();
+CREATE TRIGGER d15_insert_trig
+  AFTER INSERT ON part_d_15_20 for each statement execute procedure trigfunc();
+-- Move all rows from part_c_100_200 to part_c_1_100. None of the delete or
+-- insert statement triggers should be fired.
+UPDATE range_parted set c = c - 50 WHERE c > 97;
+NOTICE:  trigger = parent_update_trig fired on table range_parted during UPDATE
+:show_data;
+    partname    | a | b  |  c  | d  | e 
+----------------+---+----+-----+----+---
+ part_a_10_a_20 | a | 10 | 150 |  1 | 
+ part_a_1_a_10  | a |  1 |   1 |  1 | 
+ part_c_1_100   | b | 12 |  96 |  1 | 
+ part_c_1_100   | b | 13 |  97 |  2 | 
+ part_c_1_100   | b | 15 |  55 | 16 | 
+ part_c_1_100   | b | 17 |  55 | 19 | 
+(6 rows)
+
+DROP TRIGGER parent_delete_trig ON range_parted;
+DROP TRIGGER parent_update_trig ON range_parted;
+DROP TRIGGER parent_insert_trig ON range_parted;
+DROP TRIGGER c1_delete_trig ON part_c_1_100;
+DROP TRIGGER c1_update_trig ON part_c_1_100;
+DROP TRIGGER c1_insert_trig ON part_c_1_100;
+DROP TRIGGER d1_delete_trig ON part_d_1_15;
+DROP TRIGGER d1_update_trig ON part_d_1_15;
+DROP TRIGGER d1_insert_trig ON part_d_1_15;
+DROP TRIGGER d15_delete_trig ON part_d_15_20;
+DROP TRIGGER d15_update_trig ON part_d_15_20;
+DROP TRIGGER d15_insert_trig ON part_d_15_20;
 -- Creating default partition for range
+:init_range_parted;
 create table part_def partition of range_parted default;
 \d+ part_def
-                                  Table "public.part_def"
- Column |  Type   | Collation | Nullable | Default | Storage  | Stats target | Description 
---------+---------+-----------+----------+---------+----------+--------------+-------------
- a      | text    |           |          |         | extended |              | 
- b      | integer |           |          |         | plain    |              | 
+                                       Table "public.part_def"
+ Column |       Type        | Collation | Nullable | Default | Storage  | Stats target | Description 
+--------+-------------------+-----------+----------+---------+----------+--------------+-------------
+ a      | text              |           |          |         | extended |              | 
+ b      | bigint            |           |          |         | plain    |              | 
+ c      | numeric           |           |          |         | main     |              | 
+ d      | integer           |           |          |         | plain    |              | 
+ e      | character varying |           |          |         | extended |              | 
 Partition of: range_parted DEFAULT
-Partition constraint: (NOT ((a IS NOT NULL) AND (b IS NOT NULL) AND (((a = 'a'::text) AND (b >= 1) AND (b < 10)) OR ((a = 'a'::text) AND (b >= 10) AND (b < 20)) OR ((a = 'b'::text) AND (b >= 1) AND (b < 10)) OR ((a = 'b'::text) AND (b >= 10) AND (b < 20)))))
+Partition constraint: (NOT ((a IS NOT NULL) AND (b IS NOT NULL) AND (((a = 'a'::text) AND (b >= '1'::bigint) AND (b < '10'::bigint)) OR ((a = 'a'::text) AND (b >= '10'::bigint) AND (b < '20'::bigint)) OR ((a = 'b'::text) AND (b >= '1'::bigint) AND (b < '10'::bigint)) OR ((a = 'b'::text) AND (b >= '10'::bigint) AND (b < '20'::bigint)) OR ((a = 'b'::text) AND (b >= '20'::bigint) AND (b < '30'::bigint)))))
 
 insert into range_parted values ('c', 9);
 -- ok
 -- fail
 update part_def set a = 'a' where a = 'd';
 ERROR:  new row for relation "part_def" violates partition constraint
-DETAIL:  Failing row contains (a, 9).
-create table list_parted (
+DETAIL:  Failing row contains (a, 9, null, null, null).
+:show_data;
+    partname    | a | b  |  c  | d  | e 
+----------------+---+----+-----+----+---
+ part_a_10_a_20 | a | 10 | 200 |  1 | 
+ part_a_1_a_10  | a |  1 |   1 |  1 | 
+ part_c_1_100   | b | 12 |  96 |  1 | 
+ part_c_1_100   | b | 13 |  97 |  2 | 
+ part_d_15_20   | b | 15 | 105 | 16 | 
+ part_d_15_20   | b | 17 | 105 | 19 | 
+ part_def       | d |  9 |     |    | 
+(7 rows)
+
+-- Update row movement from non-default to default partition.
+-- fail, default partition is not under part_a_10_a_20;
+UPDATE part_a_10_a_20 set a = 'ad' WHERE a = 'a';
+ERROR:  new row for relation "part_a_10_a_20" violates partition constraint
+DETAIL:  Failing row contains (ad, 10, 200, 1, null).
+-- ok
+UPDATE range_parted set a = 'ad' WHERE a = 'a';
+UPDATE range_parted set a = 'bd' WHERE a = 'b';
+:show_data;
+ partname | a  | b  |  c  | d  | e 
+----------+----+----+-----+----+---
+ part_def | ad |  1 |   1 |  1 | 
+ part_def | ad | 10 | 200 |  1 | 
+ part_def | bd | 12 |  96 |  1 | 
+ part_def | bd | 13 |  97 |  2 | 
+ part_def | bd | 15 | 105 | 16 | 
+ part_def | bd | 17 | 105 | 19 | 
+ part_def | d  |  9 |     |    | 
+(7 rows)
+
+-- Update row movement from default to non-default partitions.
+-- ok
+UPDATE range_parted set a = 'a' WHERE a = 'ad';
+UPDATE range_parted set a = 'b' WHERE a = 'bd';
+:show_data;
+    partname    | a | b  |  c  | d  | e 
+----------------+---+----+-----+----+---
+ part_a_10_a_20 | a | 10 | 200 |  1 | 
+ part_a_1_a_10  | a |  1 |   1 |  1 | 
+ part_c_1_100   | b | 12 |  96 |  1 | 
+ part_c_1_100   | b | 13 |  97 |  2 | 
+ part_d_15_20   | b | 15 | 105 | 16 | 
+ part_d_15_20   | b | 17 | 105 | 19 | 
+ part_def       | d |  9 |     |    | 
+(7 rows)
+
+-- Cleanup: range_parted no longer needed.
+DROP TABLE range_parted;
+CREATE TABLE list_parted (
    a text,
    b int
-) partition by list (a);
-create table list_part1  partition of list_parted for values in ('a', 'b');
-create table list_default partition of list_parted default;
-insert into list_part1 values ('a', 1);
-insert into list_default values ('d', 10);
+) PARTITION BY list (a);
+CREATE TABLE list_part1  PARTITION OF list_parted for VALUES in ('a', 'b');
+CREATE TABLE list_default PARTITION OF list_parted default;
+INSERT into list_part1 VALUES ('a', 1);
+INSERT into list_default VALUES ('d', 10);
 -- fail
-update list_default set a = 'a' where a = 'd';
+UPDATE list_default set a = 'a' WHERE a = 'd';
 ERROR:  new row for relation "list_default" violates partition constraint
 DETAIL:  Failing row contains (a, 10).
 -- ok
-update list_default set a = 'x' where a = 'd';
+UPDATE list_default set a = 'x' WHERE a = 'd';
+DROP TABLE list_parted;
+--------------
+-- Some more update-partition-key test scenarios below. This time use list
+-- partitions.
+--------------
+-- Setup for list partitions
+CREATE TABLE list_parted (a numeric, b int, c int8) PARTITION BY list (a);
+CREATE TABLE sub_parted PARTITION OF list_parted for VALUES in (1) PARTITION BY list (b);
+CREATE TABLE sub_part1(b int, c int8, a numeric);
+ALTER TABLE sub_parted ATTACH PARTITION sub_part1 for VALUES in (1);
+CREATE TABLE sub_part2(b int, c int8, a numeric);
+ALTER TABLE sub_parted ATTACH PARTITION sub_part2 for VALUES in (2);
+CREATE TABLE list_part1(a numeric, b int, c int8);
+ALTER TABLE list_parted ATTACH PARTITION list_part1 for VALUES in (2,3);
+INSERT into list_parted VALUES (2,5,50);
+INSERT into list_parted VALUES (3,6,60);
+INSERT into sub_parted VALUES (1,1,60);
+INSERT into sub_parted VALUES (1,2,10);
+-- Test partition constraint violation when intermediate ancestor is used and
+-- constraint is inherited from upper root.
+UPDATE sub_parted set a = 2 WHERE c = 10;
+ERROR:  new row for relation "sub_part2" violates partition constraint
+DETAIL:  Failing row contains (2, 10, 2).
+-- Test update-partition-key, where the unpruned partitions do not have their
+-- partition keys updated.
+SELECT tableoid::regclass::text, * FROM list_parted WHERE a = 2 ORDER BY 1;
+  tableoid  | a | b | c  
+------------+---+---+----
+ list_part1 | 2 | 5 | 50
+(1 row)
+
+UPDATE list_parted set b = c + a WHERE a = 2;
+SELECT tableoid::regclass::text, * FROM list_parted WHERE a = 2 ORDER BY 1;
+  tableoid  | a | b  | c  
+------------+---+----+----
+ list_part1 | 2 | 52 | 50
+(1 row)
+
+-- Test the case where BR UPDATE triggers change the partition key.
+CREATE FUNCTION func_parted_mod_b() returns trigger as $$
+BEGIN
+   NEW.b = 2; -- This is changing partition key column.
+   return NEW;
+END $$ LANGUAGE plpgsql;
+CREATE TRIGGER parted_mod_b before update on sub_part1
+   for each row execute procedure func_parted_mod_b();
+SELECT tableoid::regclass::text, * FROM list_parted ORDER BY 1, 2, 3, 4;
+  tableoid  | a | b  | c  
+------------+---+----+----
+ list_part1 | 2 | 52 | 50
+ list_part1 | 3 |  6 | 60
+ sub_part1  | 1 |  1 | 60
+ sub_part2  | 1 |  2 | 10
+(4 rows)
+
+-- This should do the tuple routing even though there is no explicit
+-- partition-key update, because there is a trigger on sub_part1.
+UPDATE list_parted set c = 70 WHERE b  = 1;
+SELECT tableoid::regclass::text, * FROM list_parted ORDER BY 1, 2, 3, 4;
+  tableoid  | a | b  | c  
+------------+---+----+----
+ list_part1 | 2 | 52 | 50
+ list_part1 | 3 |  6 | 60
+ sub_part2  | 1 |  2 | 10
+ sub_part2  | 1 |  2 | 70
+(4 rows)
+
+DROP TRIGGER parted_mod_b ON sub_part1;
+-- If BR DELETE trigger prevented DELETE from happening, we should also skip
+-- the INSERT if that delete is part of UPDATE=>DELETE+INSERT.
+CREATE OR REPLACE FUNCTION func_parted_mod_b() returns trigger as $$
+BEGIN
+   raise notice 'Trigger: Got OLD row %, but returning NULL', OLD;
+   return NULL;
+END $$ LANGUAGE plpgsql;
+CREATE TRIGGER trig_skip_delete before delete on sub_part2
+   for each row execute procedure func_parted_mod_b();
+UPDATE list_parted set b = 1 WHERE c = 70;
+NOTICE:  Trigger: Got OLD row (2,70,1), but returning NULL
+SELECT tableoid::regclass::text, * FROM list_parted ORDER BY 1, 2, 3, 4;
+  tableoid  | a | b  | c  
+------------+---+----+----
+ list_part1 | 2 | 52 | 50
+ list_part1 | 3 |  6 | 60
+ sub_part2  | 1 |  2 | 10
+ sub_part2  | 1 |  2 | 70
+(4 rows)
+
+-- Drop the trigger. Now the row should be moved.
+DROP TRIGGER trig_skip_delete ON sub_part2;
+UPDATE list_parted set b = 1 WHERE c = 70;
+SELECT tableoid::regclass::text, * FROM list_parted ORDER BY 1, 2, 3, 4;
+  tableoid  | a | b  | c  
+------------+---+----+----
+ list_part1 | 2 | 52 | 50
+ list_part1 | 3 |  6 | 60
+ sub_part1  | 1 |  1 | 70
+ sub_part2  | 1 |  2 | 10
+(4 rows)
+
+DROP FUNCTION func_parted_mod_b();
+-- UPDATE partition-key with FROM clause. If join produces multiple output
+-- rows for the same row to be modified, we should tuple-route the row only
+-- once. There should not be any rows inserted.
+CREATE TABLE non_parted (id int);
+INSERT into non_parted VALUES (1), (1), (1), (2), (2), (2), (3), (3), (3);
+UPDATE list_parted t1 set a = 2 FROM non_parted t2 WHERE t1.a = t2.id and a = 1;
+SELECT tableoid::regclass::text, * FROM list_parted ORDER BY 1, 2, 3, 4;
+  tableoid  | a | b  | c  
+------------+---+----+----
+ list_part1 | 2 |  1 | 70
+ list_part1 | 2 |  2 | 10
+ list_part1 | 2 | 52 | 50
+ list_part1 | 3 |  6 | 60
+(4 rows)
+
+DROP TABLE non_parted;
+-- Cleanup: list_parted no longer needed.
+DROP TABLE list_parted;
 -- create custom operator class and hash function, for the same reason
 -- explained in alter_table.sql
 create or replace function dummy_hashint4(a int4, seed int8) returns int8 as
 update hpart1 set a = 3, b=4 where a = 1;
 ERROR:  new row for relation "hpart1" violates partition constraint
 DETAIL:  Failing row contains (3, 4).
+-- ok, row movement
 update hash_parted set b = b - 1 where b = 1;
-ERROR:  new row for relation "hpart1" violates partition constraint
-DETAIL:  Failing row contains (1, 0).
 -- ok
 update hash_parted set b = b + 8 where b = 1;
 -- cleanup
-drop table range_parted;
-drop table list_parted;
 drop table hash_parted;
 drop operator class custom_opclass using hash;
 drop function dummy_hashint4(a int4, seed int8);
 
 DROP TABLE update_test;
 DROP TABLE upsert_test;
 
--- update to a partition should check partition bound constraint for the new tuple
-create table range_parted (
+
+---------------------------
+-- UPDATE with row movement
+---------------------------
+
+-- When a partitioned table receives an UPDATE to the partitioned key and the
+-- new values no longer meet the partition's bound, the row must be moved to
+-- the correct partition for the new partition key (if one exists). We must
+-- also ensure that updatable views on partitioned tables properly enforce any
+-- WITH CHECK OPTION that is defined. The situation with triggers in this case
+-- also requires thorough testing as partition key updates causing row
+-- movement convert UPDATEs into DELETE+INSERT.
+
+CREATE TABLE range_parted (
    a text,
-   b int
-) partition by range (a, b);
-create table part_a_1_a_10 partition of range_parted for values from ('a', 1) to ('a', 10);
-create table part_a_10_a_20 partition of range_parted for values from ('a', 10) to ('a', 20);
-create table part_b_1_b_10 partition of range_parted for values from ('b', 1) to ('b', 10);
-create table part_b_10_b_20 partition of range_parted for values from ('b', 10) to ('b', 20);
-insert into part_a_1_a_10 values ('a', 1);
-insert into part_b_10_b_20 values ('b', 10);
+   b bigint,
+   c numeric,
+   d int,
+   e varchar
+) PARTITION BY RANGE (a, b);
 
--- fail
-update part_a_1_a_10 set a = 'b' where a = 'a';
-update range_parted set b = b - 1 where b = 10;
+-- Create partitions intentionally in descending bound order, so as to test
+-- that update-row-movement works with the leaf partitions not in bound order.
+CREATE TABLE part_b_20_b_30 (e varchar, c numeric, a text, b bigint, d int);
+ALTER TABLE range_parted ATTACH PARTITION part_b_20_b_30 FOR VALUES FROM ('b', 20) TO ('b', 30);
+CREATE TABLE part_b_10_b_20 (e varchar, c numeric, a text, b bigint, d int) PARTITION BY RANGE (c);
+CREATE TABLE part_b_1_b_10 PARTITION OF range_parted FOR VALUES FROM ('b', 1) TO ('b', 10);
+ALTER TABLE range_parted ATTACH PARTITION part_b_10_b_20 FOR VALUES FROM ('b', 10) TO ('b', 20);
+CREATE TABLE part_a_10_a_20 PARTITION OF range_parted FOR VALUES FROM ('a', 10) TO ('a', 20);
+CREATE TABLE part_a_1_a_10 PARTITION OF range_parted FOR VALUES FROM ('a', 1) TO ('a', 10);
+
+-- Check that partition-key UPDATE works sanely on a partitioned table that
+-- does not have any child partitions.
+UPDATE part_b_10_b_20 set b = b - 6;
+
+-- Create some more partitions following the above pattern of descending bound
+-- order, but let's make the situation a bit more complex by having the
+-- attribute numbers of the columns vary from their parent partition.
+CREATE TABLE part_c_100_200 (e varchar, c numeric, a text, b bigint, d int) PARTITION BY range (abs(d));
+ALTER TABLE part_c_100_200 DROP COLUMN e, DROP COLUMN c, DROP COLUMN a;
+ALTER TABLE part_c_100_200 ADD COLUMN c numeric, ADD COLUMN e varchar, ADD COLUMN a text;
+ALTER TABLE part_c_100_200 DROP COLUMN b;
+ALTER TABLE part_c_100_200 ADD COLUMN b bigint;
+CREATE TABLE part_d_1_15 PARTITION OF part_c_100_200 FOR VALUES FROM (1) TO (15);
+CREATE TABLE part_d_15_20 PARTITION OF part_c_100_200 FOR VALUES FROM (15) TO (20);
+
+ALTER TABLE part_b_10_b_20 ATTACH PARTITION part_c_100_200 FOR VALUES FROM (100) TO (200);
+
+CREATE TABLE part_c_1_100 (e varchar, d int, c numeric, b bigint, a text);
+ALTER TABLE part_b_10_b_20 ATTACH PARTITION part_c_1_100 FOR VALUES FROM (1) TO (100);
+
+\set init_range_parted 'truncate range_parted; insert into range_parted VALUES (''a'', 1, 1, 1), (''a'', 10, 200, 1), (''b'', 12, 96, 1), (''b'', 13, 97, 2), (''b'', 15, 105, 16), (''b'', 17, 105, 19)'
+\set show_data 'select tableoid::regclass::text COLLATE "C" partname, * from range_parted ORDER BY 1, 2, 3, 4, 5, 6'
+:init_range_parted;
+:show_data;
+
+-- The order of subplans should be in bound order
+EXPLAIN (costs off) UPDATE range_parted set c = c - 50 WHERE c > 97;
+
+-- fail, row movement happens only within the partition subtree.
+UPDATE part_c_100_200 set c = c - 20, d = c WHERE c = 105;
+-- fail, no partition key update, so no attempt to move tuple,
+-- but "a = 'a'" violates partition constraint enforced by root partition)
+UPDATE part_b_10_b_20 set a = 'a';
+-- ok, partition key update, no constraint violation
+UPDATE range_parted set d = d - 10 WHERE d > 10;
+-- ok, no partition key update, no constraint violation
+UPDATE range_parted set e = d;
+-- No row found
+UPDATE part_c_1_100 set c = c + 20 WHERE c = 98;
+-- ok, row movement
+UPDATE part_b_10_b_20 set c = c + 20 returning c, b, a;
+:show_data;
+
+-- fail, row movement happens only within the partition subtree.
+UPDATE part_b_10_b_20 set b = b - 6 WHERE c > 116 returning *;
+-- ok, row movement, with subset of rows moved into different partition.
+UPDATE range_parted set b = b - 6 WHERE c > 116 returning a, b + c;
+
+:show_data;
+
+-- Common table needed for multiple test scenarios.
+CREATE TABLE mintab(c1 int);
+INSERT into mintab VALUES (120);
+
+-- update partition key using updatable view.
+CREATE VIEW upview AS SELECT * FROM range_parted WHERE (select c > c1 FROM mintab) WITH CHECK OPTION;
+-- ok
+UPDATE upview set c = 199 WHERE b = 4;
+-- fail, check option violation
+UPDATE upview set c = 120 WHERE b = 4;
+-- fail, row movement with check option violation
+UPDATE upview set a = 'b', b = 15, c = 120 WHERE b = 4;
+-- ok, row movement, check option passes
+UPDATE upview set a = 'b', b = 15 WHERE b = 4;
+
+:show_data;
+
+-- cleanup
+DROP VIEW upview;
+
+-- RETURNING having whole-row vars.
+:init_range_parted;
+UPDATE range_parted set c = 95 WHERE a = 'b' and b > 10 and c > 100 returning (range_parted), *;
+:show_data;
+
+
+-- Transition tables with update row movement
+:init_range_parted;
+
+CREATE FUNCTION trans_updatetrigfunc() RETURNS trigger LANGUAGE plpgsql AS
+$$
+  begin
+    raise notice 'trigger = %, old table = %, new table = %',
+                 TG_NAME,
+                 (select string_agg(old_table::text, ', ' ORDER BY a) FROM old_table),
+                 (select string_agg(new_table::text, ', ' ORDER BY a) FROM new_table);
+    return null;
+  end;
+$$;
+
+CREATE TRIGGER trans_updatetrig
+  AFTER UPDATE ON range_parted REFERENCING OLD TABLE AS old_table NEW TABLE AS new_table
+  FOR EACH STATEMENT EXECUTE PROCEDURE trans_updatetrigfunc();
+
+UPDATE range_parted set c = (case when c = 96 then 110 else c + 1 end ) WHERE a = 'b' and b > 10 and c >= 96;
+:show_data;
+:init_range_parted;
+
+-- Enabling OLD TABLE capture for both DELETE as well as UPDATE stmt triggers
+-- should not cause DELETEd rows to be captured twice. Similar thing for
+-- INSERT triggers and inserted rows.
+CREATE TRIGGER trans_deletetrig
+  AFTER DELETE ON range_parted REFERENCING OLD TABLE AS old_table
+  FOR EACH STATEMENT EXECUTE PROCEDURE trans_updatetrigfunc();
+CREATE TRIGGER trans_inserttrig
+  AFTER INSERT ON range_parted REFERENCING NEW TABLE AS new_table
+  FOR EACH STATEMENT EXECUTE PROCEDURE trans_updatetrigfunc();
+UPDATE range_parted set c = c + 50 WHERE a = 'b' and b > 10 and c >= 96;
+:show_data;
+DROP TRIGGER trans_deletetrig ON range_parted;
+DROP TRIGGER trans_inserttrig ON range_parted;
+-- Don't drop trans_updatetrig yet. It is required below.
+
+-- Test with transition tuple conversion happening for rows moved into the
+-- new partition. This requires a trigger that references transition table
+-- (we already have trans_updatetrig). For inserted rows, the conversion
+-- is not usually needed, because the original tuple is already compatible with
+-- the desired transition tuple format. But conversion happens when there is a
+-- BR trigger because the trigger can change the inserted row. So install a
+-- BR triggers on those child partitions where the rows will be moved.
+CREATE FUNCTION func_parted_mod_b() RETURNS trigger AS $$
+BEGIN
+   NEW.b = NEW.b + 1;
+   return NEW;
+END $$ language plpgsql;
+CREATE TRIGGER trig_c1_100 BEFORE UPDATE OR INSERT ON part_c_1_100
+   FOR EACH ROW EXECUTE PROCEDURE func_parted_mod_b();
+CREATE TRIGGER trig_d1_15 BEFORE UPDATE OR INSERT ON part_d_1_15
+   FOR EACH ROW EXECUTE PROCEDURE func_parted_mod_b();
+CREATE TRIGGER trig_d15_20 BEFORE UPDATE OR INSERT ON part_d_15_20
+   FOR EACH ROW EXECUTE PROCEDURE func_parted_mod_b();
+:init_range_parted;
+UPDATE range_parted set c = (case when c = 96 then 110 else c + 1 end) WHERE a = 'b' and b > 10 and c >= 96;
+:show_data;
+:init_range_parted;
+UPDATE range_parted set c = c + 50 WHERE a = 'b' and b > 10 and c >= 96;
+:show_data;
+
+-- Case where per-partition tuple conversion map array is allocated, but the
+-- map is not required for the particular tuple that is routed, thanks to
+-- matching table attributes of the partition and the target table.
+:init_range_parted;
+UPDATE range_parted set b = 15 WHERE b = 1;
+:show_data;
+
+DROP TRIGGER trans_updatetrig ON range_parted;
+DROP TRIGGER trig_c1_100 ON part_c_1_100;
+DROP TRIGGER trig_d1_15 ON part_d_1_15;
+DROP TRIGGER trig_d15_20 ON part_d_15_20;
+DROP FUNCTION func_parted_mod_b();
+
+-- RLS policies with update-row-movement
+-----------------------------------------
+
+ALTER TABLE range_parted ENABLE ROW LEVEL SECURITY;
+CREATE USER regress_range_parted_user;
+GRANT ALL ON range_parted, mintab TO regress_range_parted_user;
+CREATE POLICY seeall ON range_parted AS PERMISSIVE FOR SELECT USING (true);
+CREATE POLICY policy_range_parted ON range_parted for UPDATE USING (true) WITH CHECK (c % 2 = 0);
+
+:init_range_parted;
+SET SESSION AUTHORIZATION regress_range_parted_user;
+-- This should fail with RLS violation error while moving row from
+-- part_a_10_a_20 to part_d_1_15, because we are setting 'c' to an odd number.
+UPDATE range_parted set a = 'b', c = 151 WHERE a = 'a' and c = 200;
+
+RESET SESSION AUTHORIZATION;
+-- Create a trigger on part_d_1_15
+CREATE FUNCTION func_d_1_15() RETURNS trigger AS $$
+BEGIN
+   NEW.c = NEW.c + 1; -- Make even numbers odd, or vice versa
+   return NEW;
+END $$ LANGUAGE plpgsql;
+CREATE TRIGGER trig_d_1_15 BEFORE INSERT ON part_d_1_15
+   FOR EACH ROW EXECUTE PROCEDURE func_d_1_15();
+
+:init_range_parted;
+SET SESSION AUTHORIZATION regress_range_parted_user;
+
+-- Here, RLS checks should succeed while moving row from part_a_10_a_20 to
+-- part_d_1_15. Even though the UPDATE is setting 'c' to an odd number, the
+-- trigger at the destination partition again makes it an even number.
+UPDATE range_parted set a = 'b', c = 151 WHERE a = 'a' and c = 200;
+
+RESET SESSION AUTHORIZATION;
+:init_range_parted;
+SET SESSION AUTHORIZATION regress_range_parted_user;
+-- This should fail with RLS violation error. Even though the UPDATE is setting
+-- 'c' to an even number, the trigger at the destination partition again makes
+-- it an odd number.
+UPDATE range_parted set a = 'b', c = 150 WHERE a = 'a' and c = 200;
+
+-- Cleanup
+RESET SESSION AUTHORIZATION;
+DROP TRIGGER trig_d_1_15 ON part_d_1_15;
+DROP FUNCTION func_d_1_15();
+
+-- Policy expression contains SubPlan
+RESET SESSION AUTHORIZATION;
+:init_range_parted;
+CREATE POLICY policy_range_parted_subplan on range_parted
+    AS RESTRICTIVE for UPDATE USING (true)
+    WITH CHECK ((SELECT range_parted.c <= c1 FROM mintab));
+SET SESSION AUTHORIZATION regress_range_parted_user;
+-- fail, mintab has row with c1 = 120
+UPDATE range_parted set a = 'b', c = 122 WHERE a = 'a' and c = 200;
 -- ok
-update range_parted set b = b + 1 where b = 10;
+UPDATE range_parted set a = 'b', c = 120 WHERE a = 'a' and c = 200;
+
+-- RLS policy expression contains whole row.
+
+RESET SESSION AUTHORIZATION;
+:init_range_parted;
+CREATE POLICY policy_range_parted_wholerow on range_parted AS RESTRICTIVE for UPDATE USING (true)
+   WITH CHECK (range_parted = row('b', 10, 112, 1, NULL)::range_parted);
+SET SESSION AUTHORIZATION regress_range_parted_user;
+-- ok, should pass the RLS check
+UPDATE range_parted set a = 'b', c = 112 WHERE a = 'a' and c = 200;
+RESET SESSION AUTHORIZATION;
+:init_range_parted;
+SET SESSION AUTHORIZATION regress_range_parted_user;
+-- fail, the whole row RLS check should fail
+UPDATE range_parted set a = 'b', c = 116 WHERE a = 'a' and c = 200;
+
+-- Cleanup
+RESET SESSION AUTHORIZATION;
+DROP POLICY policy_range_parted ON range_parted;
+DROP POLICY policy_range_parted_subplan ON range_parted;
+DROP POLICY policy_range_parted_wholerow ON range_parted;
+REVOKE ALL ON range_parted, mintab FROM regress_range_parted_user;
+DROP USER regress_range_parted_user;
+DROP TABLE mintab;
+
+
+-- statement triggers with update row movement
+---------------------------------------------------
+
+:init_range_parted;
+
+CREATE FUNCTION trigfunc() returns trigger language plpgsql as
+$$
+  begin
+    raise notice 'trigger = % fired on table % during %',
+                 TG_NAME, TG_TABLE_NAME, TG_OP;
+    return null;
+  end;
+$$;
+-- Triggers on root partition
+CREATE TRIGGER parent_delete_trig
+  AFTER DELETE ON range_parted for each statement execute procedure trigfunc();
+CREATE TRIGGER parent_update_trig
+  AFTER UPDATE ON range_parted for each statement execute procedure trigfunc();
+CREATE TRIGGER parent_insert_trig
+  AFTER INSERT ON range_parted for each statement execute procedure trigfunc();
+
+-- Triggers on leaf partition part_c_1_100
+CREATE TRIGGER c1_delete_trig
+  AFTER DELETE ON part_c_1_100 for each statement execute procedure trigfunc();
+CREATE TRIGGER c1_update_trig
+  AFTER UPDATE ON part_c_1_100 for each statement execute procedure trigfunc();
+CREATE TRIGGER c1_insert_trig
+  AFTER INSERT ON part_c_1_100 for each statement execute procedure trigfunc();
+
+-- Triggers on leaf partition part_d_1_15
+CREATE TRIGGER d1_delete_trig
+  AFTER DELETE ON part_d_1_15 for each statement execute procedure trigfunc();
+CREATE TRIGGER d1_update_trig
+  AFTER UPDATE ON part_d_1_15 for each statement execute procedure trigfunc();
+CREATE TRIGGER d1_insert_trig
+  AFTER INSERT ON part_d_1_15 for each statement execute procedure trigfunc();
+-- Triggers on leaf partition part_d_15_20
+CREATE TRIGGER d15_delete_trig
+  AFTER DELETE ON part_d_15_20 for each statement execute procedure trigfunc();
+CREATE TRIGGER d15_update_trig
+  AFTER UPDATE ON part_d_15_20 for each statement execute procedure trigfunc();
+CREATE TRIGGER d15_insert_trig
+  AFTER INSERT ON part_d_15_20 for each statement execute procedure trigfunc();
+
+-- Move all rows from part_c_100_200 to part_c_1_100. None of the delete or
+-- insert statement triggers should be fired.
+UPDATE range_parted set c = c - 50 WHERE c > 97;
+:show_data;
+
+DROP TRIGGER parent_delete_trig ON range_parted;
+DROP TRIGGER parent_update_trig ON range_parted;
+DROP TRIGGER parent_insert_trig ON range_parted;
+DROP TRIGGER c1_delete_trig ON part_c_1_100;
+DROP TRIGGER c1_update_trig ON part_c_1_100;
+DROP TRIGGER c1_insert_trig ON part_c_1_100;
+DROP TRIGGER d1_delete_trig ON part_d_1_15;
+DROP TRIGGER d1_update_trig ON part_d_1_15;
+DROP TRIGGER d1_insert_trig ON part_d_1_15;
+DROP TRIGGER d15_delete_trig ON part_d_15_20;
+DROP TRIGGER d15_update_trig ON part_d_15_20;
+DROP TRIGGER d15_insert_trig ON part_d_15_20;
+
 
 -- Creating default partition for range
+:init_range_parted;
 create table part_def partition of range_parted default;
 \d+ part_def
 insert into range_parted values ('c', 9);
 -- fail
 update part_def set a = 'a' where a = 'd';
 
-create table list_parted (
+:show_data;
+
+-- Update row movement from non-default to default partition.
+-- fail, default partition is not under part_a_10_a_20;
+UPDATE part_a_10_a_20 set a = 'ad' WHERE a = 'a';
+-- ok
+UPDATE range_parted set a = 'ad' WHERE a = 'a';
+UPDATE range_parted set a = 'bd' WHERE a = 'b';
+:show_data;
+-- Update row movement from default to non-default partitions.
+-- ok
+UPDATE range_parted set a = 'a' WHERE a = 'ad';
+UPDATE range_parted set a = 'b' WHERE a = 'bd';
+:show_data;
+
+-- Cleanup: range_parted no longer needed.
+DROP TABLE range_parted;
+
+CREATE TABLE list_parted (
    a text,
    b int
-) partition by list (a);
-create table list_part1  partition of list_parted for values in ('a', 'b');
-create table list_default partition of list_parted default;
-insert into list_part1 values ('a', 1);
-insert into list_default values ('d', 10);
+) PARTITION BY list (a);
+CREATE TABLE list_part1  PARTITION OF list_parted for VALUES in ('a', 'b');
+CREATE TABLE list_default PARTITION OF list_parted default;
+INSERT into list_part1 VALUES ('a', 1);
+INSERT into list_default VALUES ('d', 10);
 
 -- fail
-update list_default set a = 'a' where a = 'd';
+UPDATE list_default set a = 'a' WHERE a = 'd';
 -- ok
-update list_default set a = 'x' where a = 'd';
+UPDATE list_default set a = 'x' WHERE a = 'd';
+
+DROP TABLE list_parted;
+
+--------------
+-- Some more update-partition-key test scenarios below. This time use list
+-- partitions.
+--------------
+
+-- Setup for list partitions
+CREATE TABLE list_parted (a numeric, b int, c int8) PARTITION BY list (a);
+CREATE TABLE sub_parted PARTITION OF list_parted for VALUES in (1) PARTITION BY list (b);
+
+CREATE TABLE sub_part1(b int, c int8, a numeric);
+ALTER TABLE sub_parted ATTACH PARTITION sub_part1 for VALUES in (1);
+CREATE TABLE sub_part2(b int, c int8, a numeric);
+ALTER TABLE sub_parted ATTACH PARTITION sub_part2 for VALUES in (2);
+
+CREATE TABLE list_part1(a numeric, b int, c int8);
+ALTER TABLE list_parted ATTACH PARTITION list_part1 for VALUES in (2,3);
+
+INSERT into list_parted VALUES (2,5,50);
+INSERT into list_parted VALUES (3,6,60);
+INSERT into sub_parted VALUES (1,1,60);
+INSERT into sub_parted VALUES (1,2,10);
+
+-- Test partition constraint violation when intermediate ancestor is used and
+-- constraint is inherited from upper root.
+UPDATE sub_parted set a = 2 WHERE c = 10;
+
+-- Test update-partition-key, where the unpruned partitions do not have their
+-- partition keys updated.
+SELECT tableoid::regclass::text, * FROM list_parted WHERE a = 2 ORDER BY 1;
+UPDATE list_parted set b = c + a WHERE a = 2;
+SELECT tableoid::regclass::text, * FROM list_parted WHERE a = 2 ORDER BY 1;
+
+
+-- Test the case where BR UPDATE triggers change the partition key.
+CREATE FUNCTION func_parted_mod_b() returns trigger as $$
+BEGIN
+   NEW.b = 2; -- This is changing partition key column.
+   return NEW;
+END $$ LANGUAGE plpgsql;
+CREATE TRIGGER parted_mod_b before update on sub_part1
+   for each row execute procedure func_parted_mod_b();
+
+SELECT tableoid::regclass::text, * FROM list_parted ORDER BY 1, 2, 3, 4;
+
+-- This should do the tuple routing even though there is no explicit
+-- partition-key update, because there is a trigger on sub_part1.
+UPDATE list_parted set c = 70 WHERE b  = 1;
+SELECT tableoid::regclass::text, * FROM list_parted ORDER BY 1, 2, 3, 4;
+
+DROP TRIGGER parted_mod_b ON sub_part1;
+
+-- If BR DELETE trigger prevented DELETE from happening, we should also skip
+-- the INSERT if that delete is part of UPDATE=>DELETE+INSERT.
+CREATE OR REPLACE FUNCTION func_parted_mod_b() returns trigger as $$
+BEGIN
+   raise notice 'Trigger: Got OLD row %, but returning NULL', OLD;
+   return NULL;
+END $$ LANGUAGE plpgsql;
+CREATE TRIGGER trig_skip_delete before delete on sub_part2
+   for each row execute procedure func_parted_mod_b();
+UPDATE list_parted set b = 1 WHERE c = 70;
+SELECT tableoid::regclass::text, * FROM list_parted ORDER BY 1, 2, 3, 4;
+-- Drop the trigger. Now the row should be moved.
+DROP TRIGGER trig_skip_delete ON sub_part2;
+UPDATE list_parted set b = 1 WHERE c = 70;
+SELECT tableoid::regclass::text, * FROM list_parted ORDER BY 1, 2, 3, 4;
+DROP FUNCTION func_parted_mod_b();
+
+-- UPDATE partition-key with FROM clause. If join produces multiple output
+-- rows for the same row to be modified, we should tuple-route the row only
+-- once. There should not be any rows inserted.
+CREATE TABLE non_parted (id int);
+INSERT into non_parted VALUES (1), (1), (1), (2), (2), (2), (3), (3), (3);
+UPDATE list_parted t1 set a = 2 FROM non_parted t2 WHERE t1.a = t2.id and a = 1;
+SELECT tableoid::regclass::text, * FROM list_parted ORDER BY 1, 2, 3, 4;
+DROP TABLE non_parted;
+
+-- Cleanup: list_parted no longer needed.
+DROP TABLE list_parted;
 
 -- create custom operator class and hash function, for the same reason
 -- explained in alter_table.sql
 
 -- fail
 update hpart1 set a = 3, b=4 where a = 1;
+-- ok, row movement
 update hash_parted set b = b - 1 where b = 1;
 -- ok
 update hash_parted set b = b + 8 where b = 1;
 
 -- cleanup
-drop table range_parted;
-drop table list_parted;
 drop table hash_parted;
 drop operator class custom_opclass using hash;
 drop function dummy_hashint4(a int4, seed int8);
 
 PartitionRangeDatumKind
 PartitionScheme
 PartitionSpec
+PartitionTupleRouting
 PartitionedChildRelInfo
 PasswordType
 Path