Add bdr_unsubscribe()
authorPetr Jelinek <[email protected]>
Wed, 3 Jun 2015 18:51:07 +0000 (20:51 +0200)
committerPetr Jelinek <[email protected]>
Thu, 11 Jun 2015 14:17:12 +0000 (16:17 +0200)
Makefile.in
bdr.control
bdr_perdb.c
bdr_remotecalls.c
expected/part_bdr.out
expected/part_udr.out
expected/upgrade.out
extsql/bdr--0.10.0.3--0.10.0.4.sql [new file with mode: 0644]
sql/part_bdr.sql
sql/part_udr.sql
sql/upgrade.sql

index 1266ebd48308ae115ef1e9228f13d542c7657f98..1c9b6304e87a3bf4991518dcbcb1018a9809ce81 100644 (file)
@@ -32,7 +32,8 @@ DATA = \
    extsql/bdr--0.9.1.0--0.10.0.0.sql \
    extsql/bdr--0.10.0.0--0.10.0.1.sql \
    extsql/bdr--0.10.0.1--0.10.0.2.sql \
-   extsql/bdr--0.10.0.2--0.10.0.3.sql
+   extsql/bdr--0.10.0.2--0.10.0.3.sql  \
+   extsql/bdr--0.10.0.3--0.10.0.4.sql
 
 DATA_built = \
    extsql/bdr--0.8.0.1.sql \
@@ -52,7 +53,8 @@ DATA_built = \
    extsql/bdr--0.10.0.0.sql \
    extsql/bdr--0.10.0.1.sql \
    extsql/bdr--0.10.0.2.sql \
-   extsql/bdr--0.10.0.3.sql
+   extsql/bdr--0.10.0.3.sql \
+   extsql/bdr--0.10.0.4.sql
 
 DOCS = bdr.conf.sample README.bdr
 SCRIPTS = scripts/bdr_initial_load bdr_init_copy bdr_resetxlog bdr_dump
@@ -204,6 +206,10 @@ extsql/bdr--0.9.1.0.sql: extsql/bdr--0.9.0.5.sql extsql/bdr--0.9.0.5--0.9.1.0.sq
    mkdir -p extsql
    cat $^ > $@
 
+extsql/bdr--0.10.0.4.sql: extsql/bdr--0.10.0.3.sql extsql/bdr--0.10.0.3--0.10.0.4.sql
+   mkdir -p extsql
+   cat $^ > $@
+
 bdr_resetxlog: pg_resetxlog.o
    $(CC) $(CFLAGS) $^ $(LDFLAGS) $(LDFLAGS_EX) $(libpq_pgport) $(LIBS) -o $@$(X)
 
index c5b4a184d340b993cab925f20af1866305d0724b..f1a54470b9b4937d0ad7f83a3a98039a4a290c79 100644 (file)
@@ -1,6 +1,6 @@
 # bdr extension
 comment = 'Bi-directional replication for PostgreSQL'
-default_version = '0.10.0.3'
+default_version = '0.10.0.4'
 module_pathname = '$libdir/bdr'
 relocatable = false
 requires = btree_gist
index f21ce3bd69478377872a504644619d8581890c59..180e73e734b3ad1db062bfb3e195cff39a7a5d78 100644 (file)
 #include "utils/memutils.h"
 #include "utils/snapmgr.h"
 
+PGDLLEXPORT Datum bdr_get_apply_pid(PG_FUNCTION_ARGS);
+
 PG_FUNCTION_INFO_V1(bdr_connections_changed);
+PG_FUNCTION_INFO_V1(bdr_get_apply_pid);
 
 /* In the commit hook, should we attempt to start a per-db worker? */
 static bool xacthook_registered = false;
@@ -315,7 +318,6 @@ bdr_maintain_db_workers(void)
        Assert(!isnull);
 
        LWLockAcquire(BdrWorkerCtl->lock, LW_EXCLUSIVE);
-
        for (slotoff = 0; slotoff < bdr_max_workers; slotoff++)
        {
            BdrWorker  *w = &BdrWorkerCtl->slots[slotoff];
@@ -376,6 +378,7 @@ bdr_maintain_db_workers(void)
                kill(w->worker_pid, SIGTERM);
            }
        }
+       LWLockRelease(BdrWorkerCtl->lock);
 
        if (found_alive)
        {
@@ -442,7 +445,6 @@ bdr_maintain_db_workers(void)
             */
        }
 
-       LWLockRelease(BdrWorkerCtl->lock);
        continue;
    }
 
@@ -850,3 +852,36 @@ bdr_perdb_worker_main(Datum main_arg)
    perdb->database_oid = InvalidOid;
    proc_exit(0);
 }
+
+
+/* Get pid of current apply backend for given node. */
+Datum
+bdr_get_apply_pid(PG_FUNCTION_ARGS)
+{
+   const char *remote_sysid_str = text_to_cstring(PG_GETARG_TEXT_P(0));
+   Oid         remote_tli = PG_GETARG_OID(1);
+   Oid         remote_dboid = PG_GETARG_OID(2);
+   uint64      remote_sysid;
+   BdrWorker  *worker = NULL;
+   pid_t       ret;
+
+   if (sscanf(remote_sysid_str, UINT64_FORMAT, &remote_sysid) != 1)
+       elog(ERROR, "Parsing of remote sysid as uint64 failed");
+
+   LWLockAcquire(BdrWorkerCtl->lock, LW_EXCLUSIVE);
+
+   find_apply_worker_slot(remote_sysid, remote_tli, remote_dboid, &worker);
+
+   /* Worker slot not found or worker isn't running. */
+   if (worker == NULL || worker->worker_proc == NULL)
+   {
+       LWLockRelease(BdrWorkerCtl->lock);
+       PG_RETURN_NULL();
+   }
+
+   ret = worker->worker_pid;
+
+   LWLockRelease(BdrWorkerCtl->lock);
+
+   PG_RETURN_INT32(ret);
+}
index 0725dc6a3fa8a8bab56da04489dabd18d4d37962..c90c8e11efac8791163d0a275f730dd40edcfda9 100644 (file)
@@ -13,6 +13,7 @@
 #include "postgres.h"
 
 #include "bdr.h"
+#include "bdr_internal.h"
 
 #include "fmgr.h"
 #include "funcapi.h"
@@ -48,11 +49,13 @@ PGDLLEXPORT Datum bdr_get_remote_nodeinfo(PG_FUNCTION_ARGS);
 PGDLLEXPORT Datum bdr_test_replication_connection(PG_FUNCTION_ARGS);
 PGDLLEXPORT Datum bdr_test_remote_connectback(PG_FUNCTION_ARGS);
 PGDLLEXPORT Datum bdr_copytable_test(PG_FUNCTION_ARGS);
+PGDLLEXPORT Datum bdr_drop_remote_slot(PG_FUNCTION_ARGS);
 
 PG_FUNCTION_INFO_V1(bdr_get_remote_nodeinfo);
 PG_FUNCTION_INFO_V1(bdr_test_replication_connection);
 PG_FUNCTION_INFO_V1(bdr_test_remote_connectback);
 PG_FUNCTION_INFO_V1(bdr_copytable_test);
+PG_FUNCTION_INFO_V1(bdr_drop_remote_slot);
 
 /*
  * Make standard postgres connection, ERROR on failure.
@@ -579,9 +582,97 @@ bdr_test_remote_connectback(PG_FUNCTION_ARGS)
        free_remote_node_info(&ri);
    }
    PG_END_ENSURE_ERROR_CLEANUP(bdr_cleanup_conn_close,
-                           PointerGetDatum(&conn));
+                               PointerGetDatum(&conn));
 
    PQfinish(conn);
 
    PG_RETURN_DATUM(HeapTupleGetDatum(returnTuple));
 }
+
+
+/*
+ * Drops replication slot on remote node that has been used by the local node.
+ */
+Datum
+bdr_drop_remote_slot(PG_FUNCTION_ARGS)
+{
+   const char *remote_sysid_str = text_to_cstring(PG_GETARG_TEXT_P(0));
+   Oid         remote_tli = PG_GETARG_OID(1);
+   Oid         remote_dboid = PG_GETARG_OID(2);
+   uint64      remote_sysid;
+   PGconn     *conn;
+   PGresult   *res;
+   NameData    slotname;
+   BdrConnectionConfig *cfg;
+
+   if (sscanf(remote_sysid_str, UINT64_FORMAT, &remote_sysid) != 1)
+       elog(ERROR, "Parsing of remote sysid as uint64 failed");
+
+   cfg = bdr_get_connection_config(remote_sysid, remote_tli, remote_dboid, false);
+   conn = bdr_connect_nonrepl(cfg->dsn, "bdr_drop_replication_slot");
+   bdr_free_connection_config(cfg);
+
+   PG_ENSURE_ERROR_CLEANUP(bdr_cleanup_conn_close,
+                           PointerGetDatum(&conn));
+   {
+       struct remote_node_info ri;
+       const char *    values[1];
+       Oid             types[1] = { TEXTOID };
+
+       /* Try connecting and build slot name from retrieved info */
+       bdr_get_remote_nodeinfo_internal(conn, &ri);
+       bdr_slot_name(&slotname, GetSystemIdentifier(), ThisTimeLineID,
+                     MyDatabaseId, remote_dboid);
+       free_remote_node_info(&ri);
+
+       values[0] = NameStr(slotname);
+
+       /* Check if the slot exists */
+       res = PQexecParams(conn,
+                          "SELECT plugin "
+                          "FROM pg_catalog.pg_replication_slots "
+                          "WHERE slot_name = $1",
+                          1, types, values, NULL, NULL, 0);
+
+       if (PQresultStatus(res) != PGRES_TUPLES_OK)
+       {
+           ereport(ERROR,
+                   (errmsg("getting remote slot info failed"),
+                   errdetail("SELECT FROM pg_catalog.pg_replication_slots failed with: %s",
+                       PQerrorMessage(conn))));
+       }
+
+       /* Slot not found return false */
+       if (PQntuples(res) == 0)
+       {
+           PQfinish(conn);
+           PG_RETURN_BOOL(false);
+       }
+
+       /* Slot found, validate that it's BDR slot */
+       if (PQgetisnull(res, 0, 0))
+           elog(ERROR, "Unexpectedly null field %s", PQfname(res, 0));
+
+       if (strcmp("bdr", PQgetvalue(res, 0, 0)) != 0)
+           ereport(ERROR,
+                   (errmsg("slot %s is not BDR slot", NameStr(slotname))));
+
+       res = PQexecParams(conn, "SELECT pg_drop_replication_slot($1)",
+                          1, types, values, NULL, NULL, 0);
+
+       /* And finally, drop the slot. */
+       if (PQresultStatus(res) != PGRES_TUPLES_OK)
+       {
+           ereport(ERROR,
+                   (errmsg("remote slot drop failed"),
+                   errdetail("SELECT pg_drop_replication_slot() failed with: %s",
+                       PQerrorMessage(conn))));
+       }
+   }
+   PG_END_ENSURE_ERROR_CLEANUP(bdr_cleanup_conn_close,
+                               PointerGetDatum(&conn));
+
+   PQfinish(conn);
+
+   PG_RETURN_BOOL(true);
+}
index 18b484095b64027a725f2f7262abb0eda3dd3800..8faa6001e5836611249d8fb89a84be9a80e11e03 100644 (file)
@@ -1,4 +1,6 @@
 \c regression
+SELECT bdr.bdr_unsubscribe('node-pg');
+ERROR:  Unidirectional connection to node node-pg not found
 SELECT bdr.bdr_part_by_node_names(ARRAY['node-pg']);
  bdr_part_by_node_names 
 ------------------------
index c4ccf12364fd46c3786a9eef0c46e758ad07dc96..36d5a1db49207086c0ae50c414666eba293cb886 100644 (file)
@@ -1 +1,41 @@
--- Not implemented
+\c postgres
+SELECT bdr.bdr_unsubscribe('node-pg');
+ bdr_unsubscribe 
+-----------------
+(1 row)
+
+-- wait till all slots are killed, we need a better way for that.
+SELECT pg_sleep(1);
+ pg_sleep 
+----------
+(1 row)
+
+-- the node state for the removed node should show 'k'
+SELECT node_name, node_status FROM bdr.bdr_nodes;
+     node_name      | node_status 
+--------------------+-------------
+ node-pg            | k
+ node-pg-subscriber | k
+(2 rows)
+
+\c regression
+-- There should now be zero slots
+SELECT * FROM pg_replication_slots;
+ slot_name | plugin | slot_type | datoid | database | active | xmin | catalog_xmin | restart_lsn 
+-----------+--------+-----------+--------+----------+--------+------+--------------+-------------
+(0 rows)
+
+-- Zero active connections
+SELECT count(*) FROM pg_stat_replication;
+ count 
+-------
+     0
+(1 row)
+
+SELECT node_name, node_status FROM bdr.bdr_nodes;
+ node_name | node_status 
+-----------+-------------
+(0 rows)
+
index bec017d2686c6a8fdd08d5d43bddaaa874132bda..4b3bdd2252f4f870cfd7cdc0ce6ba457aaf802a2 100644 (file)
@@ -45,6 +45,8 @@ CREATE EXTENSION bdr VERSION '0.10.0.2';
 DROP EXTENSION bdr;
 CREATE EXTENSION bdr VERSION '0.10.0.3';
 DROP EXTENSION bdr;
+CREATE EXTENSION bdr VERSION '0.10.0.4';
+DROP EXTENSION bdr;
 -- evolve version one by one from the oldest to the newest one
 CREATE EXTENSION bdr VERSION '0.8.0';
 ALTER EXTENSION bdr UPDATE TO '0.8.0.1';
@@ -65,14 +67,15 @@ ALTER EXTENSION bdr UPDATE TO '0.10.0.0';
 ALTER EXTENSION bdr UPDATE TO '0.10.0.1';
 ALTER EXTENSION bdr UPDATE TO '0.10.0.2';
 ALTER EXTENSION bdr UPDATE TO '0.10.0.3';
+ALTER EXTENSION bdr UPDATE TO '0.10.0.4';
 -- Should never have to do anything: You missed adding the new version above.
 ALTER EXTENSION bdr UPDATE;
-NOTICE:  version "0.10.0.3" of extension "bdr" is already installed
+NOTICE:  version "0.10.0.4" of extension "bdr" is already installed
 \dx bdr
                        List of installed extensions
  Name | Version  |   Schema   |                Description                
 ------+----------+------------+-------------------------------------------
- bdr  | 0.10.0.3 | pg_catalog | Bi-directional replication for PostgreSQL
+ bdr  | 0.10.0.4 | pg_catalog | Bi-directional replication for PostgreSQL
 (1 row)
 
 \c postgres
diff --git a/extsql/bdr--0.10.0.3--0.10.0.4.sql b/extsql/bdr--0.10.0.3--0.10.0.4.sql
new file mode 100644 (file)
index 0000000..bc5ee21
--- /dev/null
@@ -0,0 +1,118 @@
+SET LOCAL search_path = bdr;
+SET bdr.permit_unsafe_ddl_commands = true;
+SET bdr.skip_ddl_replication = true;
+
+CREATE OR REPLACE FUNCTION bdr.bdr_drop_remote_slot(sysid text, timeline oid, dboid oid)
+RETURNS boolean LANGUAGE C VOLATILE STRICT
+AS 'MODULE_PATHNAME';
+
+CREATE OR REPLACE FUNCTION bdr.bdr_get_apply_pid(sysid text, timeline oid, dboid oid)
+RETURNS integer LANGUAGE C VOLATILE STRICT
+AS 'MODULE_PATHNAME';
+
+CREATE OR REPLACE FUNCTION bdr.bdr_unsubscribe(node_name text, drop_slot boolean DEFAULT true)
+RETURNS void LANGUAGE plpgsql VOLATILE
+SET search_path = bdr, pg_catalog
+SET bdr.permit_unsafe_ddl_commands = on
+SET bdr.skip_ddl_replication = on
+SET bdr.skip_ddl_locking = on
+AS $body$
+DECLARE
+    localid record;
+    remoteid record;
+    v_node_name alias for node_name;
+   v_pid integer;
+BEGIN
+    -- Concurrency
+    LOCK TABLE bdr.bdr_connections IN EXCLUSIVE MODE;
+    LOCK TABLE bdr.bdr_nodes IN EXCLUSIVE MODE;
+    LOCK TABLE pg_catalog.pg_shseclabel IN EXCLUSIVE MODE;
+
+    -- Check the node exists
+    SELECT node_sysid AS sysid, node_timeline AS timeline,
+           node_dboid AS dboid INTO remoteid
+    FROM bdr.bdr_nodes
+    WHERE bdr_nodes.node_name = v_node_name;
+
+    IF NOT FOUND THEN
+        RAISE NOTICE 'Node % not found, nothing done', v_node_name;
+        RETURN;
+    END IF;
+
+    -- Check the connection is unidirectional
+    SELECT sysid, timeline, dboid INTO localid
+    FROM bdr.bdr_get_local_nodeid();
+
+    PERFORM 1 FROM bdr_connections
+    WHERE conn_sysid = remoteid.sysid
+      AND conn_timeline = remoteid.timeline
+      AND conn_dboid = remoteid.dboid
+      AND conn_origin_sysid = localid.sysid
+      AND conn_origin_timeline = localid.timeline
+      AND conn_origin_dboid = localid.dboid
+      AND conn_is_unidirectional = 't';
+
+    IF NOT FOUND THEN
+        RAISE EXCEPTION 'Unidirectional connection to node % not found',
+            v_node_name;
+        RETURN;
+    END IF;
+
+    -- Mark the provider node as killed
+    UPDATE bdr.bdr_nodes
+    SET node_status = 'k'
+    WHERE bdr_nodes.node_name = v_node_name;
+
+    --
+    -- Check what to do with local node based on active connections
+    --
+    -- Note the logic here is:
+    --  - if this is UDR node and there are still live provider nodes, we keep local node active
+    --  - if this is UDR node and all provider nodes were killed, local node will be killed as well
+    --  - if this is UDR + BDR node there will be connection pointing towards local node so we keep it active
+    --
+    PERFORM 1
+    FROM bdr.bdr_connections c
+        JOIN bdr.bdr_nodes n ON (
+            conn_sysid = node_sysid AND
+            conn_timeline = node_timeline AND
+            conn_dboid = node_dboid
+        )
+    WHERE node_status <> 'k';
+
+    IF NOT FOUND THEN
+        UPDATE bdr.bdr_nodes
+        SET node_status = 'k'
+        FROM bdr.bdr_get_local_nodeid()
+        WHERE node_sysid = sysid
+            AND node_timeline = timeline
+            AND node_dboid = dboid;
+    END IF;
+
+   IF drop_slot THEN
+       -- Stop the local apply so that slot on remote site can be dropped
+       -- The apply won't be able to restart because we have bdr_connections
+       -- and bdr_nodes locked exclusively.
+       LOOP
+           v_pid := bdr.bdr_get_apply_pid(remoteid.sysid, remoteid.timeline, remoteid.dboid);
+           IF v_pid IS NULL THEN
+               EXIT;
+           END IF;
+
+           PERFORM pg_terminate_backend(v_pid);
+
+           PERFORM pg_sleep(0.5);
+       END LOOP;
+
+       -- Drop the remote slot
+       PERFORM bdr.bdr_drop_remote_slot(remoteid.sysid, remoteid.timeline, remoteid.dboid);
+   END IF;
+
+    -- Notify local perdb worker to kill nodes.
+    PERFORM bdr.bdr_connections_changed();
+END;
+$body$;
+
+RESET bdr.permit_unsafe_ddl_commands;
+RESET bdr.skip_ddl_replication;
+RESET search_path;
index 234202924bd0dfeb077edc3e030b5656e8b3ece0..a21a93e8d85fe8bb1afaeb773656c6479122113d 100644 (file)
@@ -1,4 +1,5 @@
 \c regression
+SELECT bdr.bdr_unsubscribe('node-pg');
 SELECT bdr.bdr_part_by_node_names(ARRAY['node-pg']);
 
 -- wait till all slots are killed, we need a better way for that.
index c4ccf12364fd46c3786a9eef0c46e758ad07dc96..9f55d5f9a99f96fc3c7a38e16b184cb615564b94 100644 (file)
@@ -1 +1,17 @@
--- Not implemented
+\c postgres
+SELECT bdr.bdr_unsubscribe('node-pg');
+
+-- wait till all slots are killed, we need a better way for that.
+SELECT pg_sleep(1);
+
+-- the node state for the removed node should show 'k'
+SELECT node_name, node_status FROM bdr.bdr_nodes;
+
+\c regression
+
+-- There should now be zero slots
+SELECT * FROM pg_replication_slots;
+-- Zero active connections
+SELECT count(*) FROM pg_stat_replication;
+
+SELECT node_name, node_status FROM bdr.bdr_nodes;
index 72e34f5d07872cd077c58da5130a0e9bbddf1ffe..0d675237564b57ab9ed4156cab4d4b3364590379 100644 (file)
@@ -66,6 +66,9 @@ DROP EXTENSION bdr;
 CREATE EXTENSION bdr VERSION '0.10.0.3';
 DROP EXTENSION bdr;
 
+CREATE EXTENSION bdr VERSION '0.10.0.4';
+DROP EXTENSION bdr;
+
 -- evolve version one by one from the oldest to the newest one
 CREATE EXTENSION bdr VERSION '0.8.0';
 ALTER EXTENSION bdr UPDATE TO '0.8.0.1';
@@ -86,6 +89,7 @@ ALTER EXTENSION bdr UPDATE TO '0.10.0.0';
 ALTER EXTENSION bdr UPDATE TO '0.10.0.1';
 ALTER EXTENSION bdr UPDATE TO '0.10.0.2';
 ALTER EXTENSION bdr UPDATE TO '0.10.0.3';
+ALTER EXTENSION bdr UPDATE TO '0.10.0.4';
 
 -- Should never have to do anything: You missed adding the new version above.
 ALTER EXTENSION bdr UPDATE;