Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
*~
.idea/

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ binlog2sql
正常维护。应用于大众点评线上环境。线上环境的操作,请在对MySQL**相当熟悉**的同学指导下进行

* 已测试环境
* Python 2.6, 2.7
* Python 2.6, 2.7, 3.4
* MySQL 5.6


Expand Down
182 changes: 92 additions & 90 deletions binlog2sql/binlog2sql.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os, sys, datetime
import sys
import datetime
import pymysql
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
Expand All @@ -10,128 +11,129 @@
DeleteRowsEvent,
)
from pymysqlreplication.event import QueryEvent, RotateEvent, FormatDescriptionEvent
from binlog2sql_util import command_line_args, concat_sql_from_binlogevent, create_unique_file, reversed_lines
from binlog2sql_util import command_line_args, concat_sql_from_binlog_event, create_unique_file, \
temp_open, print_rollback_sql


class Binlog2sql(object):

def __init__(self, connectionSettings, startFile=None, startPos=None, endFile=None, endPos=None, startTime=None,
stopTime=None, only_schemas=None, only_tables=None, nopk=False, flashback=False, stopnever=False):
'''
connectionSettings: {'host': 127.0.0.1, 'port': 3306, 'user': slave, 'passwd': slave}
'''
if not startFile:
raise ValueError('lack of parameter,startFile.')

self.connectionSettings = connectionSettings
self.startFile = startFile
self.startPos = startPos if startPos else 4 # use binlog v4
self.endFile = endFile if endFile else startFile
self.endPos = endPos
self.startTime = datetime.datetime.strptime(startTime, "%Y-%m-%d %H:%M:%S") if startTime else datetime.datetime.strptime('1970-01-01 00:00:00', "%Y-%m-%d %H:%M:%S")
self.stopTime = datetime.datetime.strptime(stopTime, "%Y-%m-%d %H:%M:%S") if stopTime else datetime.datetime.strptime('2999-12-31 00:00:00', "%Y-%m-%d %H:%M:%S")
def __init__(self, connection_settings, start_file=None, start_pos=None, end_file=None, end_pos=None,
start_time=None, stop_time=None, only_schemas=None, only_tables=None, no_pk=False,
flashback=False, stop_never=False):
"""
conn_setting: {'host': 127.0.0.1, 'port': 3306, 'user': user, 'passwd': passwd, 'charset': 'utf8'}
"""

if not start_file:
raise ValueError('Lack of parameter: start_file')

self.conn_setting = connection_settings
self.start_file = start_file
self.start_pos = start_pos if start_pos else 4 # use binlog v4
self.end_file = end_file if end_file else start_file
self.end_pos = end_pos
if start_time:
self.start_time = datetime.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S")
else:
self.start_time = datetime.datetime.strptime('1980-01-01 00:00:00', "%Y-%m-%d %H:%M:%S")
if stop_time:
self.stop_time = datetime.datetime.strptime(stop_time, "%Y-%m-%d %H:%M:%S")
else:
self.stop_time = datetime.datetime.strptime('2999-12-31 00:00:00', "%Y-%m-%d %H:%M:%S")

self.only_schemas = only_schemas if only_schemas else None
self.only_tables = only_tables if only_tables else None
self.nopk, self.flashback, self.stopnever = (nopk, flashback, stopnever)
self.no_pk, self.flashback, self.stop_never = (no_pk, flashback, stop_never)

self.binlogList = []
self.connection = pymysql.connect(**self.connectionSettings)
try:
cur = self.connection.cursor()
cur.execute("SHOW MASTER STATUS")
self.eofFile, self.eofPos = cur.fetchone()[:2]
cur.execute("SHOW MASTER LOGS")
binIndex = [row[0] for row in cur.fetchall()]
if self.startFile not in binIndex:
raise ValueError('parameter error: startFile %s not in mysql server' % self.startFile)
self.connection = pymysql.connect(**self.conn_setting)
with self.connection as cursor:
cursor.execute("SHOW MASTER STATUS")
self.eof_file, self.eof_pos = cursor.fetchone()[:2]
cursor.execute("SHOW MASTER LOGS")
bin_index = [row[0] for row in cursor.fetchall()]
if self.start_file not in bin_index:
raise ValueError('parameter error: start_file %s not in mysql server' % self.start_file)
binlog2i = lambda x: x.split('.')[1]
for bin in binIndex:
if binlog2i(bin) >= binlog2i(self.startFile) and binlog2i(bin) <= binlog2i(self.endFile):
self.binlogList.append(bin)
for binary in bin_index:
if binlog2i(self.start_file) <= binlog2i(binary) <= binlog2i(self.end_file):
self.binlogList.append(binary)

cur.execute("SELECT @@server_id")
self.serverId = cur.fetchone()[0]
if not self.serverId:
raise ValueError('need set server_id in mysql server %s:%s' % (self.connectionSettings['host'], self.connectionSettings['port']))
finally:
cur.close()
cursor.execute("SELECT @@server_id")
self.server_id = cursor.fetchone()[0]
if not self.server_id:
raise ValueError('missing server_id in %s:%s' % (self.conn_setting['host'], self.conn_setting['port']))

def process_binlog(self):
stream = BinLogStreamReader(connection_settings=self.connectionSettings, server_id=self.serverId,
log_file=self.startFile, log_pos=self.startPos, only_schemas=self.only_schemas,
stream = BinLogStreamReader(connection_settings=self.conn_setting, server_id=self.server_id,
log_file=self.start_file, log_pos=self.start_pos, only_schemas=self.only_schemas,
only_tables=self.only_tables, resume_stream=True)

cur = self.connection.cursor()
tmpFile = create_unique_file('%s.%s' % (self.connectionSettings['host'],self.connectionSettings['port'])) # to simplify code, we do not use file lock for tmpFile.
ftmp = open(tmpFile ,"w")
flagLastEvent = False
eStartPos, lastPos = stream.log_pos, stream.log_pos
try:
for binlogevent in stream:
if not self.stopnever:
if (stream.log_file == self.endFile and stream.log_pos == self.endPos) or (stream.log_file == self.eofFile and stream.log_pos == self.eofPos):
flagLastEvent = True
elif datetime.datetime.fromtimestamp(binlogevent.timestamp) < self.startTime:
if not (isinstance(binlogevent, RotateEvent) or isinstance(binlogevent, FormatDescriptionEvent)):
lastPos = binlogevent.packet.log_pos
flag_last_event = False
e_start_pos, last_pos = stream.log_pos, stream.log_pos
# to simplify code, we do not use flock for tmp_file.
tmp_file = create_unique_file('%s.%s' % (self.conn_setting['host'], self.conn_setting['port']))
with temp_open(tmp_file, "w") as f_tmp, self.connection as cursor:
for binlog_event in stream:
if not self.stop_never:
try:
event_time = datetime.datetime.fromtimestamp(binlog_event.timestamp)
except OSError:
event_time = datetime.datetime(1980, 1, 1, 0, 0)
if (stream.log_file == self.end_file and stream.log_pos == self.end_pos) or \
(stream.log_file == self.eof_file and stream.log_pos == self.eof_pos):
flag_last_event = True
elif event_time < self.start_time:
if not (isinstance(binlog_event, RotateEvent)
or isinstance(binlog_event, FormatDescriptionEvent)):
last_pos = binlog_event.packet.log_pos
continue
elif (stream.log_file not in self.binlogList) or (self.endPos and stream.log_file == self.endFile and stream.log_pos > self.endPos) or (stream.log_file == self.eofFile and stream.log_pos > self.eofPos) or (datetime.datetime.fromtimestamp(binlogevent.timestamp) >= self.stopTime):
elif (stream.log_file not in self.binlogList) or \
(self.end_pos and stream.log_file == self.end_file and stream.log_pos > self.end_pos) or \
(stream.log_file == self.eof_file and stream.log_pos > self.eof_pos) or \
(event_time >= self.stop_time):
break
# else:
# raise ValueError('unknown binlog file or position')

if isinstance(binlogevent, QueryEvent) and binlogevent.query == 'BEGIN':
eStartPos = lastPos
if isinstance(binlog_event, QueryEvent) and binlog_event.query == 'BEGIN':
e_start_pos = last_pos

if isinstance(binlogevent, QueryEvent):
sql = concat_sql_from_binlogevent(cursor=cur, binlogevent=binlogevent, flashback=self.flashback, nopk=self.nopk)
if isinstance(binlog_event, QueryEvent):
sql = concat_sql_from_binlog_event(cursor=cursor, binlog_event=binlog_event,
flashback=self.flashback, no_pk=self.no_pk)
if sql:
print sql
elif isinstance(binlogevent, WriteRowsEvent) or isinstance(binlogevent, UpdateRowsEvent) or isinstance(binlogevent, DeleteRowsEvent):
for row in binlogevent.rows:
sql = concat_sql_from_binlogevent(cursor=cur, binlogevent=binlogevent, row=row , flashback=self.flashback, nopk=self.nopk, eStartPos=eStartPos)
print(sql)
elif isinstance(binlog_event, WriteRowsEvent) or isinstance(binlog_event, UpdateRowsEvent) or\
isinstance(binlog_event, DeleteRowsEvent):
for row in binlog_event.rows:
sql = concat_sql_from_binlog_event(cursor=cursor, binlog_event=binlog_event, no_pk=self.no_pk,
row=row, flashback=self.flashback, e_start_pos=e_start_pos)
if self.flashback:
ftmp.write(sql + '\n')
f_tmp.write(sql + '\n')
else:
print sql
print(sql)

if not (isinstance(binlogevent, RotateEvent) or isinstance(binlogevent, FormatDescriptionEvent)):
lastPos = binlogevent.packet.log_pos
if flagLastEvent:
if not (isinstance(binlog_event, RotateEvent) or isinstance(binlog_event, FormatDescriptionEvent)):
last_pos = binlog_event.packet.log_pos
if flag_last_event:
break
ftmp.close()

stream.close()
f_tmp.close()
if self.flashback:
self.print_rollback_sql(tmpFile)
finally:
os.remove(tmpFile)
cur.close()
stream.close()
print_rollback_sql(filename=tmp_file)
return True

def print_rollback_sql(self, fin):
'''print rollback sql from tmpfile'''
with open(fin) as ftmp:
sleepInterval = 1000
i = 0
for line in reversed_lines(ftmp):
print line.rstrip()
if i >= sleepInterval:
print 'SELECT SLEEP(1);'
i = 0
else:
i += 1

def __del__(self):
pass


if __name__ == '__main__':

args = command_line_args(sys.argv[1:])
connectionSettings = {'host':args.host, 'port':args.port, 'user':args.user, 'passwd':args.password}
binlog2sql = Binlog2sql(connectionSettings=connectionSettings, startFile=args.startFile,
startPos=args.startPos, endFile=args.endFile, endPos=args.endPos,
startTime=args.startTime, stopTime=args.stopTime, only_schemas=args.databases,
only_tables=args.tables, nopk=args.nopk, flashback=args.flashback, stopnever=args.stopnever)
conn_setting = {'host': args.host, 'port': args.port, 'user': args.user, 'passwd': args.password, 'charset': 'utf8'}
binlog2sql = Binlog2sql(connection_settings=conn_setting, start_file=args.start_file, start_pos=args.start_pos,
end_file=args.end_file, end_pos=args.end_pos, start_time=args.start_time,
stop_time=args.stop_time, only_schemas=args.databases, only_tables=args.tables,
no_pk=args.no_pk, flashback=args.flashback, stop_never=args.stop_never)
binlog2sql.process_binlog()
Loading