Skip to content
This repository was archived by the owner on Mar 25, 2025. It is now read-only.

Commit afbb9e2

Browse files
author
Christopher Merrick
committed
refactoring, writing is still not totally correct
1 parent 683c325 commit afbb9e2

File tree

9 files changed

+187
-103
lines changed

9 files changed

+187
-103
lines changed

README.md

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,19 @@
1-
# deltalog
1+
# repwrite
22

3-
A Clojure library designed to ... well, that part is up to you.
3+
tap into your database's replication system and write all change data down to a file
44

5-
## Usage
5+
## usage
66

7-
FIXME
7+
_currently only support MySQL row-based replication_
88

9-
## License
9+
```bash
10+
lein run -m repwrite.core -f mysql-bin.000001 -n 8779 -p 5001
11+
```
1012

11-
Copyright © 2014 FIXME
13+
in your my.cnf:
1214

13-
Distributed under the Eclipse Public License either version 1.0 or (at
14-
your option) any later version.
15+
```
16+
server-id = 1234
17+
log-bin=mysql-bin
18+
binlog_format=row
19+
```

project.clj

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,12 @@
1010
[cascading/cascading-hadoop "2.2.0"]
1111
[cascading/cascading-local "2.2.0"]
1212
[cascading.gmarabout/cascading-json "0.0.3"]
13+
[cheshire "5.3.1"]
14+
[org.clojure/tools.cli "0.3.1"]
1315
[cascading.avro/avro-scheme "2.1.1"
1416
:exclusions [[org.slf4j/slf4j-log4j12]
1517
[org.apache.hadoop/hadoop-core]
1618
cascading/cascading-core]]
1719
[org.apache.avro/avro "1.7.4" :exclusions [org.slf4j/slf4j-api]]
18-
[com.damballa/abracad "0.4.9"]])
20+
[com.damballa/abracad "0.4.9"]
21+
[midje "1.6.3"]])

src/deltalog/core.clj

Lines changed: 0 additions & 83 deletions
This file was deleted.

src/deltalog/coerce.clj renamed to src/repwrite/coerce.clj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
(ns deltalog.coerce
1+
(ns repwrite.coerce
22
(:import [com.google.code.or.common.glossary.column
33
Int24Column DecimalColumn DoubleColumn
44
EnumColumn FloatColumn LongColumn]))

src/repwrite/core.clj

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
(ns repwrite.core
2+
(:require [repwrite.coerce :refer [coerce]]
3+
[repwrite.schema :as schema]
4+
[cheshire.core :refer :all]
5+
[abracad.avro :as avro]
6+
[clojure.string :as string]
7+
[clojure.tools.cli :refer [parse-opts]])
8+
(:import [com.google.code.or OpenReplicator]
9+
[com.google.code.or.binlog BinlogEventListener]
10+
[com.google.code.or.binlog.impl.event WriteRowsEvent UpdateRowsEvent
11+
TableMapEvent QueryEvent DeleteRowsEvent AbstractRowEvent])
12+
(:gen-class))
13+
14+
#_(defn root-dir [table-id] (str "example/raw/" table-id))
15+
16+
(def root-dir (str "example/raw/"))
17+
18+
(defn mk-dir [dir]
19+
(.mkdir (java.io.File. dir))
20+
dir)
21+
22+
(defn write-avro [{:keys [table-id]} data]
23+
(let [filename (str (mk-dir (root-dir table-id)) "/raw.avro")
24+
schema (schema/tableid->schema table-id)]
25+
(with-open [adf (if (.exists (clojure.java.io/as-file filename))
26+
(avro/data-file-writer filename)
27+
(avro/data-file-writer schema filename))]
28+
(.append adf data))))
29+
30+
(defn write-json [{:keys [data] :as event}]
31+
(when (not (empty? data))
32+
(let [filename (str root-dir "raw.json")]
33+
(spit filename (str (generate-string data) "\n") :append true))))
34+
35+
(defmulti parse-event-data class)
36+
(defmethod parse-event-data WriteRowsEvent
37+
[e]
38+
(map #(map coerce (.getColumns %)) (.getRows e)))
39+
(defmethod parse-event-data UpdateRowsEvent
40+
[e]
41+
(map #(map coerce (.getColumns (.getAfter %))) (.getRows e)))
42+
(defmethod parse-event-data DeleteRowsEvent
43+
[e]
44+
(map #(map coerce (.getColumns %)) (.getRows e)))
45+
(defmethod parse-event-data :default
46+
[e]
47+
(println (str e "\n"))
48+
{})
49+
50+
(defmulti parse-meta-data class)
51+
(defmethod parse-meta-data AbstractRowEvent
52+
[e]
53+
{:table-id (.getTableId e)})
54+
(defmethod parse-meta-data :default
55+
[e]
56+
{})
57+
58+
(deftype MyListener []
59+
BinlogEventListener
60+
(onEvents
61+
[this e]
62+
(write-json (conj (parse-meta-data e)
63+
{:data (parse-event-data e)}))))
64+
65+
(defn replicator
66+
"get a replicator"
67+
[{:keys [host port filename position] :as opts} listener]
68+
(doto (OpenReplicator.)
69+
(.setUser "replication-user")
70+
(.setPassword "password")
71+
(.setHost host)
72+
(.setPort port)
73+
(.setServerId 1234)
74+
(.setBinlogFileName filename)
75+
(.setBinlogPosition position)
76+
(.setBinlogEventListener listener)))
77+
78+
(def cli-options
79+
[["-h" "--host HOST" "Replication master hostname or IP"
80+
:default "127.0.0.1"]
81+
["-p" "--port PORT" "Replication master port number"
82+
:default 3306
83+
:parse-fn #(Integer/parseInt %)
84+
:validate [#(< 0 % 0x10000) "Must be a number between 0 and 65536"]]
85+
["-f" "--filename FILENAME" "Binlog filename"]
86+
["-n" "--position POSITION" "Binlog position"
87+
:parse-fn #(Integer/parseInt %)]])
88+
89+
(defn error-msg [errors]
90+
(str "The following errors occurred while parsing your command:\n\n"
91+
(string/join \newline errors)))
92+
93+
(defn exit [status msg]
94+
(println msg)
95+
(System/exit status))
96+
97+
(defn -main [& args]
98+
(let [{:keys [options arguments errors summary]} (parse-opts args cli-options)]
99+
(cond
100+
errors (exit 1 (error-msg errors))
101+
(nil? (:filename options)) (exit 1 "A replication filename must be specified")
102+
(nil? (:position options)) (exit 1 "A replication position must be specified"))
103+
(mk-dir root-dir)
104+
(.start (replicator options (MyListener.)))))

src/deltalog/mr.clj renamed to src/repwrite/mr.clj

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
(ns deltalog.mr
2-
(:require [deltalog.schema :as schema])
1+
(ns repwrite.mr
2+
(:require [repwrite.schema :as schema])
33
(:import [org.apache.hadoop.fs Path FileSystem]
44
[org.apache.hadoop.conf Configuration]
55
[cascading.tap SinkMode]

src/deltalog/schema.clj renamed to src/repwrite/schema.clj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
(ns deltalog.schema
1+
(ns repwrite.schema
22
(:require [abracad.avro :as avro])
33
(:import [org.apache.avro Schema$Parser]))
44

test/deltalog/core_test.clj

Lines changed: 0 additions & 7 deletions
This file was deleted.

test/repwrite/core_test.clj

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
(ns deltalog.core-test
2+
(:use midje.sweet)
3+
(:require [repwrite.core :refer :all])
4+
(:import [com.google.code.or.binlog.impl.event WriteRowsEvent UpdateRowsEvent
5+
TableMapEvent QueryEvent DeleteRowsEvent BinlogEventV4HeaderImpl]
6+
[com.google.code.or.common.glossary Row Pair]
7+
[com.google.code.or.common.glossary.column Int24Column]))
8+
9+
(def header (doto (BinlogEventV4HeaderImpl.)
10+
(.setTimestamp (long 10000))
11+
(.setTimestampOfReceipt (long 10000))
12+
(.setServerId (long 1))
13+
))
14+
15+
(defn int-col [i] (Int24Column/valueOf i))
16+
17+
(defn to-row [cols] (Row. cols))
18+
19+
(defn to-pair [before after] (Pair. before after))
20+
21+
(facts "row-based replication events"
22+
(fact "it handles empty WriteRowsEvents"
23+
(parse-event-data (WriteRowsEvent. header)) => [])
24+
25+
(fact "it handles single-row WriteRowsEvents"
26+
(let [e (doto (WriteRowsEvent. header)
27+
(.setRows [(to-row [(int-col 1)])]))]
28+
(parse-event-data e) => [[1]]))
29+
30+
(fact "it handles multi-row WriteRowsEvents"
31+
(let [e (doto (WriteRowsEvent. header)
32+
(.setRows [(to-row [(int-col 1)])
33+
(to-row [(int-col 2)])]))]
34+
(parse-event-data e) => [[1]
35+
[2]]))
36+
37+
(fact "it handles single-row UpdateRowsEvents"
38+
(let [e (doto (UpdateRowsEvent. header)
39+
(.setRows [(to-pair (to-row [(int-col 1)])
40+
(to-row [(int-col 2)]))]))]
41+
(parse-event-data e) => [[2]]))
42+
43+
(fact "it handles multi-row UpdateRowsEvents"
44+
(let [e (doto (UpdateRowsEvent. header)
45+
(.setRows [(to-pair (to-row [(int-col 1)])
46+
(to-row [(int-col 2)]))
47+
(to-pair (to-row [(int-col 3)])
48+
(to-row [(int-col 4)]))]))]
49+
(parse-event-data e) => [[2]
50+
[4]]))
51+
52+
(fact "it handles single-row DeleteRowsEvents"
53+
(let [e (doto (DeleteRowsEvent. header)
54+
(.setRows [(to-row [(int-col 1)])]))]
55+
(parse-event-data e) => [[1]]))
56+
57+
(fact "it handles multi-row DeleteRowsEvents"
58+
(let [e (doto (DeleteRowsEvent. header)
59+
(.setRows [(to-row [(int-col 1)])
60+
(to-row [(int-col 2)])]))]
61+
(parse-event-data e) => [[1]
62+
[2]])))

0 commit comments

Comments
 (0)