pg_upgrade: Parallelize retrieving loadable libraries.
author Nathan Bossart <[email protected]>
Mon, 16 Sep 2024 21:10:33 +0000 (16:10 -0500)
committer Nathan Bossart <[email protected]>
Mon, 16 Sep 2024 21:10:33 +0000 (16:10 -0500)
This commit makes use of the new task framework in pg_upgrade to
parallelize retrieving the names of all libraries referenced by
non-built-in C functions.  This step will now process multiple
databases concurrently when pg_upgrade's --jobs option is provided
a value greater than 1.

Reviewed-by: Daniel Gustafsson, Ilya Gladyshev
Discussion: https://postgr.es/m/20240516211638.GA1688936%40nathanxps13

src/bin/pg_upgrade/function.c

index 7e3abed0985d5b11950de9a154c68c4f085a42eb..bd2b8b6bd6698b58ae048f064cf12bad8ceaffdf 100644 (file)
@@ -42,6 +42,30 @@ library_name_compare(const void *p1, const void *p2)
                                          ((const LibraryInfo *) p2)->dbnum);
 }
 
+/*
+ * State shared between get_loadable_libraries() and its UpgradeTask callback.
+ */
+struct loadable_libraries_state
+{
+       PGresult  **ress;                       /* one result per database, indexed like old_cluster.dbarr.dbs */
+       int                     totaltups;              /* sum of PQntuples() over all stored results */
+};
+
+/*
+ * Result callback for get_loadable_libraries()'s UpgradeTask.  Saves res at
+ * index (dbinfo - old_cluster.dbarr.dbs), which assumes dbinfo points into
+ * that array, and adds its tuple count to the total; res is not cleared here.
+ */
+static void
+process_loadable_libraries(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+       struct loadable_libraries_state *state = (struct loadable_libraries_state *) arg;
+
+       AssertVariableIsOfType(&process_loadable_libraries, UpgradeTaskProcessCB);
+
+       state->ress[dbinfo - old_cluster.dbarr.dbs] = res;
+       state->totaltups += PQntuples(res);
+}
 
 /*
  * get_loadable_libraries()
@@ -54,47 +78,41 @@ library_name_compare(const void *p1, const void *p2)
 void
 get_loadable_libraries(void)
 {
-       PGresult  **ress;
        int                     totaltups;
        int                     dbnum;
        int                     n_libinfos;
+       UpgradeTask *task = upgrade_task_create();
+       struct loadable_libraries_state state;
+       char       *query;
 
-       ress = (PGresult **) pg_malloc(old_cluster.dbarr.ndbs * sizeof(PGresult *));
-       totaltups = 0;
+       state.ress = (PGresult **) pg_malloc(old_cluster.dbarr.ndbs * sizeof(PGresult *));
+       state.totaltups = 0;
 
-       /* Fetch all library names, removing duplicates within each DB */
-       for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
-       {
-               DbInfo     *active_db = &old_cluster.dbarr.dbs[dbnum];
-               PGconn     *conn = connectToServer(&old_cluster, active_db->db_name);
+       query = psprintf("SELECT DISTINCT probin "
+                                        "FROM pg_catalog.pg_proc "
+                                        "WHERE prolang = %u AND "
+                                        "probin IS NOT NULL AND "
+                                        "oid >= %u",
+                                        ClanguageId,
+                                        FirstNormalObjectId);
 
-               /*
-                * Fetch all libraries containing non-built-in C functions in this DB.
-                */
-               ress[dbnum] = executeQueryOrDie(conn,
-                                                                               "SELECT DISTINCT probin "
-                                                                               "FROM pg_catalog.pg_proc "
-                                                                               "WHERE prolang = %u AND "
-                                                                               "probin IS NOT NULL AND "
-                                                                               "oid >= %u;",
-                                                                               ClanguageId,
-                                                                               FirstNormalObjectId);
-               totaltups += PQntuples(ress[dbnum]);
-
-               PQfinish(conn);
-       }
+       upgrade_task_add_step(task, query, process_loadable_libraries,
+                                                 false, &state);
+
+       upgrade_task_run(task, &old_cluster);
+       upgrade_task_free(task);
 
        /*
         * Allocate memory for required libraries and logical replication output
         * plugins.
         */
-       n_libinfos = totaltups + count_old_cluster_logical_slots();
+       n_libinfos = state.totaltups + count_old_cluster_logical_slots();
        os_info.libraries = (LibraryInfo *) pg_malloc(sizeof(LibraryInfo) * n_libinfos);
        totaltups = 0;
 
        for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
        {
-               PGresult   *res = ress[dbnum];
+               PGresult   *res = state.ress[dbnum];
                int                     ntups;
                int                     rowno;
                LogicalSlotInfoArr *slot_arr = &old_cluster.dbarr.dbs[dbnum].slot_arr;
@@ -129,7 +147,8 @@ get_loadable_libraries(void)
                }
        }
 
-       pg_free(ress);
+       pg_free(state.ress);
+       pg_free(query);
 
        os_info.num_libraries = totaltups;
 }