extsql/bdr--0.9.1.0--0.10.0.0.sql \
extsql/bdr--0.10.0.0--0.10.0.1.sql \
extsql/bdr--0.10.0.1--0.10.0.2.sql \
- extsql/bdr--0.10.0.2--0.10.0.3.sql
+ extsql/bdr--0.10.0.2--0.10.0.3.sql \
+ extsql/bdr--0.10.0.3--0.10.0.4.sql
DATA_built = \
extsql/bdr--0.8.0.1.sql \
extsql/bdr--0.10.0.0.sql \
extsql/bdr--0.10.0.1.sql \
extsql/bdr--0.10.0.2.sql \
- extsql/bdr--0.10.0.3.sql
+ extsql/bdr--0.10.0.3.sql \
+ extsql/bdr--0.10.0.4.sql
DOCS = bdr.conf.sample README.bdr
SCRIPTS = scripts/bdr_initial_load bdr_init_copy bdr_resetxlog bdr_dump
mkdir -p extsql
cat $^ > $@
+# Build the full 0.10.0.4 install script by concatenating the 0.10.0.3
+# install script with the 0.10.0.3 -> 0.10.0.4 upgrade script, in that order.
+extsql/bdr--0.10.0.4.sql: extsql/bdr--0.10.0.3.sql extsql/bdr--0.10.0.3--0.10.0.4.sql
+ mkdir -p extsql
+ cat $^ > $@
+
bdr_resetxlog: pg_resetxlog.o
$(CC) $(CFLAGS) $^ $(LDFLAGS) $(LDFLAGS_EX) $(libpq_pgport) $(LIBS) -o $@$(X)
# bdr extension
comment = 'Bi-directional replication for PostgreSQL'
-default_version = '0.10.0.3'
+default_version = '0.10.0.4'
module_pathname = '$libdir/bdr'
relocatable = false
requires = btree_gist
#include "utils/memutils.h"
#include "utils/snapmgr.h"
+PGDLLEXPORT Datum bdr_get_apply_pid(PG_FUNCTION_ARGS);
+
PG_FUNCTION_INFO_V1(bdr_connections_changed);
+PG_FUNCTION_INFO_V1(bdr_get_apply_pid);
/* In the commit hook, should we attempt to start a per-db worker? */
static bool xacthook_registered = false;
Assert(!isnull);
LWLockAcquire(BdrWorkerCtl->lock, LW_EXCLUSIVE);
-
for (slotoff = 0; slotoff < bdr_max_workers; slotoff++)
{
BdrWorker *w = &BdrWorkerCtl->slots[slotoff];
kill(w->worker_pid, SIGTERM);
}
}
+ LWLockRelease(BdrWorkerCtl->lock);
if (found_alive)
{
*/
}
- LWLockRelease(BdrWorkerCtl->lock);
continue;
}
perdb->database_oid = InvalidOid;
proc_exit(0);
}
+
+
+/*
+ * SQL-callable: look up the apply worker for the node identified by
+ * (sysid text, timeline oid, dboid oid) and return its pid.
+ *
+ * Returns SQL NULL when no worker slot matches the node or when the matching
+ * slot has no running process attached. Raises ERROR if the sysid text cannot
+ * be parsed as an unsigned 64-bit integer.
+ */
+Datum
+bdr_get_apply_pid(PG_FUNCTION_ARGS)
+{
+	const char *sysid_text = text_to_cstring(PG_GETARG_TEXT_P(0));
+	Oid			node_tli = PG_GETARG_OID(1);
+	Oid			node_dboid = PG_GETARG_OID(2);
+	uint64		node_sysid;
+	BdrWorker  *slot = NULL;
+	bool		running;
+	pid_t		apply_pid = 0;
+
+	if (sscanf(sysid_text, UINT64_FORMAT, &node_sysid) != 1)
+		elog(ERROR, "Parsing of remote sysid as uint64 failed");
+
+	/* Inspect the shared worker array only while holding its lock. */
+	LWLockAcquire(BdrWorkerCtl->lock, LW_EXCLUSIVE);
+
+	find_apply_worker_slot(node_sysid, node_tli, node_dboid, &slot);
+
+	/* A slot counts only if it was found and has a process attached. */
+	running = (slot != NULL && slot->worker_proc != NULL);
+	if (running)
+		apply_pid = slot->worker_pid;
+
+	LWLockRelease(BdrWorkerCtl->lock);
+
+	if (!running)
+		PG_RETURN_NULL();
+
+	PG_RETURN_INT32(apply_pid);
+}
#include "postgres.h"
#include "bdr.h"
+#include "bdr_internal.h"
#include "fmgr.h"
#include "funcapi.h"
PGDLLEXPORT Datum bdr_test_replication_connection(PG_FUNCTION_ARGS);
PGDLLEXPORT Datum bdr_test_remote_connectback(PG_FUNCTION_ARGS);
PGDLLEXPORT Datum bdr_copytable_test(PG_FUNCTION_ARGS);
+PGDLLEXPORT Datum bdr_drop_remote_slot(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(bdr_get_remote_nodeinfo);
PG_FUNCTION_INFO_V1(bdr_test_replication_connection);
PG_FUNCTION_INFO_V1(bdr_test_remote_connectback);
PG_FUNCTION_INFO_V1(bdr_copytable_test);
+PG_FUNCTION_INFO_V1(bdr_drop_remote_slot);
/*
* Make standard postgres connection, ERROR on failure.
free_remote_node_info(&ri);
}
PG_END_ENSURE_ERROR_CLEANUP(bdr_cleanup_conn_close,
- PointerGetDatum(&conn));
+ PointerGetDatum(&conn));
PQfinish(conn);
PG_RETURN_DATUM(HeapTupleGetDatum(returnTuple));
}
+
+
+/*
+ * Drops replication slot on remote node that has been used by the local node.
+ *
+ * SQL arguments: remote sysid (text, parsed as uint64), remote timeline (oid)
+ * and remote database oid.
+ *
+ * Returns true when the slot was found and dropped, false when no slot with
+ * the expected name exists on the remote node.
+ *
+ * Raises ERROR when the sysid cannot be parsed, when either remote query
+ * fails, or when the slot exists but does not use the "bdr" output plugin.
+ *
+ * Control must never leave the PG_ENSURE_ERROR_CLEANUP block via return:
+ * doing so would skip PG_END_ENSURE_ERROR_CLEANUP and leave both the
+ * exception stack and the registered cleanup callback pointing at this
+ * (dead) stack frame. The "slot not found" case is therefore recorded in a
+ * flag and returned after the block has been closed properly.
+ */
+Datum
+bdr_drop_remote_slot(PG_FUNCTION_ARGS)
+{
+	const char *remote_sysid_str = text_to_cstring(PG_GETARG_TEXT_P(0));
+	Oid			remote_tli = PG_GETARG_OID(1);
+	Oid			remote_dboid = PG_GETARG_OID(2);
+	uint64		remote_sysid;
+	PGconn	   *conn;
+	PGresult   *res;
+	NameData	slotname;
+	BdrConnectionConfig *cfg;
+	bool		slot_found;
+
+	if (sscanf(remote_sysid_str, UINT64_FORMAT, &remote_sysid) != 1)
+		elog(ERROR, "Parsing of remote sysid as uint64 failed");
+
+	cfg = bdr_get_connection_config(remote_sysid, remote_tli, remote_dboid, false);
+	conn = bdr_connect_nonrepl(cfg->dsn, "bdr_drop_replication_slot");
+	bdr_free_connection_config(cfg);
+
+	PG_ENSURE_ERROR_CLEANUP(bdr_cleanup_conn_close,
+							PointerGetDatum(&conn));
+	{
+		struct remote_node_info ri;
+		const char *values[1];
+		Oid			types[1] = { TEXTOID };
+
+		/*
+		 * Fetch the remote node info, then build the slot name from the
+		 * local node's identity plus the remote database oid.
+		 *
+		 * NOTE(review): ri is not used when building the slot name;
+		 * presumably the fetch only verifies that the remote end is a
+		 * usable BDR node -- confirm.
+		 */
+		bdr_get_remote_nodeinfo_internal(conn, &ri);
+		bdr_slot_name(&slotname, GetSystemIdentifier(), ThisTimeLineID,
+					  MyDatabaseId, remote_dboid);
+		free_remote_node_info(&ri);
+
+		values[0] = NameStr(slotname);
+
+		/* Check if the slot exists */
+		res = PQexecParams(conn,
+						   "SELECT plugin "
+						   "FROM pg_catalog.pg_replication_slots "
+						   "WHERE slot_name = $1",
+						   1, types, values, NULL, NULL, 0);
+
+		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		{
+			ereport(ERROR,
+					(errmsg("getting remote slot info failed"),
+					 errdetail("SELECT FROM pg_catalog.pg_replication_slots failed with: %s",
+							   PQerrorMessage(conn))));
+		}
+
+		/* Slot not found: remember that, but do not return from here. */
+		slot_found = (PQntuples(res) > 0);
+
+		if (slot_found)
+		{
+			/* Slot found, validate that it's BDR slot */
+			if (PQgetisnull(res, 0, 0))
+				elog(ERROR, "Unexpectedly null field %s", PQfname(res, 0));
+
+			if (strcmp("bdr", PQgetvalue(res, 0, 0)) != 0)
+				ereport(ERROR,
+						(errmsg("slot %s is not BDR slot", NameStr(slotname))));
+
+			/* Done with the lookup result; free it before reusing res. */
+			PQclear(res);
+
+			/* And finally, drop the slot. */
+			res = PQexecParams(conn, "SELECT pg_drop_replication_slot($1)",
+							   1, types, values, NULL, NULL, 0);
+
+			if (PQresultStatus(res) != PGRES_TUPLES_OK)
+			{
+				ereport(ERROR,
+						(errmsg("remote slot drop failed"),
+						 errdetail("SELECT pg_drop_replication_slot() failed with: %s",
+								   PQerrorMessage(conn))));
+			}
+		}
+
+		/* Frees whichever result is current (lookup or drop). */
+		PQclear(res);
+	}
+	PG_END_ENSURE_ERROR_CLEANUP(bdr_cleanup_conn_close,
+								PointerGetDatum(&conn));
+
+	PQfinish(conn);
+
+	PG_RETURN_BOOL(slot_found);
+}
\c regression
+SELECT bdr.bdr_unsubscribe('node-pg');
+ERROR: Unidirectional connection to node node-pg not found
SELECT bdr.bdr_part_by_node_names(ARRAY['node-pg']);
bdr_part_by_node_names
------------------------
--- Not implemented
+\c postgres
+SELECT bdr.bdr_unsubscribe('node-pg');
+ bdr_unsubscribe
+-----------------
+
+(1 row)
+
+-- wait till all slots are killed, we need a better way for that.
+SELECT pg_sleep(1);
+ pg_sleep
+----------
+
+(1 row)
+
+-- the node state for the removed node should show 'k'
+SELECT node_name, node_status FROM bdr.bdr_nodes;
+ node_name | node_status
+--------------------+-------------
+ node-pg | k
+ node-pg-subscriber | k
+(2 rows)
+
+\c regression
+-- There should now be zero slots
+SELECT * FROM pg_replication_slots;
+ slot_name | plugin | slot_type | datoid | database | active | xmin | catalog_xmin | restart_lsn
+-----------+--------+-----------+--------+----------+--------+------+--------------+-------------
+(0 rows)
+
+-- Zero active connections
+SELECT count(*) FROM pg_stat_replication;
+ count
+-------
+ 0
+(1 row)
+
+SELECT node_name, node_status FROM bdr.bdr_nodes;
+ node_name | node_status
+-----------+-------------
+(0 rows)
+
DROP EXTENSION bdr;
CREATE EXTENSION bdr VERSION '0.10.0.3';
DROP EXTENSION bdr;
+CREATE EXTENSION bdr VERSION '0.10.0.4';
+DROP EXTENSION bdr;
-- evolve version one by one from the oldest to the newest one
CREATE EXTENSION bdr VERSION '0.8.0';
ALTER EXTENSION bdr UPDATE TO '0.8.0.1';
ALTER EXTENSION bdr UPDATE TO '0.10.0.1';
ALTER EXTENSION bdr UPDATE TO '0.10.0.2';
ALTER EXTENSION bdr UPDATE TO '0.10.0.3';
+ALTER EXTENSION bdr UPDATE TO '0.10.0.4';
-- Should never have to do anything: You missed adding the new version above.
ALTER EXTENSION bdr UPDATE;
-NOTICE: version "0.10.0.3" of extension "bdr" is already installed
+NOTICE: version "0.10.0.4" of extension "bdr" is already installed
\dx bdr
List of installed extensions
Name | Version | Schema | Description
------+----------+------------+-------------------------------------------
- bdr | 0.10.0.3 | pg_catalog | Bi-directional replication for PostgreSQL
+ bdr | 0.10.0.4 | pg_catalog | Bi-directional replication for PostgreSQL
(1 row)
\c postgres
--- /dev/null
+SET LOCAL search_path = bdr;
+SET bdr.permit_unsafe_ddl_commands = true;
+SET bdr.skip_ddl_replication = true;
+
+-- C-language function from the extension module. Takes the remote node's
+-- identity (sysid as text, timeline, database oid) and returns boolean.
+-- Declared STRICT, so any NULL argument yields NULL without calling the
+-- C code.
+CREATE OR REPLACE FUNCTION bdr.bdr_drop_remote_slot(sysid text, timeline oid, dboid oid)
+RETURNS boolean LANGUAGE C VOLATILE STRICT
+AS 'MODULE_PATHNAME';
+
+-- C-language function from the extension module. Takes a node identity
+-- (sysid as text, timeline, database oid) and returns an integer; the
+-- unsubscribe function below uses the result as a backend pid and treats
+-- NULL as "no apply worker running". Declared STRICT, so any NULL argument
+-- yields NULL without calling the C code.
+CREATE OR REPLACE FUNCTION bdr.bdr_get_apply_pid(sysid text, timeline oid, dboid oid)
+RETURNS integer LANGUAGE C VOLATILE STRICT
+AS 'MODULE_PATHNAME';
+
+--
+-- bdr.bdr_unsubscribe(node_name, drop_slot DEFAULT true)
+--
+-- Removes the unidirectional connection from the local node to the node
+-- called node_name:
+--   * raises a NOTICE and does nothing if no such node is in bdr.bdr_nodes;
+--   * raises an exception if the local node has no unidirectional
+--     connection to that node;
+--   * otherwise marks the node as killed (node_status = 'k'), and marks the
+--     local node killed too when no connection to a non-killed node remains;
+--   * when drop_slot is true, terminates the local apply worker for that
+--     node (retrying every 0.5s until none is reported) and then drops the
+--     replication slot on the remote node;
+--   * finally signals bdr.bdr_connections_changed().
+--
+CREATE OR REPLACE FUNCTION bdr.bdr_unsubscribe(node_name text, drop_slot boolean DEFAULT true)
+RETURNS void LANGUAGE plpgsql VOLATILE
+SET search_path = bdr, pg_catalog
+SET bdr.permit_unsafe_ddl_commands = on
+SET bdr.skip_ddl_replication = on
+SET bdr.skip_ddl_locking = on
+AS $body$
+DECLARE
+    localid record;
+    remoteid record;
+    -- Alias so the parameter can be told apart from the node_name column.
+    v_node_name alias for node_name;
+    v_pid integer;
+BEGIN
+    -- Concurrency
+    LOCK TABLE bdr.bdr_connections IN EXCLUSIVE MODE;
+    LOCK TABLE bdr.bdr_nodes IN EXCLUSIVE MODE;
+    LOCK TABLE pg_catalog.pg_shseclabel IN EXCLUSIVE MODE;
+
+    -- Check the node exists
+    SELECT node_sysid AS sysid, node_timeline AS timeline,
+           node_dboid AS dboid INTO remoteid
+    FROM bdr.bdr_nodes
+    WHERE bdr_nodes.node_name = v_node_name;
+
+    IF NOT FOUND THEN
+        RAISE NOTICE 'Node % not found, nothing done', v_node_name;
+        RETURN;
+    END IF;
+
+    -- Check the connection is unidirectional
+    SELECT sysid, timeline, dboid INTO localid
+    FROM bdr.bdr_get_local_nodeid();
+
+    PERFORM 1 FROM bdr.bdr_connections
+    WHERE conn_sysid = remoteid.sysid
+        AND conn_timeline = remoteid.timeline
+        AND conn_dboid = remoteid.dboid
+        AND conn_origin_sysid = localid.sysid
+        AND conn_origin_timeline = localid.timeline
+        AND conn_origin_dboid = localid.dboid
+        AND conn_is_unidirectional = 't';
+
+    IF NOT FOUND THEN
+        -- RAISE EXCEPTION aborts the function; nothing after it would run.
+        RAISE EXCEPTION 'Unidirectional connection to node % not found',
+            v_node_name;
+    END IF;
+
+    -- Mark the provider node as killed
+    UPDATE bdr.bdr_nodes
+    SET node_status = 'k'
+    WHERE bdr_nodes.node_name = v_node_name;
+
+    --
+    -- Check what to do with local node based on active connections
+    --
+    -- Note the logic here is:
+    --  - if this is UDR node and there are still live provider nodes, we keep local node active
+    --  - if this is UDR node and all provider nodes were killed, local node will be killed as well
+    --  - if this is UDR + BDR node there will be connection pointing towards local node so we keep it active
+    --
+    PERFORM 1
+    FROM bdr.bdr_connections c
+        JOIN bdr.bdr_nodes n ON (
+            conn_sysid = node_sysid AND
+            conn_timeline = node_timeline AND
+            conn_dboid = node_dboid
+        )
+    WHERE node_status <> 'k';
+
+    IF NOT FOUND THEN
+        UPDATE bdr.bdr_nodes
+        SET node_status = 'k'
+        FROM bdr.bdr_get_local_nodeid()
+        WHERE node_sysid = sysid
+            AND node_timeline = timeline
+            AND node_dboid = dboid;
+    END IF;
+
+    IF drop_slot THEN
+        -- Stop the local apply so that slot on remote site can be dropped
+        -- The apply won't be able to restart because we have bdr_connections
+        -- and bdr_nodes locked exclusively.
+        LOOP
+            v_pid := bdr.bdr_get_apply_pid(remoteid.sysid, remoteid.timeline, remoteid.dboid);
+            IF v_pid IS NULL THEN
+                EXIT;
+            END IF;
+
+            PERFORM pg_terminate_backend(v_pid);
+
+            -- Give the worker time to exit before checking again.
+            PERFORM pg_sleep(0.5);
+        END LOOP;
+
+        -- Drop the remote slot
+        PERFORM bdr.bdr_drop_remote_slot(remoteid.sysid, remoteid.timeline, remoteid.dboid);
+    END IF;
+
+    -- Notify local perdb worker to kill nodes.
+    PERFORM bdr.bdr_connections_changed();
+END;
+$body$;
+
+RESET bdr.permit_unsafe_ddl_commands;
+RESET bdr.skip_ddl_replication;
+RESET search_path;
\c regression
+SELECT bdr.bdr_unsubscribe('node-pg');
SELECT bdr.bdr_part_by_node_names(ARRAY['node-pg']);
-- wait till all slots are killed, we need a better way for that.
--- Not implemented
+\c postgres
+SELECT bdr.bdr_unsubscribe('node-pg');
+
+-- wait till all slots are killed, we need a better way for that.
+SELECT pg_sleep(1);
+
+-- the node state for the removed node should show 'k'
+SELECT node_name, node_status FROM bdr.bdr_nodes;
+
+\c regression
+
+-- There should now be zero slots
+SELECT * FROM pg_replication_slots;
+-- Zero active connections
+SELECT count(*) FROM pg_stat_replication;
+
+SELECT node_name, node_status FROM bdr.bdr_nodes;
CREATE EXTENSION bdr VERSION '0.10.0.3';
DROP EXTENSION bdr;
+CREATE EXTENSION bdr VERSION '0.10.0.4';
+DROP EXTENSION bdr;
+
-- evolve version one by one from the oldest to the newest one
CREATE EXTENSION bdr VERSION '0.8.0';
ALTER EXTENSION bdr UPDATE TO '0.8.0.1';
ALTER EXTENSION bdr UPDATE TO '0.10.0.1';
ALTER EXTENSION bdr UPDATE TO '0.10.0.2';
ALTER EXTENSION bdr UPDATE TO '0.10.0.3';
+ALTER EXTENSION bdr UPDATE TO '0.10.0.4';
-- Should never have to do anything: You missed adding the new version above.
ALTER EXTENSION bdr UPDATE;