#include "getopt_long.h"
 
 #define    DEFAULT_SUB_PORT    "50432"
+#define    OBJECTTYPE_PUBLICATIONS  0x0001
 
 /* Command-line options */
 struct CreateSubscriberOptions
    SimpleStringList sub_names; /* list of subscription names */
    SimpleStringList replslot_names;    /* list of replication slot names */
    int         recovery_timeout;   /* stop recovery after this time */
+   SimpleStringList objecttypes_to_remove; /* list of object types to remove */
 };
 
 /* per-database publication/subscription info */
 {
    struct LogicalRepInfo *dbinfo;
    bool        two_phase;      /* enable-two-phase option */
+   bits32      objecttypes_to_remove;  /* flags indicating which object types
+                                        * to remove on subscriber */
 };
 
 static void cleanup_objects_atexit(void);
 static void wait_for_end_recovery(const char *conninfo,
                                  const struct CreateSubscriberOptions *opt);
 static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
-static void drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
+static void drop_publication(PGconn *conn, const char *pubname,
+                            const char *dbname, bool *made_publication);
+static void check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo);
 static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
 static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo,
                                     const char *lsn);
            if (conn != NULL)
            {
                if (dbinfo->made_publication)
-                   drop_publication(conn, dbinfo);
+                   drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
+                                    &dbinfo->made_publication);
                if (dbinfo->made_replslot)
                    drop_replication_slot(conn, dbinfo, dbinfo->replslotname);
                disconnect_database(conn, false);
    printf(_("  -n, --dry-run                   dry run, just show what would be done\n"));
    printf(_("  -p, --subscriber-port=PORT      subscriber port number (default %s)\n"), DEFAULT_SUB_PORT);
    printf(_("  -P, --publisher-server=CONNSTR  publisher connection string\n"));
+   printf(_("  -R, --remove=OBJECTTYPE         remove all objects of the specified type from specified\n"
+            "                                  databases on the subscriber; accepts: publications\n"));
    printf(_("  -s, --socketdir=DIR             socket directory to use (default current dir.)\n"));
    printf(_("  -t, --recovery-timeout=SECS     seconds to wait for recovery to end\n"));
    printf(_("  -T, --enable-two-phase          enable two-phase commit for all subscriptions\n"));
         */
        check_and_drop_existing_subscriptions(conn, &dbinfo[i]);
 
-       /*
-        * Since the publication was created before the consistent LSN, it is
-        * available on the subscriber when the physical replica is promoted.
-        * Remove publications from the subscriber because it has no use.
-        */
-       drop_publication(conn, &dbinfo[i]);
+       /* Check and drop the required publications in the given database. */
+       check_and_drop_publications(conn, &dbinfo[i]);
 
        create_subscription(conn, &dbinfo[i]);
 
 }
 
 /*
- * Remove publication if it couldn't finish all steps.
+ * Drop the specified publication in the given database.
  */
 static void
-drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
+drop_publication(PGconn *conn, const char *pubname, const char *dbname,
+                bool *made_publication)
 {
    PQExpBuffer str = createPQExpBuffer();
    PGresult   *res;
 
    Assert(conn != NULL);
 
-   pubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
+   pubname_esc = PQescapeIdentifier(conn, pubname, strlen(pubname));
 
    pg_log_info("dropping publication \"%s\" in database \"%s\"",
-               dbinfo->pubname, dbinfo->dbname);
+               pubname, dbname);
 
    appendPQExpBuffer(str, "DROP PUBLICATION %s", pubname_esc);
 
        if (PQresultStatus(res) != PGRES_COMMAND_OK)
        {
            pg_log_error("could not drop publication \"%s\" in database \"%s\": %s",
-                        dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
-           dbinfo->made_publication = false;   /* don't try again. */
+                        pubname, dbname, PQresultErrorMessage(res));
+           *made_publication = false;  /* don't try again. */
 
            /*
             * Don't disconnect and exit here. This routine is used by primary
    destroyPQExpBuffer(str);
 }
 
+/*
+ * Retrieve and drop the publications.
+ *
+ * Since the publications were created before the consistent LSN, they
+ * remain on the subscriber even after the physical replica is
+ * promoted. Remove these publications from the subscriber because
+ * they have no use. Additionally, if requested, drop all pre-existing
+ * publications.
+ */
+static void
+check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
+{
+   PGresult   *res;
+   bool        drop_all_pubs = dbinfos.objecttypes_to_remove & OBJECTTYPE_PUBLICATIONS;
+
+   Assert(conn != NULL);
+
+   if (drop_all_pubs)
+   {
+       pg_log_info("dropping all existing publications in database \"%s\"",
+                   dbinfo->dbname);
+
+       /* Fetch all publication names */
+       res = PQexec(conn, "SELECT pubname FROM pg_catalog.pg_publication;");
+       if (PQresultStatus(res) != PGRES_TUPLES_OK)
+       {
+           pg_log_error("could not obtain publication information: %s",
+                        PQresultErrorMessage(res));
+           PQclear(res);
+           disconnect_database(conn, true);
+       }
+
+       /* Drop each publication */
+       for (int i = 0; i < PQntuples(res); i++)
+           drop_publication(conn, PQgetvalue(res, i, 0), dbinfo->dbname,
+                            &dbinfo->made_publication);
+
+       PQclear(res);
+   }
+
+   /*
+    * In dry-run mode, we don't create publications, but we still try to drop
+    * those to provide necessary information to the user.
+    */
+   if (!drop_all_pubs || dry_run)
+       drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
+                        &dbinfo->made_publication);
+}
+
 /*
  * Create a subscription with some predefined options.
  *
        {"dry-run", no_argument, NULL, 'n'},
        {"subscriber-port", required_argument, NULL, 'p'},
        {"publisher-server", required_argument, NULL, 'P'},
+       {"remove", required_argument, NULL, 'R'},
        {"socketdir", required_argument, NULL, 's'},
        {"recovery-timeout", required_argument, NULL, 't'},
        {"enable-two-phase", no_argument, NULL, 'T'},
 
    get_restricted_token();
 
-   while ((c = getopt_long(argc, argv, "d:D:np:P:s:t:TU:v",
+   while ((c = getopt_long(argc, argv, "d:D:np:P:R:s:t:TU:v",
                            long_options, &option_index)) != -1)
    {
        switch (c)
            case 'P':
                opt.pub_conninfo_str = pg_strdup(optarg);
                break;
+           case 'R':
+               if (!simple_string_list_member(&opt.objecttypes_to_remove, optarg))
+                   simple_string_list_append(&opt.objecttypes_to_remove, optarg);
+               else
+                   pg_fatal("object type \"%s\" is specified more than once for --remove", optarg);
+               break;
            case 's':
                opt.socket_dir = pg_strdup(optarg);
                canonicalize_path(opt.socket_dir);
        exit(1);
    }
 
+   /* Verify the object types specified for removal from the subscriber */
+   for (SimpleStringListCell *cell = opt.objecttypes_to_remove.head; cell; cell = cell->next)
+   {
+       if (pg_strcasecmp(cell->val, "publications") == 0)
+           dbinfos.objecttypes_to_remove |= OBJECTTYPE_PUBLICATIONS;
+       else
+       {
+           pg_log_error("invalid object type \"%s\" specified for --remove", cell->val);
+           pg_log_error_hint("The valid option is: \"publications\"");
+           exit(1);
+       }
+   }
+
    /* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */
    pg_ctl_path = get_exec_path(argv[0], "pg_ctl");
    pg_resetwal_path = get_exec_path(argv[0], "pg_resetwal");