python-mysql-replication
版本:0.31(支持 python2.7)
import json
import sys
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
DeleteRowsEvent,
UpdateRowsEvent,
WriteRowsEvent,
)
MYSQL_SETTINGS = {
"host": "127.0.0.1",
"port": 3306,
"user": "root",
"passwd": ""
}
def main():
stream = BinLogStreamReader(
connection_settings=MYSQL_SETTINGS,
server_id=3,
only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent])
for binlogevent in stream:
for row in binlogevent.rows:
event = {"schema": binlogevent.schema, "table": binlogevent.table}
if isinstance(binlogevent, DeleteRowsEvent):
event["action"] = "delete"
event = dict(event.items() + row["values"].items())
elif isinstance(binlogevent, UpdateRowsEvent):
event["action"] = "update"
event = dict(event.items() + row["after_values"].items()) # update 时取的是 after_values
elif isinstance(binlogevent, WriteRowsEvent):
event["action"] = "insert"
event = dict(event.items() + row["values"].items())
print json.dumps(event)
sys.stdout.flush()
stream.close()
if __name__ == "__main__":
main()
Binlog 位置
for binlogevent in stream:
print('binlog: {log_file}, positon: {log_pos}'.format(log_file=stream.log_file, log_pos=stream.log_pos))
---
binlog: mysql-bin.024987, positon: 1931
binlog: mysql-bin.024987, positon: 3059
binlog: mysql-bin.024987, positon: 4363
若不加断点续传,则每次都会从最新的 binlog 最开始进行读取
传送门
Last updated