by candle.pha.pa.us (8.9.0/8.9.0) with ESMTP id LAA11295
-Received: from hub.org (hub.org [216.126.84.1]) by renoir.op.net (o1/$Revision: 1.
6 $) with ESMTP id KAA20310 for <
[email protected]>; Fri, 24 Dec 1999 10:39:18 -0500 (EST)
+Received: from hub.org (hub.org [216.126.84.1]) by renoir.op.net (o1/$Revision: 1.
7 $) with ESMTP id KAA20310 for <
[email protected]>; Fri, 24 Dec 1999 10:39:18 -0500 (EST)
Received: from localhost (majordom@localhost)
by hub.org (8.9.3/8.9.3) with SMTP id KAA61760;
Fri, 24 Dec 1999 10:31:13 -0500 (EST)
by candle.pha.pa.us (8.9.0/8.9.0) with ESMTP id TAA26244
-Received: from hub.org (hub.org [216.126.84.1]) by renoir.op.net (o1/$Revision: 1.
6 $) with ESMTP id TAA12730 for <
[email protected]>; Fri, 24 Dec 1999 19:30:05 -0500 (EST)
+Received: from hub.org (hub.org [216.126.84.1]) by renoir.op.net (o1/$Revision: 1.
7 $) with ESMTP id TAA12730 for <
[email protected]>; Fri, 24 Dec 1999 19:30:05 -0500 (EST)
Received: from localhost (majordom@localhost)
by hub.org (8.9.3/8.9.3) with SMTP id TAA57851;
Fri, 24 Dec 1999 19:23:31 -0500 (EST)
by candle.pha.pa.us (8.9.0/8.9.0) with ESMTP id WAA02578
-Received: from hub.org (hub.org [216.126.84.1]) by renoir.op.net (o1/$Revision: 1.
6 $) with ESMTP id WAA16641 for <
[email protected]>; Fri, 24 Dec 1999 22:18:56 -0500 (EST)
+Received: from hub.org (hub.org [216.126.84.1]) by renoir.op.net (o1/$Revision: 1.
7 $) with ESMTP id WAA16641 for <
[email protected]>; Fri, 24 Dec 1999 22:18:56 -0500 (EST)
Received: from localhost (majordom@localhost)
by hub.org (8.9.3/8.9.3) with SMTP id WAA89135;
Fri, 24 Dec 1999 22:11:12 -0500 (EST)
by candle.pha.pa.us (8.9.0/8.9.0) with ESMTP id JAA17976
-Received: from hub.org (hub.org [216.126.84.1]) by renoir.op.net (o1/$Revision: 1.
6 $) with ESMTP id JAA23337 for <
[email protected]>; Sun, 26 Dec 1999 09:28:36 -0500 (EST)
+Received: from hub.org (hub.org [216.126.84.1]) by renoir.op.net (o1/$Revision: 1.
7 $) with ESMTP id JAA23337 for <
[email protected]>; Sun, 26 Dec 1999 09:28:36 -0500 (EST)
Received: from localhost (majordom@localhost)
by hub.org (8.9.3/8.9.3) with SMTP id JAA90738;
Sun, 26 Dec 1999 09:21:58 -0500 (EST)
by candle.pha.pa.us (8.9.0/8.9.0) with ESMTP id JAA10317
-Received: from hub.org (hub.org [216.126.84.1]) by renoir.op.net (o1/$Revision: 1.
6 $) with ESMTP id IAA02365 for <
[email protected]>; Thu, 30 Dec 1999 08:37:10 -0500 (EST)
+Received: from hub.org (hub.org [216.126.84.1]) by renoir.op.net (o1/$Revision: 1.
7 $) with ESMTP id IAA02365 for <
[email protected]>; Thu, 30 Dec 1999 08:37:10 -0500 (EST)
Received: from localhost (majordom@localhost)
by hub.org (8.9.3/8.9.3) with SMTP id IAA87902;
Thu, 30 Dec 1999 08:34:22 -0500 (EST)
by candle.pha.pa.us (8.9.0/8.9.0) with ESMTP id AAA16274
-Received: from hub.org (hub.org [216.126.84.1]) by renoir.op.net (o1/$Revision: 1.
6 $) with ESMTP id XAA02655 for <
[email protected]>; Sun, 2 Jan 2000 23:45:55 -0500 (EST)
+Received: from hub.org (hub.org [216.126.84.1]) by renoir.op.net (o1/$Revision: 1.
7 $) with ESMTP id XAA02655 for <
[email protected]>; Sun, 2 Jan 2000 23:45:55 -0500 (EST)
Received: from hub.org (hub.org [216.126.84.1])
by hub.org (8.9.3/8.9.3) with ESMTP id XAA13828;
Sun, 2 Jan 2000 23:40:47 -0500 (EST)
by candle.pha.pa.us (8.9.0/8.9.0) with ESMTP id LAA17522
-Received: from hub.org (hub.org [216.126.84.1]) by renoir.op.net (o1/$Revision: 1.
6 $) with ESMTP id LAA01541 for <
[email protected]>; Tue, 4 Jan 2000 11:27:30 -0500 (EST)
+Received: from hub.org (hub.org [216.126.84.1]) by renoir.op.net (o1/$Revision: 1.
7 $) with ESMTP id LAA01541 for <
[email protected]>; Tue, 4 Jan 2000 11:27:30 -0500 (EST)
Received: from localhost (majordom@localhost)
by hub.org (8.9.3/8.9.3) with SMTP id LAA09992;
Tue, 4 Jan 2000 11:18:07 -0500 (EST)
+ If your life is a hard drive, | 830 Blythe Avenue
+ Christ can be your backup. | Drexel Hill, Pennsylvania 19026
+Received: from mail.postgresql.org (webmail.postgresql.org [216.126.85.28])
+ by candle.pha.pa.us (8.9.0/8.9.0) with ESMTP id AAA19262
+Received: from mail.postgresql.org (webmail.postgresql.org [216.126.85.28])
+ by mail.postgresql.org (8.11.1/8.11.1) with SMTP id eAM5qYs47249;
+ Wed, 22 Nov 2000 00:52:34 -0500 (EST)
+Received: from racerx.cabrion.com (racerx.cabrion.com [166.82.231.4])
+ by mail.postgresql.org (8.11.1/8.11.1) with ESMTP id eAM5lJs46653
+Received: from cabrionhome (gso163-25-211.triad.rr.com [24.163.25.211])
+ by racerx.cabrion.com (8.8.7/8.8.7) with SMTP id AAA13731
+Subject: [GENERAL] Synchronization Toolkit
+Date: Wed, 22 Nov 2000 00:49:29 -0500
+MIME-Version: 1.0
+Content-Type: multipart/mixed;
+ boundary="----=_NextPart_000_0062_01C0541E.125CAF30"
+X-Priority: 3
+X-MSMail-Priority: Normal
+X-Mailer: Microsoft Outlook Express 5.50.4133.2400
+X-MimeOLE: Produced By Microsoft MimeOLE V5.50.4133.2400
+Precedence: bulk
+Status: OR
+
+This is a multi-part message in MIME format.
+
+------=_NextPart_000_0062_01C0541E.125CAF30
+Content-Type: text/plain; charset="iso-8859-1"
+Content-Transfer-Encoding: 7bit
+
+Not to be confused with replication, my concept of synchronization is to
+manage changes between a server table (or tables) and one or more mobile,
+disconnected databases (i.e. PalmPilot, laptop, etc.).
+
+I read through the notes in the TODO for this topic and devised a tool kit
+for doing synchronization. I hope that the Postgresql development community
+will find this useful and will help me refine this concept by offering
+insight, experience and some good old fashion hacking if you are so
+inclined.
+
+The bottom of this message describes how to use the attached files.
+
+I look forward to your feedback.
+
+--rob
+
+
+Methodology:
+
+I devised a concept that I call "session versioning". This means that every
+time a row changes it does NOT get a new version. Rather it gets stamped
+with the current session version common to all published tables. Clients,
+when they connect for synchronization, will immediately increment this
+common version number reserve the result as a "post version" and then
+increment the session version again. This version number, implemented as a
+sequence, is common to all synchronized tables and rows.
+
+Any time the server makes changes to the row gets stamped with the current
+session version, when the client posts its changes it uses the reserved
+"post version". The client then makes all it's changes stamping the changed
+rows with it's reserved "post version" rather than the current version. The
+reason why is explained later. It is important that the client post all its
+own changes first so that it does not end up receiving records which changed
+since it's last session that it is about to update anyway.
+
+Reserving the post version is a two step process. First, the number is
+simply stored in a variable for later use. Second, the value is added to a
+lock table (last_stable) to indicate to any concurrent sessions that rows
+with higher version numbers are to be considered "unstable" at the moment
+and they should not attempt to retrieve them at this time. Each client,
+upon connection, will use the lowest value in this lock table (max_version)
+to determine the upper boundary for versions it should retrieve. The lower
+boundary is simply the previous session's "max_version" plus one. Thus
+when the client retrieves changes is uses the following SQL "where"
+expression:
+
+WHERE row_version >= max_version and row_version <= last_stable_version and
+version <> this_post_version
+
+The point of reserving and locking a post version is important in that it
+allows concurrent synchronization by multiple clients. The first, of many,
+clients to connect basically dictates to all future clients that they must
+not take any rows equal to or greater than the one which it just reserved
+and locked. The reason the session version is incremented a second time is
+so that the server may continue to post changes concurrent with any client
+changes and be certain that these concurrent server changes will not taint
+rows the client is about to retrieve. Once the client is finished with it's
+session it removes the lock on it's post version.
+
+Partitioning data for use by each node is the next challenge we face. How
+can we control which "slice" of data each client receives? A slice can be
+horizontal or vertical within a table. Horizontal slices are easy, it's
+just the where clause of an SQL statement that says "give me the rows that
+match X criteria". We handle this by storing and appending a where clause
+to each client's retrieval statement in addition to where clause described
+above. Actually, two where clauses are stored and appended. One is per
+client and one is per publication (table).
+
+We defined horizontal slices by filtering rows. Vertical slices are limits
+by column. The tool kit does provide a mechanism for pseudo vertical
+partitioning. When a client is "subscribed" to a publication, the toolkit
+stores what columns that node is to receive during a session. These are
+stored in the subscribed_cols table. While this does limit the number
+columns transmitted, the insert/update/delete triggers do not recognize
+changes based on columns. The "pseudo" nature of our vertical partitioning
+is evident by example:
+
+Say you have a table with name, address and phone number as columns. You
+restrict a client to see only name and address. This means that phone
+number information will not be sent to the client during synchronization,
+and the client can't attempt to alter the phone number of a given entry.
+Great, but . . . if, on the server, the phone number (but not the name or
+address) is changed, the entire row gets marked with a new version. This
+means that the name and address will get sent to the client even though they
+didn't change.
+
+Well, there's the flaw in vertical partitioning. Other than wasting
+bandwidth, the extra row does no harm to the process. The workaround for
+this is to highly normalize your schema when possible.
+
+Collisions are the next crux one encounters with synchronization. When two
+clients retrieve the same row and both make (different)changes, which one is
+correct? So far the system operates totally independent of time. This is
+good because it doesn't rely on the server or client to keep accurate time.
+We can just ignore time all together, but then we force our clients to
+synchronize on a strict schedule in order to avoid (or reduce) collisions.
+If every node synchronized immediately after making changes we could just
+stop here. Unfortunately this isn't reality. Reality dictates that of two
+clients: Client A & B will each pick up the same record on Monday. A will
+make changes on Monday, then leave for vacation. B will make changes on
+Wednesday because new information was gathered in A's absence. Client B
+posts those changes Wednesday. Meanwhile, client A returns from vacation on
+Friday and synchronizes his changes. A over writes B's changes even though
+A made changes before the most recent information was posted by B.
+
+It is clear that we need some form of time stamp to cope with the above
+example. While clocks aren't the most reliable, they are the only common
+version control available to solve this problem. The system is set up to
+accept (but not require) timestamps from clients and changes on the server
+are time stamped. The system, when presented a time stamp with a row, will
+compare them to figure out who wins in a tie. The system makes certain
+"sanity" checks with regard to these time stamps. A client may not attempt
+to post a change with a timestamp that is more than one hour in the future
+(according to what the server thinks "now" is) nor one hour before it's last
+synchronization date/time. The client row will be immediately placed into
+the collision table if the timestamp is that far out of whack.
+Implementations of the tool kit should take care to ensure that client &
+server agree on what "now" is before attempting to submit changes with
+timestamps.
+
+Time stamps are not required. Should a client be incapable of tracking
+timestamps, etc. The system will assume that any server row which has been
+changed since the client's last session will win a tie. This is quite error
+prone, so timestamps are encouraged where possible.
+
+Inserts pose an interesting challenge. Since multiple clients cannot share
+a sequence (often used as a primary key) while disconnected. They will be
+responsible for their own unique "row_id" when inserting records. Inserts
+accept any arbitrary key, and write back to the client a special kind of
+update that gives the server's row_id. The client is responsible for making
+sure that this update takes place locally.
+
+Deletes are the last portion of the process. When deletes occur, the
+row_id, version, etc. are stored in a "deleted" table. These entries are
+retrieved by the client using the same version filter as described above.
+The table is pruned at the end of each session by deleting all records with
+versions that are less than the lowest 'last_version' stored for each
+client.
+
+Having wrapped up the synchronization process, I'll move on to describe some
+points about managing clients, publications and the like.
+
+The tool kit is split into two objects: SyncManagement and Synchronization.
+The Synchronization object exposes an API that client implementations use to
+communicate and receive changes. The management functions handle system
+install and uninstall in addition to publication of tables and client
+subscriptions.
+
+Installation and uninstallation are handled by their corresponding functions
+in the API. All system tables are prefixed and suffixed with four
+underscores, in hopes that this avoids conflict with an existing tables.
+Calling the install function more than once will generate an error message.
+Uninstall will remove all related tables, sequences, functions and triggers
+from the system.
+
+The first step, after installing the system, is to publish a table. A table
+can be published more than once under different names. Simply provide a
+unique name as the second argument to the publish function. Since object
+names are restricted to 32 characters in Postgres, each table is given a
+unique id and this id is used to create the trigger and sequence names.
+Since one table can be published multiple times, but only needs one set of
+triggers and one sequence for change management a reference count is kept so
+that we know when to add/drop triggers and functions. By default, all
+columns are published, but the third argument to the publish function
+accepts an array reference of column names that allows you to specify a
+limited set. Information about the table is stored in the "tables" table,
+info about the publication is in the "publications" table and column names
+are stored in "subscribed_cols" table.
+
+The next step is to subscribe a client to a table. A client is identified
+by a user name and a node name. The subscribe function takes three
+arguments: user, node & publication. The subscription process writes an
+entry into the "subscribed" table with default values. Of note, the
+"RefreshOnce" attribute is set to true whenever a table is published. This
+indicates to the system that a full table refresh should be sent the next
+time the client connects even if the client requests synchronization rather
+than refresh.
+
+The toolkit does not, yet, provide a way to manage the whereclause stored at
+either the publication or client level. To use or test this feature, you
+will need to set the whereclause attributes manually.
+
+Tables and users can be unpublished and unsubscribed using the corresponding
+functions within the tool kit's management interface. Because postgres
+lacks an "ALTER TABLE DROP COLUMN" function, the unpublish function only
+removes default values and indexes for those columns.
+
+The API isn't the most robust thing in the world right now. All functions
+return undef on success and an error string otherwise (like DBD). I hope to
+clean up the API considerably over the next month. The code has not been
+field tested at this time.
+
+
+The files attached are:
+
+1) SynKit.pm (A perl module that contains install/uninstall functions and a
+simple api for synchronization & management)
+
+2) sync_install.pl (Sample code to demonstrate the installation, publishing
+and subscribe process)
+
+3) sync_uninstall.pl (Sample code to demonstrate the uninstallation,
+unpublishing and unsubscribe process)
+
+
+To use them on Linux (don't know about Win32 but should work fine):
+
+ - set up a test database and make SURE plpgsql is installed
+
+ - install perl 5.05 along with Date::Parse(TimeDate-1.1) , DBI and DBD::Pg
+modules [www.cpan.org]
+
+ - copy all three attached files to a test directory
+
+ - cd to your test directory
+
+ - edit all three files and change the three DBI variables to suit your
+system (they are clearly marked)
+
+ - % perl sync_install.pl
+
+ - check out the tables, functions & triggers installed
+
+ - % perl sync.pl
+
+ - check out the 'sync_test' table, do some updates/inserts/deletes and run
+sync.pl again
+ NOTE: Sanity checks default to allow no more than 50% of the table
+to be changed by the client in a single session.
+ If you delete all (or most of) the rows you will get errors when
+you run sync.pl again! (by design)
+
+ - % perl sync_uninstall.pl (when you are done)
+
+ - check out the sample scripts and the perl module code (commented, but
+not documented)
+
+
+
+
+
+
+------=_NextPart_000_0062_01C0541E.125CAF30
+Content-Type: application/octet-stream; name="sync.pl"
+Content-Transfer-Encoding: quoted-printable
+Content-Disposition: attachment; filename="sync.pl"
+
+
+
+# This script depicts the syncronization process for two users.
+
+
+## CHANGE THESE THREE VARIABLE TO MATCH YOUR SYSTEM ###########
+my $dbi_connect_string =3D 'dbi:Pg:dbname=3Dtest;host=3Dsnoopy'; #
+my $db_user =3D 'test'; #
+my $db_pass =3D 'test'; #
+#################################################################
+
+my $ret; #holds return value
+
+use SynKit;
+
+#create a synchronization object (pass dbi connection info)
+my $s =3D Synchronize->new($dbi_connect_string,$db_user,$db_pass);
+
+#start a session by passing a user name, "node" identifier and a collision =
+queue name (client or server)
+$ret =3D $s->start_session('JOE','REMOTE_NODE_NAME','server');
+print "Handle this error: $ret\n\n" if $ret;
+
+#call this once before attempting to apply individual changes
+$ret =3D $s->start_changes('sync_test',['name']);
+print "Handle this error: $ret\n\n" if $ret;
+
+#call this for each change the client wants to make to the database
+$ret =3D $s->apply_change(CLIENTROWID,'insert',undef,['ted']);
+print "Handle this error: $ret\n\n" if $ret;
+
+#call this for each change the client wants to make to the database
+$ret =3D $s->apply_change(CLIENTROWID,'insert','1973-11-10 11:25:00 AM -05=
+',['tim']);
+print "Handle this error: $ret\n\n" if $ret;
+
+#call this for each change the client wants to make to the database
+$ret =3D $s->apply_change(999,'update',undef,['tom']);
+print "Handle this error: $ret\n\n" if $ret;
+
+#call this for each change the client wants to make to the database
+$ret =3D $s->apply_change(1,'update',undef,['tom']);
+print "Handle this error: $ret\n\n" if $ret;
+
+#call this once after all changes have been submitted
+$ret =3D $s->end_changes();
+print "Handle this error: $ret\n\n" if $ret;
+
+#call this to get updates from all subscribed tables
+$ret =3D $s->get_all_updates();
+print "Handle this error: $ret\n\n" if $ret;
+
+print "\n\nSyncronization session is complete. (JOE) \n\n";
+
+
+# make some changes to the database (server perspective)
+
+print "\n\nMaking changes to the the database. (server side) \n\n";
+
+use DBI;
+my $dbh =3D DBI->connect($dbi_connect_string,$db_user,$db_pass);
+
+$dbh->do("insert into sync_test values ('roger')");
+$dbh->do("insert into sync_test values ('john')");
+$dbh->do("insert into sync_test values ('harry')");
+$dbh->do("delete from sync_test where name =3D 'roger'");
+$dbh->do("update sync_test set name =3D 'tom' where name =3D 'harry'");
+
+$dbh->disconnect;
+
+
+#now do another session for a different user
+
+#start a session by passing a user name, "node" identifier and a collision =
+queue name (client or server)
+$ret =3D $s->start_session('KEN','ANOTHER_REMOTE_NODE_NAME','server');
+print "Handle this error: $ret\n\n" if $ret;
+
+#call this to get updates from all subscribed tables
+$ret =3D $s->get_all_updates();
+print "Handle this error: $ret\n\n" if $ret;
+
+print "\n\nSynchronization session is complete. (KEN)\n\n";
+
+print "Now look at your database and see what happend, make changes to the =
+test table, etc. and run this again.\n\n";
+
+------=_NextPart_000_0062_01C0541E.125CAF30
+Content-Type: application/octet-stream; name="sync_uninstall.pl"
+Content-Transfer-Encoding: quoted-printable
+Content-Disposition: attachment; filename="sync_uninstall.pl"
+
+
+# this script uninstalls the synchronization system using the SyncManager o=
+bject;
+
+use SynKit;
+
+### CHANGE THESE TO MATCH YOUR SYSTEM ########################
+my $dbi_connect_string =3D 'dbi:Pg:dbname=3Dtest;host=3Dsnoopy'; #
+my $db_user =3D 'test'; #
+my $db_pass =3D 'test'; #
+#################################################################
+
+
+my $ret; #holds return value
+
+#create an instance of the SyncManager object
+my $m =3D SyncManager->new($dbi_connect_string,$db_user,$db_pass);
+
+# call this to unsubscribe a user/node (not necessary if you are uninstalli=
+ng)
+print $m->unsubscribe('KEN','ANOTHER_REMOTE_NODE_NAME','sync_test');
+
+#call this to unpublish a table (not necessary if you are uninstalling)
+print $m->unpublish('sync_test');
+
+#call this to uninstall the syncronization system
+# NOTE: this will automatically unpublish & unsubscribe all users
+print $m->UNINSTALL;
+
+# now let's drop our little test table
+use DBI;
+my $dbh =3D DBI->connect($dbi_connect_string,$db_user,$db_pass);
+$dbh->do("drop table sync_test");
+$dbh->disconnect;
+
+print "\n\nI hope you enjoyed this little demonstration\n\n";
+
+
+
+------=_NextPart_000_0062_01C0541E.125CAF30
+Content-Type: application/octet-stream; name="sync_install.pl"
+Content-Transfer-Encoding: quoted-printable
+Content-Disposition: attachment; filename="sync_install.pl"
+
+
+# This script shows how to install the synchronization system=20
+# using the SyncManager object
+
+use SynKit;
+
+### CHANGE THESE TO MATCH YOUR SYSTEM ##########################
+my $dbi_connect_string =3D 'dbi:Pg:dbname=3Dtest;host=3Dsnoopy'; #
+my $db_user =3D 'test'; #
+my $db_pass =3D 'test'; #
+#################################################################
+my $ret; #holds return value
+
+
+#create an instance of the sync manager object
+my $m =3D SyncManager->new($dbi_connect_string,$db_user,$db_pass);
+
+#Call this to install the syncronization management tables, etc.
+$ret =3D $m->INSTALL;
+die "Handle this error: $ret\n\n" if $ret;
+
+
+
+#create a test table for us to demonstrate with
+use DBI;
+my $dbh =3D DBI->connect($dbi_connect_string,$db_user,$db_pass);
+$dbh->do("create table sync_test (name text)");
+$dbh->do("insert into sync_test values ('rob')");
+$dbh->do("insert into sync_test values ('rob')");
+$dbh->do("insert into sync_test values ('rob')");
+$dbh->do("insert into sync_test values ('ted')");
+$dbh->do("insert into sync_test values ('ted')");
+$dbh->do("insert into sync_test values ('ted')");
+$dbh->disconnect;
+
+
+
+
+#call this to "publish" a table
+$ret =3D $m->publish('sync_test');
+print "Handle this error: $ret\n\n" if $ret;
+
+#call this to "subscribe" a user/node to a publication (table)
+$ret =3D $m->subscribe('JOE','REMOTE_NODE_NAME','sync_test');
+print "Handle this error: $ret\n\n" if $ret;
+
+#call this to "subscribe" a user/node to a publication (table)
+$ret =3D $m->subscribe('KEN','ANOTHER_REMOTE_NODE_NAME','sync_test');
+print "Handle this error: $ret\n\n" if $ret;
+
+
+print "Now you can do: 'perl sync.pl' a few times to play\n\n";
+print "Do 'perl sync_uninstall.pl' to uninstall the system\n";
+
+
+------=_NextPart_000_0062_01C0541E.125CAF30
+Content-Type: application/octet-stream; name="SynKit.pm"
+Content-Transfer-Encoding: quoted-printable
+Content-Disposition: attachment; filename="SynKit.pm"
+
+# Perl DB synchronization toolkit
+
+#created for postgres 7.0.2 +
+use strict;
+
+BEGIN {
+ use vars qw($VERSION);
+ # set the version for version checking
+ $VERSION =3D 1.00;
+}
+
+
+package Synchronize;
+
+use DBI;
+
+use Date::Parse;
+
+# new requires 3 arguments: dbi connection string, plus the corresponding u=
+sername and password to get connected to the database
+sub new {
+ my $proto =3D shift;
+ my $class =3D ref($proto) || $proto;
+ my $self =3D {};
+
+ my $dbi =3D shift;
+ my $user =3D shift;
+ my $pass =3D shift;
+
+ $self->{DBH} =3D DBI->connect($dbi,$user,$pass) || die "Failed to connect =
+to database: ".DBI->errstr();
+
+ $self->{user} =3D undef;
+ $self->{node} =3D undef;
+ $self->{status} =3D undef; # holds status of table update portion of sessi=
+on
+ $self->{pubs} =3D {}; #holds hash of pubs available to sessiom with val =
+=3D 1 if ok to request sync
+ $self->{orderpubs} =3D undef; #holds array ref of subscribed pubs ordered =
+by sync_order
+ $self->{this_post_ver} =3D undef; #holds the version number under which th=
+is session will post changes
+ $self->{max_ver} =3D undef; #holds the maximum safe version for getting up=
+dates
+ $self->{current} =3D {}; #holds the current publication info to which chan=
+ges are being applied
+ $self->{queue} =3D 'server'; # tells collide function what to do with coll=
+isions. (default is to hold on server)
+
+ $self->{DBLOG}=3D DBI->connect($dbi,$user,$pass) || die "cannot log to DB:=
+ ".DBI->errstr();=20
+
+
+ return bless ($self, $class);
+}
+
+sub dblog {=20
+ my $self =3D shift;
+ my $msg =3D $self->{DBLOG}->quote($_[0]);
+ my $quser =3D $self->{DBH}->quote($self->{user});
+ my $qnode =3D $self->{DBH}->quote($self->{node});
+ $self->{DBLOG}->do("insert into ____sync_log____ (username, nodename,stamp=
+, message) values($quser, $qnode, now(), $msg)");
+}
+
+
+#start_session establishes session wide information and other housekeeping =
+chores
+ # Accepts username, nodename and queue (client or server) as arguments;
+
+sub start_session {
+ my $self =3D shift;
+ $self->{user} =3D shift || die 'Username is required';
+ $self->{node} =3D shift || die 'Nodename is required';
+ $self->{queue} =3D shift;
+
+
+ if ($self->{queue} ne 'server' && $self->{queue} ne 'client') {
+ die "You must provide a queue argument of either 'server' or 'client'";
+ }
+
+ my $quser =3D $self->{DBH}->quote($self->{user});
+ my $qnode =3D $self->{DBH}->quote($self->{node});
+
+ my $sql =3D "select pubname from ____subscribed____ where username =3D $qu=
+ser and nodename =3D $qnode";
+ my @pubs =3D $self->GetColList($sql);
+
+ return 'User/Node has no subscriptions!' if !defined(@pubs);
+
+ # go though the list and check permissions and rules for each
+ foreach my $pub (@pubs) {
+ my $qpub =3D $self->{DBH}->quote($pub);
+ my $sql =3D "select disabled, pubname, fullrefreshonly, refreshonce,post_=
+ver from ____subscribed____ where username =3D $quser and pubname =3D $qpub=
+ and nodename =3D $qnode";
+ my $sth =3D $self->{DBH}->prepare($sql) || die $self->{DBH}->errstr;
+ $sth->execute || die $self->{DBH}->errstr;
+ my @row;
+ while (@row =3D $sth->fetchrow_array) {
+ next if $row[0]; #publication is disabled
+ next if !defined($row[1]); #publication does not exist (should never occ=
+ur)
+ if ($row[2] || $row[3]) { #refresh of refresh once flag is set
+ $self->{pubs}->{$pub} =3D 0; #refresh only
+ next;
+ }
+ if (!defined($row[4])) { #no previous session exists, must refresh
+ $self->{pubs}->{$pub} =3D 0; #refresh only
+ next;
+ }
+ $self->{pubs}->{$pub} =3D 1; #OK for sync
+ }
+ $sth->finish;
+ }
+
+
+ $sql =3D "select pubname from ____publications____ order by sync_order";
+ my @op =3D $self->GetColList($sql);
+ my @orderpubs;
+
+ #loop through ordered pubs and remove non subscribed publications
+ foreach my $pub (@op) {
+ push @orderpubs, $pub if defined($self->{pubs}->{$pub});
+ }
+=09
+ $self->{orderpubs} =3D \@orderpubs;
+
+# Now we obtain a session version number, etc.
+
+ $self->{DBH}->{AutoCommit} =3D 0; #allows "transactions"
+ $self->{DBH}->{RaiseError} =3D 1; #script [or eval] will automatically die=
+ on errors
+
+ eval { #start DB transaction
+
+ #lock the version sequence until we determin that we have gotten
+ #a good value. Lock will be released on commit.
+ $self->{DBH}->do('lock ____version_seq____ in access exclusive mode');
+
+ # remove stale locks if they exist
+ my $sql =3D "delete from ____last_stable____ where username =3D $quser an=
+d nodename =3D $qnode";
+ $self->{DBH}->do($sql);
+
+ # increment version sequence & grab the next val as post_ver
+ my $sql =3D "select nextval('____version_seq____')";
+ my $sth =3D $self->{DBH}->prepare($sql);
+ $sth->execute;
+ ($self->{this_post_ver}) =3D $sth->fetchrow_array();
+ $sth->finish;
+ # grab max_ver from last_stable
+
+ $sql =3D "select min(version) from ____last_stable____";=20
+ $sth =3D $self->{DBH}->prepare($sql);
+ $sth->execute;
+ ($self->{max_ver}) =3D $sth->fetchrow_array();
+ $sth->finish;
+
+ # if there was no version in lock table, then take the ID that was in use
+ # when we started the session ($max_ver -1)
+
+ $self->{max_ver} =3D $self->{this_post_ver} -1 if (!defined($self->{max_v=
+er}));
+
+ # lock post_ver by placing it in last_stable
+ $self->{DBH}->do("insert into ____last_stable____ (version, username, nod=
+ename) values ($self->{this_post_ver}, $quser,$qnode)");
+
+ # increment version sequence again (discard result)
+ $sql =3D "select nextval('____version_seq____')";
+ $sth =3D $self->{DBH}->prepare($sql);
+ $sth->execute;
+ $sth->fetchrow_array();
+ $sth->finish;
+
+ }; #end eval/transaction
+
+ if ($@) { # part of transaction failed
+ return 'Start session failed';
+ $self->{DBH}->rollback;
+ } else { # all's well commit block
+ $self->{DBH}->commit;
+ }
+ $self->{DBH}->{AutoCommit} =3D 1;
+ $self->{DBH}->{RaiseError} =3D 0;
+
+ return undef;
+
+}
+
+#start changes should be called once before applying individual change requ=
+ests
+ # Requires publication and ref to columns that will be updated as arguments
+sub start_changes {
+ my $self =3D shift;
+ my $pub =3D shift || die 'Publication is required';
+ my $colref =3D shift || die 'Reference to column array is required';
+
+ $self->{status} =3D 'starting';
+
+ my $qpub =3D $self->{DBH}->quote($pub);
+ my $quser =3D $self->{DBH}->quote($self->{user});
+ my $qnode =3D $self->{DBH}->quote($self->{node});
+
+ my @cols =3D @{$colref};
+ my @subcols =3D $self->GetColList("select col_name from ____subscribed_col=
+s____ where username =3D $quser and nodename =3D $qnode and pubname =3D $qp=
+ub");
+ my %subcols;
+ foreach my $col (@subcols) {
+ $subcols{$col} =3D 1;
+ }
+ foreach my $col (@cols) {=09
+ return "User/node is not subscribed to column '$col'" if !$subcols{$col};
+ }
+
+ my $sql =3D "select pubname, readonly, last_session, post_ver, last_ver, w=
+hereclause, sanity_limit,=20
+sanity_delete, sanity_update, sanity_insert from ____subscribed____ where u=
+sername =3D $quser and pubname =3D $qpub and nodename =3D $qnode";
+ my ($junk, $readonly, $last_session, $post_ver, $last_ver, $whereclause, $=
+sanity_limit,=20
+$sanity_delete, $sanity_update, $sanity_insert) =3D $self->GetOneRow($sql);
+=09
+ return 'Publication is read only' if $readonly;
+
+ $sql =3D "select whereclause from ____publications____ where pubname =3D $=
+qpub";
+ my ($wc) =3D $self->GetOneRow($sql);
+ $whereclause =3D '('.$whereclause.')' if $whereclause;
+ $whereclause =3D $whereclause.' and ('.$wc.')' if $wc;
+
+ my ($table) =3D $self->GetOneRow("select tablename from ____publications__=
+__ where pubname =3D $qpub");
+
+ return 'Publication is not registered correctly' if !defined($table);
+
+ my %info;
+ $info{pub} =3D $pub;
+ $info{whereclause} =3D $whereclause;
+ $info{post_ver} =3D $post_ver;
+ $last_session =3D~ s/([+|-]\d\d?)$/ $1/; #put a space before timezone=09
+ $last_session =3D str2time ($last_session); #convert to perltime (seconds =
+since 1970)
+ $info{last_session} =3D $last_session;
+ $info{last_ver} =3D $last_ver;
+ $info{table} =3D $table;
+ $info{cols} =3D \@cols;
+
+ my $sql =3D "select count(oid) from $table";
+ $sql =3D $sql .' '.$whereclause if $whereclause;
+ my ($rowcount) =3D $self->GetOneRow($sql);
+
+ #calculate sanity levels (convert from % to number of rows)
+ # limits defined as less than 1 mean no limit
+ $info{sanitylimit} =3D $rowcount * ($sanity_limit / 100) if $sanity_limit =
+> 0;
+ $info{insertlimit} =3D $rowcount * ($sanity_insert / 100) if $sanity_inser=
+t > 0;
+ $info{updatelimit} =3D $rowcount * ($sanity_update / 100) if $sanity_updat=
+e > 0;
+ $info{deletelimit} =3D $rowcount * ($sanity_delete / 100) if $sanity_delet=
+e > 0;
+
+ $self->{sanitycount} =3D 0;
+ $self->{updatecount} =3D 0;
+ $self->{insertcount} =3D 0;
+ $self->{deletecount} =3D 0;
+
+ $self->{current} =3D \%info;
+
+ $self->{DBH}->{AutoCommit} =3D 0; #turn on transaction behavior so we can =
+roll back on sanity limits, etc.
+
+ $self->{status} =3D 'ready';
+
+ return undef;
+}
+
+#call this once all changes are submitted to commit them;
+sub end_changes {
+ my $self =3D shift;
+ return undef if $self->{status} ne 'ready';
+ $self->{DBH}->commit;
+ $self->{DBH}->{AutoCommit} =3D 1;
+ $self->{status} =3D 'success';
+ return undef;
+}
+
+#call apply_change once for each row level client update
+ # Accepts 4 params: rowid, action, timestamp and reference to data array
+ # Note: timestamp can be undef, data can be undef
+ # timestamp MUST be in perl time (secs since 1970)
+
+#this routine checks basic timestamp info and sanity limits, then passes th=
+e info along to do_action() for processing
+sub apply_change {
+ my $self =3D shift;
+ my $rowid =3D shift || return 'Row ID is required'; #don't die just for on=
+e bad row
+ my $action =3D shift || return 'Action is required'; #don't die just for o=
+ne bad row
+ my $timestamp =3D shift;
+ my $dataref =3D shift;
+ $action =3D lc($action);
+
+ $timestamp =3D str2time($timestamp) if $timestamp;
+
+ return 'Status failure, cannot accept changes: '.$self->{status} if $self-=
+>{status} ne 'ready';
+
+ my %info =3D %{$self->{current}};
+
+ $self->{sanitycount}++;
+ if ($info{sanitylimit} && $self->{sanitycount} > $info{sanitylimit}) {
+ # too many changes from client
+ my $ret =3D $self->sanity('limit');
+ return $ret if $ret;
+ }
+
+=09
+ if ($timestamp && $timestamp > time() + 3600) { # current time + one hour
+ #client's clock is way off, cannot submit changes in future
+ my $ret =3D $self->collide('future', $info{table}, $rowid, $action, undef=
+, $timestamp, $dataref, $self->{queue});
+ return $ret if $ret;
+ }
+
+ if ($timestamp && $timestamp < $info{last_session} - 3600) { # last sessio=
+n time less one hour
+ #client's clock is way off, cannot submit changes that occured before las=
+t sync date
+ my $ret =3D $self->collide('past', $info{table}, $rowid, $action, undef, =
+$timestamp, $dataref , $self->{queue});
+ return $ret if $ret;
+ }
+
+ my ($crow, $cver, $ctime); #current row,ver,time
+ if ($action ne 'insert') {
+ my $sql =3D "select ____rowid____, ____rowver____, ____stamp____ from $in=
+fo{table} where ____rowid____ =3D $rowid";
+ ($crow, $cver, $ctime) =3D $self->GetOneRow($sql);
+ if (!defined($crow)) {
+ my $ret =3D $self->collide('norow', $info{table}, $rowid, $action, undef=
+, $timestamp, $dataref , $self->{queue});
+ return $ret if $ret;=09=09
+ }
+
+ $ctime =3D~ s/([+|-]\d\d?)$/ $1/; #put space between timezone
+ $ctime =3D str2time($ctime) if $ctime; #convert to perl time
+
+ if ($timestamp) {
+ if ($ctime < $timestamp) {
+ my $ret =3D $self->collide('time', $info{table}, $rowid, $action, undef=
+, $timestamp, $dataref, $self->{queue} );=09=09
+ return $ret if $ret;
+ }
+
+ } else {
+ if ($cver > $self->{this_post_ver}) {
+ my $ret =3D $self->collide('version', $info{table}, $rowid, $action, un=
+def, $timestamp, $dataref, $self->{queue} );
+ return $ret if $ret;
+ }
+ }
+=09
+ }
+
+ if ($action eq 'insert') {
+ $self->{insertcount}++;
+ if ($info{insertlimit} && $self->{insertcount} > $info{insertlimit}) {
+ # too many changes from client
+ my $ret =3D $self->sanity('insert');
+ return $ret if $ret;
+ }
+
+ my $qtable =3D $self->{DBH}->quote($info{table});
+ my ($rowidsequence) =3D '_'.$self->GetOneRow("select table_id from ____ta=
+bles____ where tablename =3D $qtable").'__rowid_seq';
+ return 'Table incorrectly registered, cannot get rowid sequence name: '.$=
+self->{DBH}->errstr() if not defined $rowidsequence;
+
+ my @data;
+ foreach my $val (@{$dataref}) {
+ push @data, $self->{DBH}->quote($val);
+ }
+ my $sql =3D "insert into $info{table} (";
+ if ($timestamp) {
+ $sql =3D $sql . join(',',@{$info{cols}}) . ',____rowver____, ____stamp__=
+__) values (';
+ $sql =3D $sql . join (',',@data) .','.$self->{this_post_ver}.',\''.local=
+time($timestamp).'\')';
+ } else {
+ $sql =3D $sql . join(',',@{$info{cols}}) . ',____rowver____) values (';
+ $sql =3D $sql . join (',',@data) .','.$self->{this_post_ver}.')';
+ }
+ my $ret =3D $self->{DBH}->do($sql);
+ if (!$ret) {
+ my $ret =3D $self->collide($self->{DBH}->errstr(), $info{table}, $rowid,=
+ $action, undef, $timestamp, $dataref , $self->{queue});
+ return $ret if $ret;=09=09
+ }
+ my ($newrowid) =3D $self->GetOneRow("select currval('$rowidsequence')");
+ return 'Failed to get current rowid on inserted row'.$self->{DBH}->errstr=
+ if not defined $newrowid;
+ $self->changerowid($rowid, $newrowid);
+ }
+
+ if ($action eq 'update') {
+ $self->{updatecount}++;
+ if ($info{updatelimit} && $self->{updatecount} > $info{updatelimit}) {
+ # too many changes from client
+ my $ret =3D $self->sanity('update');
+ return $ret if $ret;
+ }
+ my @data;
+ foreach my $val (@{$dataref}) {
+ push @data, $self->{DBH}->quote($val);
+ }=09
+
+ my $sql =3D "update $info{table} set ";
+ my @cols =3D @{$info{cols}};
+ foreach my $col (@cols) {
+ my $val =3D shift @data;
+ $sql =3D $sql . "$col =3D $val,";
+ }
+ $sql =3D $sql." ____rowver____ =3D $self->{this_post_ver}";
+ $sql =3D $sql.", ____stamp____ =3D '".localtime($timestamp)."'" if $times=
+tamp;
+ $sql =3D $sql." where ____rowid____ =3D $rowid";
+ $sql =3D $sql." and $info{whereclause}" if $info{whereclause};
+ my $ret =3D $self->{DBH}->do($sql);
+ if (!$ret) {
+ my $ret =3D $self->collide($self->{DBH}->errstr(), $info{table}, $rowid,=
+ $action, undef, $timestamp, $dataref , $self->{queue});
+ return $ret if $ret;=09=09
+ }
+
+ }
+
+ if ($action eq 'delete') {
+ $self->{deletecount}++;
+ if ($info{deletelimit} && $self->{deletecount} > $info{deletelimit}) {
+ # too many changes from client
+ my $ret =3D $self->sanity('delete');
+ return $ret if $ret;
+ }
+ if ($timestamp) {
+ my $sql =3D "update $info{table} set ____rowver____ =3D $self->{this_pos=
+t_ver}, ____stamp____ =3D '".localtime($timestamp)."' where ____rowid____ =
+=3D $rowid";
+ $sql =3D $sql . " where $info{whereclause}" if $info{whereclause};
+ $self->{DBH}->do($sql) || return 'Predelete update failed: '.$self->{DBH=
+}->errstr;
+ } else {
+ my $sql =3D "update $info{table} set ____rowver____ =3D $self->{this_pos=
+t_ver} where ____rowid____ =3D $rowid";
+ $sql =3D $sql . " where $info{whereclause}" if $info{whereclause};
+ $self->{DBH}->do($sql) || return 'Predelete update failed: '.$self->{DBH=
+}->errstr;
+ }
+ my $sql =3D "delete from $info{table} where ____rowid____ =3D $rowid";
+ $sql =3D $sql . " where $info{whereclause}" if $info{whereclause};
+ my $ret =3D $self->{DBH}->do($sql);
+ if (!$ret) {
+ my $ret =3D $self->collide($self->{DBH}->errstr(), $info{table}, $rowid,=
+ $action, undef, $timestamp, $dataref , $self->{queue});
+ return $ret if $ret;=09=09
+ }
+}
+=09
+=09
+ return undef;
+}
+
+sub changerowid {
+ my $self =3D shift;
+ my $oldid =3D shift;
+ my $newid =3D shift;
+ $self->writeclient('changeid',"$oldid\t$newid");
+}
+
+#writes info to client
+sub writeclient {
+ my $self =3D shift;
+ my $type =3D shift;
+ my @info =3D @_;
+ print "$type: ",join("\t",@info),"\n";
+ return undef;
+}
+
+# Override this for custom behavior. Default is to echo back the sanity fa=
+ilure reason.=20=20
+# If you want to override a collision, you can do so by returning undef.
+sub sanity {
+ my $self =3D shift;
+ my $reason =3D shift;
+ $self->{status} =3D 'sanity exceeded';
+ $self->{DBH}->rollback;
+ return $reason;
+}
+
+# Override this for custom behavior. Default is to echo back the failure r=
+eason.=20=20
+# If you want to override a collision, you can do so by returning undef.
+sub collide {
+ my $self =3D shift;
+ my ($reason,$table,$rowid,$action,$rowver,$timestamp,$data, $queue) =3D @_;
+
+ my @data;
+ foreach my $val (@{$data}) {
+ push @data, $self->{DBH}->quote($val);
+ }=09
+
+ if ($reason =3D~ /integrity/i || $reason =3D~ /constraint/i) {
+ $self->{status} =3D 'intergrity violation';
+ $self->{DBH}->rollback;
+ }
+
+ my $datastring;
+ my @cols =3D @{$self->{current}->{cols}};
+ foreach my $col (@cols) {
+ my $val =3D shift @data;
+ $datastring =3D $datastring . "$col =3D $val,";
+ }
+ chop $datastring; #remove trailing comma
+
+ if ($queue eq 'server') {
+ $timestamp =3D localtime($timestamp) if defined($timestamp);
+ $rowid =3D $self->{DBH}->quote($rowid);
+ $rowid =3D 'null' if !defined($rowid);
+ $rowver =3D 'null' if !defined($rowver);
+ $timestamp =3D $self->{DBH}->quote($timestamp);
+ $data =3D $self->{DBH}->quote($data);
+ my $qtable =3D $self->{DBH}->quote($table);
+ my $qreason =3D $self->{DBH}->quote($reason);
+ my $qaction =3D $self->{DBH}->quote($action);
+ my $quser =3D $self->{DBH}->quote($self->{user});
+ my $qnode =3D $self->{DBH}->quote($self->{node});
+ $datastring =3D $self->{DBH}->quote($datastring);
+
+
+ my $sql =3D "insert into ____collision____ (rowid,
+tablename, rowver, stamp, data, reason, action, username,
+nodename, queue) values($rowid,$qtable, $rowver, $timestamp,$datastring,
+$qreason, $qaction,$quser, $qnode)";
+ $self->{DBH}->do($sql) || die 'Failed to write to collision table: '.$sel=
+f->{DBH}->errstr;
+
+ } else {
+
+ $self->writeclient('collision',$rowid,$table, $rowver, $timestamp,$reason=
+, $action,$self->{user}, $self->{node}, $data);
+
+ }
+ return $reason;
+}
+
+#calls get_updates once for each publication the user/node is subscribed to=
+ in correct sync_order
+sub get_all_updates {
+ my $self =3D shift;
+ my $quser =3D $self->{DBH}->quote($self->{user});
+ my $qnode =3D $self->{DBH}->quote($self->{node});
+
+ foreach my $pub (@{$self->{orderpubs}}) {
+ $self->get_updates($pub, 1); #request update as sync unless overrridden b=
+y flags
+ }
+
+}
+
+# Call this once for each table the client needs refreshed or sync'ed AFTER=
+ all inbound client changes have been posted
+# Accepts publication and sync flag as arguments
+sub get_updates {
+ my $self =3D shift;
+ my $pub =3D shift || die 'Publication is required';
+ my $sync =3D shift;
+
+ my $qpub =3D $self->{DBH}->quote($pub);
+ my $quser =3D $self->{DBH}->quote($self->{user});
+ my $qnode =3D $self->{DBH}->quote($self->{node});
+
+ #enforce refresh and refreshonce flags
+ undef $sync if !$self->{pubs}->{$pub};=20
+
+
+ my %info =3D $self->{current};
+
+ my @cols =3D $self->GetColList("select col_name from ____subscribed_cols__=
+__ where username =3D $quser and nodename =3D $qnode and pubname =3D $qpub"=
+);;
+
+ my ($table) =3D $self->GetOneRow("select tablename from ____publications__=
+__ where pubname =3D $qpub");
+ return 'Table incorrectly registered for read' if !defined($table);
+ my $qtable =3D $self->{DBH}->quote($table);=09
+
+
+ my $sql =3D "select pubname, last_session, post_ver, last_ver, whereclause=
+ from ____subscribed____ where username =3D $quser and pubname =3D $qpub an=
+d nodename =3D $qnode";
+ my ($junk, $last_session, $post_ver, $last_ver, $whereclause) =3D $self->G=
+etOneRow($sql);
+
+ my ($wc) =3D $self->GetOneRow("select whereclause from ____publications___=
+_ where pubname =3D $qpub");
+
+ $whereclause =3D '('.$whereclause.')' if $whereclause;
+
+ $whereclause =3D $whereclause.' and ('.$wc.')' if $wc;
+
+
+ if ($sync) {
+ $self->writeclient('start synchronize', $pub);
+ } else {
+ $self->writeclient('start refresh', $pub);
+ $self->{DBH}->do("update ____subscribed____ set refreshonce =3D false whe=
+re pubname =3D $qpub and username =3D $quser and nodename =3D $qnode") || r=
+eturn 'Failed to clear RefreshOnce flag: '.$self->{DBH}->errstr;
+ }
+
+ $self->writeclient('columns',@cols);
+
+
+
+ my $sql =3D "select ____rowid____, ".join(',', @cols)." from $table";
+ if ($sync) {
+ $sql =3D $sql." where (____rowver____ <=3D $self->{max_ver} and ____rowve=
+r____ > $last_ver)";
+ if (defined($self->{this_post_ver})) {
+ $sql =3D $sql . " and (____rowver____ <> $post_ver)";
+ }
+ } else {
+ $sql =3D $sql." where (____rowver____ <=3D $self->{max_ver})";
+ }
+ $sql =3D $sql." and $whereclause" if $whereclause;
+=09
+ my $sth =3D $self->{DBH}->prepare($sql) || return 'Failed to get prepare S=
+QL for updates: '.$self->{DBH}->errstr;
+ $sth->execute || return 'Failed to execute SQL for updates: '.$self->{DBH}=
+->errstr;
+ my @row;
+ while (@row =3D $sth->fetchrow_array) {
+ $self->writeclient('update/insert',@row);
+ }
+
+ $sth->finish;
+
+ # now get deleted rows
+ if ($sync) {
+ $sql =3D "select rowid from ____deleted____ where (tablename =3D $qtable)=
+";
+ $sql =3D $sql." and (rowver <=3D $self->{max_ver} and rowver > $last_ver)=
+";
+ if (defined($self->{this_post_ver})) {
+ $sql =3D $sql . " and (rowver <> $self->{this_post_ver})";
+ }
+ $sql =3D $sql." and $whereclause" if $whereclause;
+
+ $sth =3D $self->{DBH}->prepare($sql) || return 'Failed to get prepare SQL=
+ for deletes: '.$self->{DBH}->errstr;
+ $sth->execute || return 'Failed to execute SQL for deletes: '.$self->{DBH=
+}->errstr;
+ my @row;
+ while (@row =3D $sth->fetchrow_array) {
+ $self->writeclient('delete',@row);
+ }
+
+ $sth->finish;
+ }
+
+ if ($sync) {
+ $self->writeclient('end synchronize', $pub);
+ } else {
+ $self->writeclient('end refresh', $pub);
+ }
+
+ my $qpub =3D $self->{DBH}->quote($pub);
+ my $quser =3D $self->{DBH}->quote($self->{user});
+ my $qnode =3D $self->{DBH}->quote($self->{node});
+
+ $self->{DBH}->do("update ____subscribed____ set last_ver =3D $self->{max_v=
+er}, last_session =3D now(), post_ver =3D $self->{this_post_ver} where user=
+name =3D $quser and nodename =3D $qnode and pubname =3D $qpub");
+ return undef;
+}
+
+
+# Call this once when everything else is done. Does housekeeping.=20
+# (MAKE THIS AN OBJECT DESTRUCTOR?)
+sub DESTROY {
+ my $self =3D shift;
+
+#release version from lock table (including old ones)
+ my $quser =3D $self->{DBH}->quote($self->{user});
+ my $qnode =3D $self->{DBH}->quote($self->{node});
+ my $sql =3D "delete from ____last_stable____ where username =3D $quser and=
+ nodename =3D $qnode";
+ $self->{DBH}->do($sql);
+
+#clean up deleted table
+ my ($version) =3D $self->GetOneRow("select min(last_ver) from ____subscrib=
+ed____");
+ return undef if not defined $version;
+ $self->{DBH}->do("delete from ____deleted____ where rowver < $version") ||=
+ return 'Failed to prune deleted table'.$self->{DBH}->errstr;;
+
+
+#disconnect from DBD sessions
+ $self->{DBH}->disconnect;
+ $self->{DBLOG}->disconnect;
+ return undef;
+}
+
+############# Helper Subs ############
+sub GetColList {
+ my $self =3D shift;
+ my $sql =3D shift || die 'Must provide sql select statement';
+ my $sth =3D $self->{DBH}->prepare($sql) || return undef;
+ $sth->execute || return undef;
+ my $val;
+ my @col;
+ while (($val) =3D $sth->fetchrow_array) {
+ push @col, $val;
+ }
+ $sth->finish;
+ return @col;
+}
+
+sub GetOneRow {
+ my $self =3D shift;
+ my $sql =3D shift || die 'Must provide sql select statement';
+ my $sth =3D $self->{DBH}->prepare($sql) || return undef;
+ $sth->execute || return undef;
+ my @row =3D $sth->fetchrow_array;
+ $sth->finish;
+ return @row;
+}
+
+=20
+
+
+
+package SyncManager;
+
+use DBI;
+# new requires 3 arguments: dbi connection string, plus the corresponding u=
+sername and password
+
+sub new {
+ my $proto =3D shift;
+ my $class =3D ref($proto) || $proto;
+ my $self =3D {};
+
+ my $dbi =3D shift;
+ my $user =3D shift;
+ my $pass =3D shift;
+
+ $self->{DBH} =3D DBI->connect($dbi,$user,$pass) || die "Failed to connect =
+to database: ".DBI->errstr();
+
+ $self->{DBLOG}=3D DBI->connect($dbi,$user,$pass) || die "cannot log to DB:=
+ ".DBI->errstr();
+=09
+ return bless ($self, $class);
+}
+
+sub dblog {=20
+ my $self =3D shift;
+ my $msg =3D $self->{DBLOG}->quote($_[0]);
+ my $quser =3D $self->{DBH}->quote($self->{user});
+ my $qnode =3D $self->{DBH}->quote($self->{node});
+ $self->{DBLOG}->do("insert into ____sync_log____ (username, nodename,stamp=
+, message) values($quser, $qnode, now(), $msg)");
+}
+
+#this should never need to be called, but it might if a node bails without =
+releasing their locks
+sub ReleaseAllLocks {
+ my $self =3D shift;
+ $self->{DBH}->do("delete from ____last_stable____)");
+}
+# Adds a publication to the system. Also adds triggers, sequences, etc ass=
+ociated with the table if approproate.
+ # accepts two argument: the name of a physical table and the name under wh=
+ich to publish it=20
+ # NOTE: the publication name is optional and will default to the table na=
+me if not supplied
+ # returns undef if ok, else error string;
+sub publish {
+ my $self =3D shift;
+ my $table =3D shift || die 'You must provide a table name (and optionally =
+a unique publication name)';
+ my $pub =3D shift;
+ $pub =3D $table if not defined($pub);
+
+ my $qpub =3D $self->{DBH}->quote($pub);
+ my $sql =3D "select tablename from ____publications____ where pubname =3D =
+$qpub";
+ my ($junk) =3D $self->GetOneRow($sql);
+ return 'Publication already exists' if defined($junk);
+
+ my $qtable =3D $self->{DBH}->quote($table);
+
+ $sql =3D "select table_id, refcount from ____tables____ where tablename =
+=3D $qtable";
+ my ($id, $refcount) =3D $self->GetOneRow($sql);
+
+ if(!defined($id)) {
+ $self->{DBH}->do("insert into ____tables____ (tablename, refcount) values=
+ ($qtable,1)") || return 'Failed to register table: ' . $self->{DBH}->errst=
+r;
+ my $sql =3D "select table_id from ____tables____ where tablename =3D $qta=
+ble";
+ ($id) =3D $self->GetOneRow($sql);
+ }
+
+ if (defined($refcount)) {
+ $self->{DBH}->do("update ____tables____ set refcount =3D refcount+1 where=
+ table_id =3D $id") || return 'Failed to update refrence count: ' . $self->=
+{DBH}->errstr;
+ } else {
+=09=09
+ $id =3D '_'.$id.'_';=20
+
+ my @cols =3D $self->GetTableCols($table, 1); # 1 =3D get hidden cols too
+ my %skip;
+ foreach my $col (@cols) {
+ $skip{$col} =3D 1;
+ }
+=09=09
+ if (!$skip{____rowver____}) {
+ $self->{DBH}->do("alter table $table add column ____rowver____ int4"); #=
+don't fail here in case table is being republished, just accept the error s=
+ilently
+ }
+ $self->{DBH}->do("update $table set ____rowver____ =3D ____version_seq___=
+_.last_value - 1") || return 'Failed to initialize rowver: ' . $self->{DBH}=
+->errstr;
+
+ if (!$skip{____rowid____}) {
+ $self->{DBH}->do("alter table $table add column ____rowid____ int4"); #d=
+on't fail here in case table is being republished, just accept the error si=
+lently
+ }
+
+ my $index =3D $id.'____rowid____idx';
+ $self->{DBH}->do("create index $index on $table(____rowid____)") || retur=
+n 'Failed to create rowid index: ' . $self->{DBH}->errstr;
+
+ my $sequence =3D $id.'_rowid_seq';
+ $self->{DBH}->do("create sequence $sequence") || return 'Failed to create=
+ rowver sequence: ' . $self->{DBH}->errstr;
+
+ $self->{DBH}->do("alter table $table alter column ____rowid____ set defau=
+lt nextval('$sequence')"); #don't fail here in case table is being republis=
+hed, just accept the error silently
+
+ $self->{DBH}->do("update $table set ____rowid____ =3D nextval('$sequence=
+')") || return 'Failed to initialize rowid: ' . $self->{DBH}->errstr;
+
+ if (!$skip{____stamp____}) {
+ $self->{DBH}->do("alter table $table add column ____stamp____ timestamp"=
+); #don't fail here in case table is being republished, just accept the err=
+or silently
+ }
+
+ $self->{DBH}->do("update $table set ____stamp____ =3D now()") || return =
+'Failed to initialize stamp: ' . $self->{DBH}->errstr;
+
+ my $trigger =3D $id.'_ver_ins';
+ $self->{DBH}->do("create trigger $trigger before insert on $table for eac=
+h row execute procedure sync_insert_ver()") || return 'Failed to create tri=
+gger: ' . $self->{DBH}->errstr;
+
+ my $trigger =3D $id.'_ver_upd';
+ $self->{DBH}->do("create trigger $trigger before update on $table for eac=
+h row execute procedure sync_update_ver()") || return 'Failed to create tri=
+gger: ' . $self->{DBH}->errstr;
+
+ my $trigger =3D $id.'_del_row';
+ $self->{DBH}->do("create trigger $trigger after delete on $table for each=
+ row execute procedure sync_delete_row()") || return 'Failed to create trig=
+ger: ' . $self->{DBH}->errstr;
+ }
+
+ $self->{DBH}->do("insert into ____publications____ (pubname, tablename) va=
+lues ('$pub','$table')") || return 'Failed to create publication entry: '.$=
+self->{DBH}->errstr;
+
+ return undef;
+}
+
+
+# Removes a publication from the system. Also drops triggers, sequences, e=
+tc associated with the table if approproate.
+ # accepts one argument: the name of a publication
+ # returns undef if ok, else error string;
+sub unpublish {
+ my $self =3D shift;
+ my $pub =3D shift || return 'You must provide a publication name';
+ my $qpub =3D $self->{DBH}->quote($pub);
+ my $sql =3D "select tablename from ____publications____ where pubname =3D =
+$qpub";
+ my ($table) =3D $self->GetOneRow($sql);
+ return 'Publication does not exist' if !defined($table);
+
+ my $qtable =3D $self->{DBH}->quote($table);
+
+ $sql =3D "select table_id, refcount from ____tables____ where tablename =
+=3D $qtable";
+ my ($id, $refcount) =3D $self->GetOneRow($sql);
+ return 'Table: $table is not correctly registered!' if not defined($id);
+
+ $self->{DBH}->do("update ____tables____ set refcount =3D refcount -1 where=
+ tablename =3D $qtable") || return 'Failed to decrement reference count: ' =
+. $self->{DBH}->errstr;
+
+ $self->{DBH}->do("delete from ____subscribed____ where pubname =3D $qpub")=
+ || return 'Failed to delete user subscriptions: ' . $self->{DBH}->errstr;
+ $self->{DBH}->do("delete from ____subscribed_cols____ where pubname =3D $q=
+pub") || return 'Failed to delete subscribed columns: ' . $self->{DBH}->err=
+str;
+ $self->{DBH}->do("delete from ____publications____ where tablename =3D $qt=
+able and pubname =3D $qpub") || return 'Failed to delete from publications:=
+ ' . $self->{DBH}->errstr;
+
+ #if this is the last reference, we want to drop triggers, etc;
+ if ($refcount <=3D 1) {
+ $id =3D "_".$id."_";
+
+ $self->{DBH}->do("alter table $table alter column ____rowver____ drop def=
+ault") || return 'Failed to alter column default: ' . $self->{DBH}->errstr;
+ $self->{DBH}->do("alter table $table alter column ____rowid____ drop defa=
+ult") || return 'Failed to alter column default: ' . $self->{DBH}->errstr;
+ $self->{DBH}->do("alter table $table alter column ____stamp____ drop defa=
+ult") || return 'Failed to alter column default: ' . $self->{DBH}->errstr;
+
+ my $trigger =3D $id.'_ver_upd';
+ $self->{DBH}->do("drop trigger $trigger on $table") || return 'Failed to =
+drop trigger: ' . $self->{DBH}->errstr;
+
+ my $trigger =3D $id.'_ver_ins';
+ $self->{DBH}->do("drop trigger $trigger on $table") || return 'Failed to =
+drop trigger: ' . $self->{DBH}->errstr;
+
+ my $trigger =3D $id.'_del_row';
+ $self->{DBH}->do("drop trigger $trigger on $table") || return 'Failed to =
+drop trigger: ' . $self->{DBH}->errstr;
+
+ my $sequence =3D $id.'_rowid_seq';
+ $self->{DBH}->do("drop sequence $sequence") || return 'Failed to drop seq=
+uence: ' . $self->{DBH}->errstr;
+
+ my $index =3D $id.'____rowid____idx';
+ $self->{DBH}->do("drop index $index") || return 'Failed to drop index: ' =
+. $self->{DBH}->errstr;
+ $self->{DBH}->do("delete from ____tables____ where tablename =3D $qtable"=
+) || return 'remove entry from tables: ' . $self->{DBH}->errstr;
+ }
+return undef;
+}
+
+
+
+
+
+#Subscribe user/node to a publication
+ # Accepts 3 arguements: Username, Nodename, Publication
+ # NOTE: the remaining arguments can be supplied as column names to which =
+the user/node should be subscribed
+ # Return undef if ok, else returns an error string
+
+sub subscribe {
+ my $self =3D shift;
+ my $user =3D shift || die 'You must provide user, node and publication as =
+arguments';
+ my $node =3D shift || die 'You must provide user, node and publication as =
+arguments';
+ my $pub =3D shift || die 'You must provide user, node and publication as a=
+rguments';
+ my @cols =3D @_;
+
+ my $quser =3D $self->{DBH}->quote($user);
+ my $qnode =3D $self->{DBH}->quote($node);
+ my $qpub =3D $self->{DBH}->quote($pub);
+
+ my $sql =3D "select tablename from ____publications____ where pubname =3D =
+$qpub";
+ my ($table) =3D $self->GetOneRow($sql);
+ return "Publication $pub does not exist." if not defined $table;
+ my $qtable =3D $self->{DBH}->quote($table);
+
+ @cols =3D $self->GetTableCols($table) if !@cols; # get defaults if cols we=
+re not spefified by caller
+
+ $self->{DBH}->do("insert into ____subscribed____ (username, nodename,pubna=
+me,last_ver,refreshonce) values('$user', '$node','$pub',0, true)") || retur=
+n 'Failes to create subscription: ' . $self->{DBH}->errstr;=09
+
+ foreach my $col (@cols) {
+ $self->{DBH}->do("insert into ____subscribed_cols____ (username, nodename=
+, pubname, col_name) values ('$user','$node','$pub','$col')") || return 'Fa=
+iles to subscribe column: ' . $self->{DBH}->errstr;=09
+ }
+
+ return undef;
+}
+
+
+#Unsubscribe user/node to a publication
+ # Accepts 3 arguements: Username, Nodename, Publication
+ # Return undef if ok, else returns an error string
+
+sub unsubscribe {
+ my $self =3D shift;
+ my $user =3D shift || die 'You must provide user, node and publication as =
+arguments';
+ my $node =3D shift || die 'You must provide user, node and publication as =
+arguments';
+ my $pub =3D shift || die 'You must provide user, node and publication as a=
+rguments';
+ my @cols =3D @_;
+
+ my $quser =3D $self->{DBH}->quote($user);
+ my $qnode =3D $self->{DBH}->quote($node);
+ my $qpub =3D $self->{DBH}->quote($pub);
+
+ my $sql =3D "select tablename from ____publications____ where pubname =3D =
+$qpub";
+ my $table =3D $self->GetOneRow($sql);
+ return "Publication $pub does not exist." if not defined $table;
+
+ $self->{DBH}->do("delete from ____subscribed_cols____ where pubname =3D $q=
+pub and username =3D $quser and nodename =3D $qnode") || return 'Failed to =
+remove column subscription: '. $self->{DBH}->errstr;
+ $self->{DBH}->do("delete from ____subscribed____ where pubname =3D $qpub a=
+nd username =3D $quser and nodename =3D $qnode") || return 'Failed to remov=
+e subscription: '. $self->{DBH}->errstr;
+
+
+ return undef;
+}
+
+
+
+#INSTALL creates the necessary management tables.=20=20
+ #returns undef if everything is ok, else returns a string describing the e=
+rror;
+sub INSTALL {
+my $self =3D shift;
+
+#check to see if management tables are already installed
+
+my ($test) =3D $self->GetOneRow("select * from pg_class where relname =3D '=
+____publications____'");
+if (defined($test)) {
+ return 'It appears that synchronization manangement tables are already ins=
+talled here. Please uninstall before reinstalling.';
+};
+
+
+
+#install the management tables, etc.
+
+$self->{DBH}->do("create table ____publications____ (pubname text primary k=
+ey,description text, tablename text, sync_order int4, whereclause text)") |=
+| return $self->{DBH}->errstr();
+
+$self->{DBH}->do("create table ____subscribed_cols____ (nodename text, user=
+name text, pubname text, col_name text, description text, primary key(noden=
+ame, username, pubname,col_name))") || return $self->{DBH}->errstr();
+
+$self->{DBH}->do("create table ____subscribed____ (nodename text, username =
+text, pubname text, last_session timestamp, post_ver int4, last_ver int4, w=
+hereclause text, sanity_limit int4 default 0, sanity_delete int4 default 0,=
+ sanity_update int4 default 0, sanity_insert int4 default 50, readonly bool=
+ean, disabled boolean, fullrefreshonly boolean, refreshonce boolean, primar=
+y key(nodename, username, pubname))") || return $self->{DBH}->errstr();
+
+$self->{DBH}->do("create table ____last_stable____ (version int4, username =
+text, nodename text, primary key(version, username, nodename))") || return =
+$self->{DBH}->errstr();
+
+$self->{DBH}->do("create table ____tables____ (tablename text, table_id int=
+4, refcount int4, primary key(tablename, table_id))") || return $self->{DBH=
+}->errstr();
+
+$self->{DBH}->do("create sequence ____table_id_seq____") || return $self->{=
+DBH}->errstr();
+
+$self->{DBH}->do("alter table ____tables____ alter column table_id set defa=
+ult nextval('____table_id_seq____')") || return $self->{DBH}->errstr();
+
+$self->{DBH}->do("create table ____deleted____ (rowid int4, tablename text,=
+ rowver int4, stamp timestamp, primary key (rowid, tablename))") || return =
+$self->{DBH}->errstr();
+
+$self->{DBH}->do("create table ____collision____ (rowid text, tablename tex=
+t, rowver int4, stamp timestamp, faildate timestamp default now(),data text=
+,reason text, action text, username text, nodename text,queue text)") || re=
+turn $self->{DBH}->errstr();
+
+$self->{DBH}->do("create sequence ____version_seq____") || return $self->{D=
+BH}->errstr();
+
+$self->{DBH}->do("create table ____sync_log____ (username text, nodename te=
+xt, stamp timestamp, message text)") || return $self->{DBH}->errstr();
+
+$self->{DBH}->do("create function sync_insert_ver() returns opaque as
+'begin
+if new.____rowver____ isnull then
+new.____rowver____ :=3D ____version_seq____.last_value;
+end if;
+if new.____stamp____ isnull then
+new.____stamp____ :=3D now();
+end if;
+return NEW;
+end;' language 'plpgsql'") || return $self->{DBH}->errstr();
+
+$self->{DBH}->do("create function sync_update_ver() returns opaque as
+'begin
+if new.____rowver____ =3D old.____rowver____ then
+new.____rowver____ :=3D ____version_seq____.last_value;
+end if;
+if new.____stamp____ =3D old.____stamp____ then
+new.____stamp____ :=3D now();
+end if;
+return NEW;
+end;' language 'plpgsql'") || return $self->{DBH}->errstr();
+
+
+$self->{DBH}->do("create function sync_delete_row() returns opaque as=20
+'begin=20
+insert into ____deleted____ (rowid,tablename,rowver,stamp) values
+(old.____rowid____, TG_RELNAME, old.____rowver____,old.____stamp____);=20
+return old;=20
+end;' language 'plpgsql'") || return $self->{DBH}->errstr();
+
+return undef;
+}
+
+#removes all management tables & related stuff
+ #returns undef if ok, else returns an error message as a string
+sub UNINSTALL {
+my $self =3D shift;
+
+#Make sure all tables are unpublished first
+my $sth =3D $self->{DBH}->prepare("select pubname from ____publications____=
+");
+$sth->execute;
+my $pub;
+while (($pub) =3D $sth->fetchrow_array) {
+ $self->unpublish($pub);=09
+}
+$sth->finish;
+
+$self->{DBH}->do("drop table ____publications____") || return $self->{DBH}-=
+>errstr();
+$self->{DBH}->do("drop table ____subscribed_cols____") || return $self->{DB=
+H}->errstr();
+$self->{DBH}->do("drop table ____subscribed____") || return $self->{DBH}->e=
+rrstr();
+$self->{DBH}->do("drop table ____last_stable____") || return $self->{DBH}->=
+errstr();
+$self->{DBH}->do("drop table ____deleted____") || return $self->{DBH}->errs=
+tr();
+$self->{DBH}->do("drop table ____collision____") || return $self->{DBH}->er=
+rstr();
+$self->{DBH}->do("drop table ____tables____") || return $self->{DBH}->errst=
+r();
+$self->{DBH}->do("drop table ____sync_log____") || return $self->{DBH}->err=
+str();
+
+$self->{DBH}->do("drop sequence ____table_id_seq____") || return $self->{DB=
+H}->errstr();
+$self->{DBH}->do("drop sequence ____version_seq____") || return $self->{DBH=
+}->errstr();
+
+$self->{DBH}->do("drop function sync_insert_ver()") || return $self->{DBH}-=
+>errstr();
+$self->{DBH}->do("drop function sync_update_ver()") || return $self->{DBH}-=
+>errstr();
+$self->{DBH}->do("drop function sync_delete_row()") || return $self->{DBH}-=
+>errstr();
+
+return undef;
+
+}
+
+sub DESTROY {
+ my $self =3D shift;
+
+ $self->{DBH}->disconnect;
+ $self->{DBLOG}->disconnect;
+ return undef;
+}
+
+############# Helper Subs ############
+
+sub GetOneRow {
+ my $self =3D shift;
+ my $sql =3D shift || die 'Must provide sql select statement';
+ my $sth =3D $self->{DBH}->prepare($sql) || return undef;
+ $sth->execute || return undef;
+ my @row =3D $sth->fetchrow_array;
+ $sth->finish;
+ return @row;
+}
+
+#call this with second non-zero value to get hidden columns
+sub GetTableCols {
+ my $self =3D shift;
+ my $table =3D shift || die 'Must provide table name';
+ my $wanthidden =3D shift;
+ my $sql =3D "select * from $table where 0 =3D 1";
+ my $sth =3D $self->{DBH}->prepare($sql) || return undef;
+ $sth->execute || return undef;
+ my @row =3D @{$sth->{NAME}};
+ $sth->finish;
+ return @row if $wanthidden;
+ my @cols;
+ foreach my $col (@row) {
+ next if $col eq '____rowver____';
+ next if $col eq '____stamp____';
+ next if $col eq '____rowid____';
+ push @cols, $col;=09
+ }
+ return @cols;
+}
+
+
+1; #happy require
+
+------=_NextPart_000_0062_01C0541E.125CAF30--
+
+