22package main
33
44import (
5+ //"fmt"
56 "flag"
67 "time"
78 "log"
2223 bsonFile = flag .String ("o" , "flashback.bson" , "bson output file, will be overwritten" )
2324 packetBufSize = flag .Int ("size" , 1000 , "size of packet buffer used for ordering within streams" )
2425 continueOnError = flag .Bool ("continue_on_error" , false , "Continue parsing lines if an error is encountered" )
26+ debug = flag .Bool ("debug" , false , "Print debug-level output" )
2527)
2628
2729type Operation struct {
@@ -42,51 +44,107 @@ func parseQuery(opQuery []byte) (bson.D, error) {
4244 return query , err
4345}
4446
45- func (fbOp * Operation ) handleCommand (query bson.D , f * os.File ) error {
47+ func (fbOp * Operation ) handleCommand (opCommand * mongoproto.OpQuery , f * os.File ) error {
48+ var err error
4649 fbOp .Type = flashback .Command
47- fbOp .CommandDoc = query
50+ fbOp .CommandDoc , err = parseQuery (opCommand .Query )
51+ if err != nil {
52+ return err
53+ }
4854 return fbOp .writeOp (f )
4955}
5056
51- func (fbOp * Operation ) handleQuery (query bson.D , f * os.File ) error {
57+ func (fbOp * Operation ) handleQuery (opQuery * mongoproto.OpQuery , f * os.File ) error {
58+ var err error
59+ fbOp .Ns = opQuery .FullCollectionName
5260 fbOp .Type = flashback .Query
53- fbOp .QueryDoc = query
61+ fbOp .QueryDoc , err = parseQuery (opQuery .Query )
62+ if err != nil {
63+ return err
64+ }
65+ if strings .HasSuffix (opQuery .FullCollectionName , ".$cmd" ) {
66+ // sometimes mongoproto returns inserts as 'commands'
67+ collection , exists := flashback .GetElem (fbOp .QueryDoc , "insert" )
68+ if exists == true {
69+ opQuery .FullCollectionName = strings .Replace (opQuery .FullCollectionName , "$cmd" , collection .(string ), 1 )
70+ return fbOp .handleInsertFromQuery (opQuery , f )
71+ } else {
72+ return fbOp .handleCommand (opQuery , f )
73+ }
74+ } else {
75+ //_, exists := flashback.GetElem(fbOp.QueryDoc, "insert")
76+ //if exists == true {
77+ // return fbOp.handleInsertFromQuery(opQuery, f)
78+ //} else {
79+ return fbOp .writeOp (f )
80+ //}
81+ }
82+ }
83+
84+ func (fbOp * Operation ) handleInsertDocument (document bson.D , opInsert * mongoproto.OpInsert , f * os.File ) error {
85+ fbOp .Type = flashback .Insert
86+ fbOp .Ns = opInsert .FullCollectionName
87+ fbOp .InsertDoc = document
5488 return fbOp .writeOp (f )
5589}
5690
57- func (fbOp * Operation ) handleInsert (query bson. D , f * os.File ) error {
91+ func (fbOp * Operation ) handleInsert (opInsert * mongoproto. OpInsert , f * os.File ) error {
5892 var err error
59- inserts := []* Operation {}
60- fbOp .Type = flashback .Insert
93+ if opInsert .Documents != nil {
94+ for _ , document := range opInsert .Documents {
95+ query , err := parseQuery (document )
96+ if err != nil {
97+ return err
98+ }
99+ err = fbOp .handleInsertDocument (query , opInsert , f )
100+ if err != nil {
101+ return err
102+ }
103+ }
104+ }
105+ return err
106+ }
107+
108+ func (fbOp * Operation ) handleInsertFromQuery (opQuery * mongoproto.OpQuery , f * os.File ) error {
109+ var inserts []bson.D
110+ query , err := parseQuery (opQuery .Query )
111+ if err != nil {
112+ return err
113+ }
61114 documents , exists := flashback .GetElem (query , "documents" )
62115 if exists == true {
63116 if (reflect .TypeOf (documents ).Kind () == reflect .Slice ) {
64117 for _ , document := range documents .([]interface {}) {
65- multiInsertOp := & Operation {
66- Ns : fbOp .Ns ,
67- Timestamp : fbOp .Timestamp ,
68- InsertDoc : document .(bson. D ) ,
69- Type : fbOp . Type ,
70- }
71- inserts = append (inserts , multiInsertOp )
118+ // multiInsertOp := &Operation{
119+ // Ns: fbOp.Ns,
120+ // Timestamp: fbOp.Timestamp,
121+ // Type: fbOp.Type ,
122+ // InsertDoc: document.(bson.D) ,
123+ // }
124+ inserts = append (inserts , document .(bson. D ) )
72125 }
73126 } else {
74- fbOp .InsertDoc = documents .(bson.D )
75- inserts = append (inserts , fbOp )
127+ inserts = append (inserts , documents .(bson.D ))
76128 }
77- } else {
78- fbOp .InsertDoc = query
79- inserts = append (inserts , fbOp )
80- }
129+ }
81130 for _ , insert := range inserts {
82- err = insert .writeOp (f )
83- if err != nil {
84- return err
85- }
131+ fbOp .InsertDoc = insert
132+ err = fbOp .writeOp (f )
86133 }
87134 return err
88135}
89136
137+ func (fbOp * Operation ) handleDelete (opDelete * mongoproto.OpDelete , f * os.File ) error {
138+ var err error
139+ fbOp .Type = flashback .Remove
140+ fbOp .Ns = opDelete .FullCollectionName
141+ fbOp .QueryDoc , err = parseQuery (opDelete .Selector )
142+ if err != nil {
143+ return err
144+ }
145+ return fbOp .writeOp (f )
146+ }
147+
90148func (op * Operation ) writeOp (f * os.File ) error {
91149 opBson , err := bson .Marshal (op )
92150 f .Write (opBson )
@@ -116,59 +174,27 @@ func main() {
116174 }
117175 defer f .Close ()
118176 for op := range m .Ops {
119- if _ , ok := op .Op .(* mongoproto.OpUnknown ); ok {
120- logger .Println ("Found unknown operation" )
121- } else if opInsert , ok := op .Op .(* mongoproto.OpInsert ); ok {
122- for _ , document := range opInsert .Documents {
123- query , err := parseQuery (document )
124- if err != nil {
125- logger .Println (err )
126- if ! * continueOnError {
127- os .Exit (1 )
128- }
129- }
130- fbOp := & Operation {
131- Ns : opInsert .FullCollectionName ,
132- Timestamp : op .Seen ,
133- }
134- fbOp .handleInsert (query , f )
177+ // todo: fix mongoproto.OpUpdate and mongoproto.OpDelete so they can be added
178+ var err error
179+ fbOp := & Operation {
180+ Timestamp : op .Seen ,
181+ }
182+ if * debug == true {
183+ if _ , ok := op .Op .(* mongoproto.OpUnknown ); ok {
184+ logger .Println ("Found unknown operation" )
135185 }
186+ }
187+ if opDelete , ok := op .Op .(* mongoproto.OpDelete ); ok {
188+ err = fbOp .handleDelete (opDelete , f )
189+ } else if opInsert , ok := op .Op .(* mongoproto.OpInsert ); ok {
190+ err = fbOp .handleInsert (opInsert , f )
136191 } else if opQuery , ok := op .Op .(* mongoproto.OpQuery ); ok {
137- fbOp := & Operation {
138- Ns : opQuery .FullCollectionName ,
139- NToSkip : opQuery .NumberToSkip ,
140- NToReturn : opQuery .NumberToReturn ,
141- Timestamp : op .Seen ,
142- }
143- query , err := parseQuery (opQuery .Query )
144- if err != nil {
145- logger .Println (err )
146- if ! * continueOnError {
147- os .Exit (1 )
148- }
149- }
150- if strings .HasSuffix (opQuery .FullCollectionName , ".$cmd" ) {
151- // sometimes mongoproto returns inserts as 'commands'
152- collection , exists := flashback .GetElem (query , "insert" )
153- if exists == true {
154- fbOp .Ns = strings .Replace (fbOp .Ns , "$cmd" , collection .(string ), 1 )
155- err = fbOp .handleInsert (query , f )
156- } else {
157- err = fbOp .handleCommand (query , f )
158- }
159- } else {
160- _ , exists := flashback .GetElem (query , "insert" )
161- if exists == true {
162- err = fbOp .handleInsert (query , f )
163- } else {
164- err = fbOp .handleQuery (query , f )
165- }
166- }
167- if err != nil {
168- logger .Println (err )
169- if ! * continueOnError {
170- os .Exit (1 )
171- }
192+ err = fbOp .handleQuery (opQuery , f )
193+ }
194+ if err != nil {
195+ logger .Println (err )
196+ if ! * continueOnError {
197+ os .Exit (1 )
172198 }
173199 }
174200 }
0 commit comments