Skip to content
This repository was archived by the owner on Mar 25, 2025. It is now read-only.

Commit b68e1d8

Browse files
author
Christopher Merrick
committed
docs, keep track of logfile, pass server id as a param
1 parent ec2e2ce commit b68e1d8

File tree

3 files changed

+95
-13
lines changed

3 files changed

+95
-13
lines changed

README.md

Lines changed: 78 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,90 @@ Credit goes to the folks at LinkedIn for pioneering this approach - read Jay Kre
1616
### todo
1717
- listeners for PostgreSQL, MongoDB, (others)
1818
- stronger schema-ing of data
19+
- process for capturing an initial state of the data
1920

21+
### setup for mysql
2022

21-
### usage
23+
How to use plainview to listen to your MySQL replication stream and write raw change data to Amazon S3.
24+
25+
##### 1. Setup your MySQL Server
26+
Your server needs to be configured as a replication master using the "row" binary log format. Set
27+
the following in your server's my.cnf file:
28+
29+
```
30+
log-bin=mysql-bin
31+
binlog_format=row
32+
server-id=1234
33+
```
34+
35+
Where `log-bin` specifies the name to use to store binary log files, and `server-id` is a unique number to identify this server in the replication system. Restart your mysql server to put the changes into effect.
36+
37+
Finally, create a MySQL replication user:
38+
39+
```sql
40+
mysql> GRANT REPLICATION SLAVE ON *.* TO '<repl-user>'@'192.168.%' IDENTIFIED BY '<repl-pass>';
41+
```
42+
43+
Modify with an appropriate username and password, and replace `192.168.%` with the network address or subnet that you'll be connecting from. Finally, grab the current position of the replication log:
2244

2345
```bash
24-
lein run -m plainview.producer -f mysql-bin.000001 -n 8779 -P 5001 -u replication-user -p password -s kinesis-stream-name
25-
lein run -m plainview.consumer -a app-name -b bucket-name -s kinesis-stream-name
46+
mysql> show master status;
47+
+------------------+-----------+--------------+------------------+
48+
| File | Position | Binlog_Do_DB | Binlog_Ignore_DB |
49+
+------------------+-----------+--------------+------------------+
50+
| mysql-bin.000019 | 480798760 | | |
51+
+------------------+-----------+--------------+------------------+
2652
```
2753

28-
in your my.cnf:
54+
##### 2. Setup a Kinesis Stream
55+
56+
(TODO)
57+
58+
##### 3. Run the plainview producer
59+
60+
plainview's producer listens to your MySQL server's replication system and forwards data to the Kinesis stream you just setup.
61+
62+
Clone this repository and run the following in your shell to give plainview access to the Kinesis stream you just setup:
2963

3064
```
31-
server-id = 1234
32-
log-bin=mysql-bin
33-
binlog_format=row
65+
export AWS_ACCESS_KEY=<your-key>
66+
export AWS_SECRET_KEY=<your-secret>
3467
```
68+
69+
The key will need full permissions on Kinesis. From the same shell, run the producer
70+
71+
```bash
72+
lein run -m plainview.producer -i <server-id> -f <mysql-file> -n <mysql-position> -P <mysql-port> -u <repl-user> -p <repl-pass> -s <kinesis-stream>
73+
```
74+
75+
Where
76+
- `<server-id>` is the unique ID from your master MySQL server's my.cnf file
77+
- `<mysql-file>` is the "File" that your master MySQL server reported in Step 1. (in this case, `mysql-bin.000019`)
78+
- `<mysql-position>` is the "Position" that your master MySQL server reported in Step 1. (in this case, `480798760`)
79+
- `<mysql-port>` is the port of your master MySQL server
80+
- `<repl-user>` and `<repl-pass>` are the credentials you GRANTed permission to in Step 1.
81+
- `<kinesis-stream>` is the name of the Kinesis stream from Step 2.
82+
83+
The producer will run continuously, listening for updates coming through replication.
84+
85+
##### 4. Run the plainview consumer
86+
87+
plainview's consumer listens to a Kinesis stream for data from the producer and writes it to Amazon S3.
88+
89+
Again run the following to give the code access to Amazon resources:
90+
91+
```
92+
export AWS_ACCESS_KEY=<your-key>
93+
export AWS_SECRET_KEY=<your-secret>
94+
```
95+
96+
This key will need read permission on Kinesis, full permission on DynamoDB, and permission to write to an S3 bucket. From the same shell, run the consumer:
97+
98+
```bash
99+
lein run -m plainview.consumer -a <kinesis-app> -b <s3-bucket> -s <kinesis-stream>
100+
```
101+
102+
Where
103+
- `<kinesis-app>` is a unique name for this consumer. Kinesis uses this name to maintain the consumer's state.
104+
- `<s3-bucket>` is the S3 bucket to write to
105+
- `<kinesis-stream>` is the name fo the Kinesis stream from Step 2.

src/clj/plainview/consumer.clj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,4 +69,5 @@
6969
(nil? (:app options)) (exit 1 "An application name must be specified")
7070
(nil? (:bucket options)) (exit 1 "A bucket name must be specified")
7171
(nil? (:stream options)) (exit 1 "A kinesis stream name must be specified"))
72+
(s3/create-bucket (:bucket options))
7273
(create-worker options)))

src/clj/plainview/producer.clj

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,13 @@
77
(:import [com.google.code.or OpenReplicator]
88
[com.google.code.or.binlog BinlogEventListener]
99
[com.google.code.or.binlog.impl.event WriteRowsEvent UpdateRowsEvent
10-
TableMapEvent QueryEvent DeleteRowsEvent AbstractRowEvent])
10+
TableMapEvent QueryEvent DeleteRowsEvent AbstractRowEvent RotateEvent])
1111
(:gen-class))
1212

13+
;; careful about using atoms if we ever use multiple threads
14+
(def table-map (atom {}))
15+
(def log-file (atom nil))
16+
1317
(defn- string->buff [s]
1418
(-> (.getBytes s "utf-8")
1519
(java.nio.ByteBuffer/wrap)))
@@ -20,7 +24,6 @@
2024
stream
2125
(string->buff (generate-string event)) tableid))) ;; probably shouldn't use tableid as partition key
2226

23-
(def table-map (atom {}))
2427

2528
(defn query-table-map [tableid]
2629
(get @table-map tableid {:database "_unknown" :table "_unknown"}))
@@ -31,6 +34,9 @@
3134
(swap! table-map #(assoc %
3235
(.getTableId e)
3336
{:database (coerce (.getDatabaseName e)) :table (coerce (.getTableName e))})))
37+
(defmethod pre-parse-event RotateEvent
38+
[e]
39+
(reset! log-file (coerce (.getBinlogFileName e))))
3440

3541
(defmethod pre-parse-event :default
3642
[e]
@@ -48,7 +54,7 @@
4854
(map #(map coerce (.getColumns %)) (.getRows e)))
4955
(defmethod parse-event-data :default
5056
[e]
51-
(println (str e "\n"))
57+
(println (str "In file " @log-file ": " e "\n"))
5258
{})
5359

5460
(defmulti parse-meta-data class)
@@ -79,13 +85,13 @@
7985

8086
(defn replicator
8187
"get a replicator"
82-
[{:keys [username password host port filename position] :as opts} listener]
88+
[{:keys [username password host port filename position server-id] :as opts} listener]
8389
(doto (OpenReplicator.)
8490
(.setUser username)
8591
(.setPassword password)
8692
(.setHost host)
8793
(.setPort port)
88-
(.setServerId 1234)
94+
(.setServerId server-id)
8995
(.setBinlogFileName filename)
9096
(.setBinlogPosition position)
9197
(.setBinlogEventListener listener)))
@@ -102,6 +108,8 @@
102108
["-f" "--filename FILENAME" "Binlog filename"]
103109
["-n" "--position POSITION" "Binlog position"
104110
:parse-fn #(Integer/parseInt %)]
111+
["-i" "--server-id ID" "MySQL master server's ID"
112+
:parse-fn #(Integer/parseInt %)]
105113
["-s" "--stream STREAM" "Kinesis stream name"]])
106114

107115
(defn error-msg [errors]
@@ -120,5 +128,7 @@
120128
(nil? (:position options)) (exit 1 "A replication position must be specified")
121129
(nil? (:username options)) (exit 1 "A replication username must be specified")
122130
(nil? (:password options)) (exit 1 "A replication password must be specified")
123-
(nil? (:stream options)) (exit 1 "A kinesis stream name must be specified"))
131+
(nil? (:stream options)) (exit 1 "A kinesis stream name must be specified")
132+
(nil? (:server-id options)) (exit 1 "A server-id name must be specified"))
133+
(reset! log-file (:filename options))
124134
(.start (replicator options (MyListener. (:stream options))))))

0 commit comments

Comments
 (0)