python-mysql-replication

版本:0.31(支持 python2.7)

import json
import sys

from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
    DeleteRowsEvent,
    UpdateRowsEvent,
    WriteRowsEvent,
)

MYSQL_SETTINGS = {
    "host": "127.0.0.1",
    "port": 3306,
    "user": "root",
    "passwd": ""
}


def main():
    stream = BinLogStreamReader(
        connection_settings=MYSQL_SETTINGS,
        server_id=3,
        only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent])

    for binlogevent in stream:
        for row in binlogevent.rows:
            event = {"schema": binlogevent.schema, "table": binlogevent.table}

            if isinstance(binlogevent, DeleteRowsEvent):
                event["action"] = "delete"
                event = dict(event.items() + row["values"].items())
            elif isinstance(binlogevent, UpdateRowsEvent):
                event["action"] = "update"
                event = dict(event.items() + row["after_values"].items()) # update 时取的是 after_values
            elif isinstance(binlogevent, WriteRowsEvent):
                event["action"] = "insert"
                event = dict(event.items() + row["values"].items())
            print json.dumps(event)
            sys.stdout.flush()


    stream.close()


if __name__ == "__main__":
    main()

Binlog 位置

for binlogevent in stream:
    print('binlog: {log_file}, positon: {log_pos}'.format(log_file=stream.log_file, log_pos=stream.log_pos))
        
---
binlog: mysql-bin.024987, positon: 1931
binlog: mysql-bin.024987, positon: 3059
binlog: mysql-bin.024987, positon: 4363

若不加断点续传,则每次都会从最新的 binlog 最开始进行读取

传送门

https://github.com/hcymysql/mysql_repl/tree/main (断点续传例子)

Last updated