Handle arrays and ranges in pg_upgrade's test for non-upgradable types.
authorTom Lane <[email protected]>
Wed, 13 Nov 2019 16:35:37 +0000 (11:35 -0500)
committerTom Lane <[email protected]>
Wed, 13 Nov 2019 16:35:37 +0000 (11:35 -0500)
pg_upgrade needs to check whether certain non-upgradable data types
appear anywhere on-disk in the source cluster.  It knew that it has
to check for these types being contained inside domains and composite
types; but it somehow overlooked that they could be contained in
arrays and ranges, too.  Extend the existing recursive-containment
query to handle those cases.

We probably should have noticed this oversight while working on
commit 0ccfc2822 and follow-ups, but we failed to :-(.  The whole
thing's possibly a bit overdesigned, since we don't really expect
that any of these types will appear on disk; but if we're going to
the effort of doing a recursive search then it's silly not to cover
all the possibilities.

While at it, refactor so that we have only one copy of the search
logic, not three-and-counting.  Also, to keep the branches looking
more alike, back-patch the output wording change of commit 1634d3615.

Back-patch to all supported branches.

Discussion: https://postgr.es/m/31473.1573412838@sss.pgh.pa.us

src/bin/pg_upgrade/version.c

index 9deb53c8b74c82142da290758e20c9545ce293fc..37cb8533df549a4f28b89b59cf3861967214ca06 100644 (file)
@@ -97,27 +97,27 @@ new_9_0_populate_pg_largeobject_metadata(ClusterInfo *cluster, bool check_mode)
 
 
 /*
- * old_9_3_check_for_line_data_type_usage()
- *     9.3 -> 9.4
- *     Fully implement the 'line' data type in 9.4, which previously returned
- *     "not enabled" by default and was only functionally enabled with a
- *     compile-time switch;  9.4 "line" has different binary and text
- *     representation formats;  checks tables and indexes.
+ * check_for_data_type_usage
+ *     Detect whether there are any stored columns depending on the given type
+ *
+ * If so, write a report to the given file name, and return true.
+ *
+ * We check for the type in tables, matviews, and indexes, but not views;
+ * there's no storage involved in a view.
  */
-void
-old_9_3_check_for_line_data_type_usage(ClusterInfo *cluster)
+static bool
+check_for_data_type_usage(ClusterInfo *cluster, const char *typename,
+                                                 char *output_path)
 {
-       int                     dbnum;
-       FILE       *script = NULL;
        bool            found = false;
-       char            output_path[MAXPGPATH];
-
-       prep_status("Checking for incompatible \"line\" data type");
-
-       snprintf(output_path, sizeof(output_path), "tables_using_line.txt");
+       FILE       *script = NULL;
+       int                     dbnum;
 
        for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
        {
+               DbInfo     *active_db = &cluster->dbarr.dbs[dbnum];
+               PGconn     *conn = connectToServer(cluster, active_db->db_name);
+               PQExpBufferData querybuf;
                PGresult   *res;
                bool            db_used = false;
                int                     ntups;
@@ -125,50 +125,68 @@ old_9_3_check_for_line_data_type_usage(ClusterInfo *cluster)
                int                     i_nspname,
                                        i_relname,
                                        i_attname;
-               DbInfo     *active_db = &cluster->dbarr.dbs[dbnum];
-               PGconn     *conn = connectToServer(cluster, active_db->db_name);
 
                /*
-                * The pg_catalog.line type may be wrapped in a domain or composite
-                * type, or both (9.3 did not allow domains on composite types, but
-                * there may be multi-level composite type). To detect these cases
-                * we need a recursive CTE.
+                * The type of interest might be wrapped in a domain, array,
+                * composite, or range, and these container types can be nested (to
+                * varying extents depending on server version, but that's not of
+                * concern here).  To handle all these cases we need a recursive CTE.
                 */
-               res = executeQueryOrDie(conn,
-                                                               "WITH RECURSIVE oids AS ( "
-               /* the pg_catalog.line type itself */
-                                                               "       SELECT 'pg_catalog.line'::pg_catalog.regtype AS oid "
-                                                               "       UNION ALL "
-                                                               "       SELECT * FROM ( "
-               /* domains on the type */
-                                                               "               WITH x AS (SELECT oid FROM oids) "
-                                                               "                       SELECT t.oid FROM pg_catalog.pg_type t, x WHERE typbasetype = x.oid AND typtype = 'd' "
-                                                               "                       UNION "
-               /* composite types containing the type */
-                                                               "                       SELECT t.oid FROM pg_catalog.pg_type t, pg_catalog.pg_class c, pg_catalog.pg_attribute a, x "
-                                                               "                       WHERE t.typtype = 'c' AND "
-                                                               "                                 t.oid = c.reltype AND "
-                                                               "                                 c.oid = a.attrelid AND "
-                                                               "                                 NOT a.attisdropped AND "
-                                                               "                                 a.atttypid = x.oid "
-                                                               "       ) foo "
-                                                               ") "
-                                                               "SELECT n.nspname, c.relname, a.attname "
-                                                               "FROM   pg_catalog.pg_class c, "
-                                                               "               pg_catalog.pg_namespace n, "
-                                                               "               pg_catalog.pg_attribute a "
-                                                               "WHERE  c.oid = a.attrelid AND "
-                                                               "               NOT a.attisdropped AND "
-                                                               "               a.atttypid IN (SELECT oid FROM oids) AND "
-                                                               "               c.relkind IN ("
-                                                               CppAsString2(RELKIND_RELATION) ", "
-                                                               CppAsString2(RELKIND_MATVIEW) ", "
-                                                               CppAsString2(RELKIND_INDEX) ") AND "
-                                                               "               c.relnamespace = n.oid AND "
+               initPQExpBuffer(&querybuf);
+               appendPQExpBuffer(&querybuf,
+                                                 "WITH RECURSIVE oids AS ( "
+               /* the target type itself */
+                                                 "     SELECT '%s'::pg_catalog.regtype AS oid "
+                                                 "     UNION ALL "
+                                                 "     SELECT * FROM ( "
+               /* inner WITH because we can only reference the CTE once */
+                                                 "             WITH x AS (SELECT oid FROM oids) "
+               /* domains on any type selected so far */
+                                                 "                     SELECT t.oid FROM pg_catalog.pg_type t, x WHERE typbasetype = x.oid AND typtype = 'd' "
+                                                 "                     UNION ALL "
+               /* arrays over any type selected so far */
+                                                 "                     SELECT t.oid FROM pg_catalog.pg_type t, x WHERE typelem = x.oid AND typtype = 'b' "
+                                                 "                     UNION ALL "
+               /* composite types containing any type selected so far */
+                                                 "                     SELECT t.oid FROM pg_catalog.pg_type t, pg_catalog.pg_class c, pg_catalog.pg_attribute a, x "
+                                                 "                     WHERE t.typtype = 'c' AND "
+                                                 "                               t.oid = c.reltype AND "
+                                                 "                               c.oid = a.attrelid AND "
+                                                 "                               NOT a.attisdropped AND "
+                                                 "                               a.atttypid = x.oid ",
+                                                 typename);
+
+               /* Ranges came in in 9.2 */
+               if (GET_MAJOR_VERSION(cluster->major_version) >= 902)
+                       appendPQExpBuffer(&querybuf,
+                                                         "                     UNION ALL "
+                       /* ranges containing any type selected so far */
+                                                         "                     SELECT t.oid FROM pg_catalog.pg_type t, pg_catalog.pg_range r, x "
+                                                         "                     WHERE t.typtype = 'r' AND r.rngtypid = t.oid AND r.rngsubtype = x.oid");
+
+               appendPQExpBuffer(&querybuf,
+                                                 "     ) foo "
+                                                 ") "
+               /* now look for stored columns of any such type */
+                                                 "SELECT n.nspname, c.relname, a.attname "
+                                                 "FROM pg_catalog.pg_class c, "
+                                                 "             pg_catalog.pg_namespace n, "
+                                                 "             pg_catalog.pg_attribute a "
+                                                 "WHERE        c.oid = a.attrelid AND "
+                                                 "             NOT a.attisdropped AND "
+                                                 "             a.atttypid IN (SELECT oid FROM oids) AND "
+                                                 "             c.relkind IN ("
+                                                 CppAsString2(RELKIND_RELATION) ", "
+                                                 CppAsString2(RELKIND_MATVIEW) ", "
+                                                 CppAsString2(RELKIND_INDEX) ") AND "
+                                                 "             c.relnamespace = n.oid AND "
                /* exclude possible orphaned temp tables */
-                                                               "               n.nspname !~ '^pg_temp_' AND "
-                                                               "               n.nspname !~ '^pg_toast_temp_' AND "
-                                                               "               n.nspname NOT IN ('pg_catalog', 'information_schema')");
+                                                 "             n.nspname !~ '^pg_temp_' AND "
+                                                 "             n.nspname !~ '^pg_toast_temp_' AND "
+               /* exclude system catalogs, too */
+                                                 "             n.nspname NOT IN ('pg_catalog', 'information_schema')");
+
+               res = executeQueryOrDie(conn, "%s", querybuf.data);
 
                ntups = PQntuples(res);
                i_nspname = PQfnumber(res, "nspname");
@@ -193,13 +211,36 @@ old_9_3_check_for_line_data_type_usage(ClusterInfo *cluster)
 
                PQclear(res);
 
+               termPQExpBuffer(&querybuf);
+
                PQfinish(conn);
        }
 
        if (script)
                fclose(script);
 
-       if (found)
+       return found;
+}
+
+
+/*
+ * old_9_3_check_for_line_data_type_usage()
+ *     9.3 -> 9.4
+ *     Fully implement the 'line' data type in 9.4, which previously returned
+ *     "not enabled" by default and was only functionally enabled with a
+ *     compile-time switch; as of 9.4 "line" has a different on-disk
+ *     representation format.
+ */
+void
+old_9_3_check_for_line_data_type_usage(ClusterInfo *cluster)
+{
+       char            output_path[MAXPGPATH];
+
+       prep_status("Checking for incompatible \"line\" data type");
+
+       snprintf(output_path, sizeof(output_path), "tables_using_line.txt");
+
+       if (check_for_data_type_usage(cluster, "pg_catalog.line", output_path))
        {
                pg_log(PG_REPORT, "fatal\n");
                pg_fatal("Your installation contains the \"line\" data type in user tables.  This\n"
@@ -226,105 +267,17 @@ old_9_3_check_for_line_data_type_usage(ClusterInfo *cluster)
  *     mid-upgrade.  Worse, if there's a matview with such a column, the
  *     DDL reload will silently change it to "text" which won't match the
  *     on-disk storage (which is like "cstring").  So we *must* reject that.
- *     Also check composite types and domains on the "unknwown" type (even
- *     combinations of both), in case they are used for table columns.
- *     We needn't check indexes, because "unknown" has no opclasses.
  */
 void
 old_9_6_check_for_unknown_data_type_usage(ClusterInfo *cluster)
 {
-       int                     dbnum;
-       FILE       *script = NULL;
-       bool            found = false;
        char            output_path[MAXPGPATH];
 
        prep_status("Checking for invalid \"unknown\" user columns");
 
        snprintf(output_path, sizeof(output_path), "tables_using_unknown.txt");
 
-       for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
-       {
-               PGresult   *res;
-               bool            db_used = false;
-               int                     ntups;
-               int                     rowno;
-               int                     i_nspname,
-                                       i_relname,
-                                       i_attname;
-               DbInfo     *active_db = &cluster->dbarr.dbs[dbnum];
-               PGconn     *conn = connectToServer(cluster, active_db->db_name);
-
-               /*
-                * The pg_catalog.unknown type may be wrapped in a domain or composite
-                * type, or both (9.3 did not allow domains on composite types, but
-                * there may be multi-level composite type). To detect these cases
-                * we need a recursive CTE.
-                */
-               res = executeQueryOrDie(conn,
-                                                               "WITH RECURSIVE oids AS ( "
-               /* the pg_catalog.unknown type itself */
-                                                               "       SELECT 'pg_catalog.unknown'::pg_catalog.regtype AS oid "
-                                                               "       UNION ALL "
-                                                               "       SELECT * FROM ( "
-               /* domains on the type */
-                                                               "               WITH x AS (SELECT oid FROM oids) "
-                                                               "                       SELECT t.oid FROM pg_catalog.pg_type t, x WHERE typbasetype = x.oid AND typtype = 'd' "
-                                                               "                       UNION "
-               /* composite types containing the type */
-                                                               "                       SELECT t.oid FROM pg_catalog.pg_type t, pg_catalog.pg_class c, pg_catalog.pg_attribute a, x "
-                                                               "                       WHERE t.typtype = 'c' AND "
-                                                               "                                 t.oid = c.reltype AND "
-                                                               "                                 c.oid = a.attrelid AND "
-                                                               "                                 NOT a.attisdropped AND "
-                                                               "                                 a.atttypid = x.oid "
-                                                               "       ) foo "
-                                                               ") "
-                                                               "SELECT n.nspname, c.relname, a.attname "
-                                                               "FROM   pg_catalog.pg_class c, "
-                                                               "               pg_catalog.pg_namespace n, "
-                                                               "               pg_catalog.pg_attribute a "
-                                                               "WHERE  c.oid = a.attrelid AND "
-                                                               "               NOT a.attisdropped AND "
-                                                               "               a.atttypid IN (SELECT oid FROM oids) AND "
-                                                               "               c.relkind IN ("
-                                                               CppAsString2(RELKIND_RELATION) ", "
-                                                               CppAsString2(RELKIND_MATVIEW) ") AND "
-                                                               "               c.relnamespace = n.oid AND "
-               /* exclude possible orphaned temp tables */
-                                                               "               n.nspname !~ '^pg_temp_' AND "
-                                                               "               n.nspname !~ '^pg_toast_temp_' AND "
-                                                               "               n.nspname NOT IN ('pg_catalog', 'information_schema')");
-
-               ntups = PQntuples(res);
-               i_nspname = PQfnumber(res, "nspname");
-               i_relname = PQfnumber(res, "relname");
-               i_attname = PQfnumber(res, "attname");
-               for (rowno = 0; rowno < ntups; rowno++)
-               {
-                       found = true;
-                       if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL)
-                               pg_fatal("could not open file \"%s\": %s\n", output_path,
-                                                strerror(errno));
-                       if (!db_used)
-                       {
-                               fprintf(script, "In database: %s\n", active_db->db_name);
-                               db_used = true;
-                       }
-                       fprintf(script, "  %s.%s.%s\n",
-                                       PQgetvalue(res, rowno, i_nspname),
-                                       PQgetvalue(res, rowno, i_relname),
-                                       PQgetvalue(res, rowno, i_attname));
-               }
-
-               PQclear(res);
-
-               PQfinish(conn);
-       }
-
-       if (script)
-               fclose(script);
-
-       if (found)
+       if (check_for_data_type_usage(cluster, "pg_catalog.unknown", output_path))
        {
                pg_log(PG_REPORT, "fatal\n");
                pg_fatal("Your installation contains the \"unknown\" data type in user tables.  This\n"
@@ -456,105 +409,18 @@ old_9_6_invalidate_hash_indexes(ClusterInfo *cluster, bool check_mode)
  *     which does affect the storage (name is by-ref, but not varlena). This
  *     means user tables using sql_identifier for columns are broken because
  *     the on-disk format is different.
- *
- *     We need to check all objects that might store sql_identifier on disk,
- *     i.e. tables, matviews and indexes. Also check composite types in case
- *     they are used in this context.
  */
 void
 old_11_check_for_sql_identifier_data_type_usage(ClusterInfo *cluster)
 {
-       int                     dbnum;
-       FILE       *script = NULL;
-       bool            found = false;
        char            output_path[MAXPGPATH];
 
        prep_status("Checking for invalid \"sql_identifier\" user columns");
 
        snprintf(output_path, sizeof(output_path), "tables_using_sql_identifier.txt");
 
-       for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
-       {
-               PGresult   *res;
-               bool            db_used = false;
-               int                     ntups;
-               int                     rowno;
-               int                     i_nspname,
-                                       i_relname,
-                                       i_attname;
-               DbInfo     *active_db = &cluster->dbarr.dbs[dbnum];
-               PGconn     *conn = connectToServer(cluster, active_db->db_name);
-
-               /*
-                * We need the recursive CTE because the sql_identifier may be wrapped
-                * either in a domain or composite type, or both (in arbitrary order).
-                */
-               res = executeQueryOrDie(conn,
-                                                               "WITH RECURSIVE oids AS ( "
-               /* the sql_identifier type itself */
-                                                               "       SELECT 'information_schema.sql_identifier'::pg_catalog.regtype AS oid "
-                                                               "       UNION ALL "
-                                                               "       SELECT * FROM ( "
-               /* domains on the type */
-                                                               "               WITH x AS (SELECT oid FROM oids) "
-                                                               "                       SELECT t.oid FROM pg_catalog.pg_type t, x WHERE typbasetype = x.oid AND typtype = 'd' "
-                                                               "                       UNION "
-               /* composite types containing the type */
-                                                               "                       SELECT t.oid FROM pg_catalog.pg_type t, pg_catalog.pg_class c, pg_catalog.pg_attribute a, x "
-                                                               "                       WHERE t.typtype = 'c' AND "
-                                                               "                                 t.oid = c.reltype AND "
-                                                               "                                 c.oid = a.attrelid AND "
-                                                               "                                 NOT a.attisdropped AND "
-                                                               "                                 a.atttypid = x.oid "
-                                                               "       ) foo "
-                                                               ") "
-                                                               "SELECT n.nspname, c.relname, a.attname "
-                                                               "FROM   pg_catalog.pg_class c, "
-                                                               "               pg_catalog.pg_namespace n, "
-                                                               "               pg_catalog.pg_attribute a "
-                                                               "WHERE  c.oid = a.attrelid AND "
-                                                               "               NOT a.attisdropped AND "
-                                                               "               a.atttypid IN (SELECT oid FROM oids) AND "
-                                                               "               c.relkind IN ("
-                                                               CppAsString2(RELKIND_RELATION) ", "
-                                                               CppAsString2(RELKIND_MATVIEW) ", "
-                                                               CppAsString2(RELKIND_INDEX) ") AND "
-                                                               "               c.relnamespace = n.oid AND "
-               /* exclude possible orphaned temp tables */
-                                                               "               n.nspname !~ '^pg_temp_' AND "
-                                                               "               n.nspname !~ '^pg_toast_temp_' AND "
-                                                               "               n.nspname NOT IN ('pg_catalog', 'information_schema')");
-
-               ntups = PQntuples(res);
-               i_nspname = PQfnumber(res, "nspname");
-               i_relname = PQfnumber(res, "relname");
-               i_attname = PQfnumber(res, "attname");
-               for (rowno = 0; rowno < ntups; rowno++)
-               {
-                       found = true;
-                       if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL)
-                               pg_fatal("could not open file \"%s\": %s\n", output_path,
-                                                strerror(errno));
-                       if (!db_used)
-                       {
-                               fprintf(script, "In database: %s\n", active_db->db_name);
-                               db_used = true;
-                       }
-                       fprintf(script, "  %s.%s.%s\n",
-                                       PQgetvalue(res, rowno, i_nspname),
-                                       PQgetvalue(res, rowno, i_relname),
-                                       PQgetvalue(res, rowno, i_attname));
-               }
-
-               PQclear(res);
-
-               PQfinish(conn);
-       }
-
-       if (script)
-               fclose(script);
-
-       if (found)
+       if (check_for_data_type_usage(cluster, "information_schema.sql_identifier",
+                                                                 output_path))
        {
                pg_log(PG_REPORT, "fatal\n");
                pg_fatal("Your installation contains the \"sql_identifier\" data type in user tables\n"