🦋
Butterfly 用户手册
  • Introduction
  • 一 前言
  • 二 开始
    • 安装部署
    • 五分钟体验指南
    • 单机使用手册
    • 应用规范
      • handler specs
      • middleware specs
      • xingqiao_plugin specs
      • yiqiu_program specs
  • 三 客户端功能
    • MySQL 原生协议
    • MySQL ORM
    • Redis 原生协议
      • redis_config
      • redis_tls
    • Redis ORM
    • Redis mcpack
    • Localcache
    • Kazoo
  • 四 应用(通用服务)
    • API JSON 规范
    • 异步任务 BaiChuan(百川)
    • 任务调度 RuQi(如期)
    • 任务编排 XingQiao(星桥)
    • 配置管理 WuXing(五行)
    • 运筹决策 BaiCe(百策)
  • 五 部署运维
    • 单机容器化部署
    • 监控
    • 异常排查
      • CPU Load spike every 7 hours
    • 升级
    • 安全
    • 其他
  • 六 前端
    • butterfly_template
    • butterfly_fe
    • butterfly-admin(json2web)
      • amis
      • sso
      • pangu
    • NoahV
    • PyWebIO
  • 七 潘多拉魔盒
    • 装饰器
      • localcache_decorator
      • retry_decorator
      • custom_decorator
      • command2http_decorator
    • 算法
      • 算法-分位数
      • 算法-变异系数
    • 实用工具
      • host_util
      • shell_util
      • http_util
      • time_util
      • random_util
      • concurrent
      • jsonschema
      • blinker
      • toml
      • command_util
      • config_util
      • picobox
      • 对称加密
        • des
        • aes
      • ascii_art
        • ttable
        • chart
      • business_rules
      • python-mysql-replication
      • dict_util
    • 中间件
      • middleware_status
      • middleware_whitelist
    • test_handler.py
  • 八 最佳实践
    • 分布式架构
    • Code practice
    • Log practice
    • Daemon process
  • 附录
Powered by GitBook
On this page
  • Binlog 位置
  • 传送门
  1. 七 潘多拉魔盒
  2. 实用工具

python-mysql-replication

版本:0.31(支持 python2.7)

import json
import sys

from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
    DeleteRowsEvent,
    UpdateRowsEvent,
    WriteRowsEvent,
)

MYSQL_SETTINGS = {
    "host": "127.0.0.1",
    "port": 3306,
    "user": "root",
    "passwd": ""
}


def main():
    stream = BinLogStreamReader(
        connection_settings=MYSQL_SETTINGS,
        server_id=3,
        only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent])

    for binlogevent in stream:
        for row in binlogevent.rows:
            event = {"schema": binlogevent.schema, "table": binlogevent.table}

            if isinstance(binlogevent, DeleteRowsEvent):
                event["action"] = "delete"
                event = dict(event.items() + row["values"].items())
            elif isinstance(binlogevent, UpdateRowsEvent):
                event["action"] = "update"
                event = dict(event.items() + row["after_values"].items()) # update 时取的是 after_values
            elif isinstance(binlogevent, WriteRowsEvent):
                event["action"] = "insert"
                event = dict(event.items() + row["values"].items())
            print json.dumps(event)
            sys.stdout.flush()


    stream.close()


if __name__ == "__main__":
    main()

Binlog 位置

for binlogevent in stream:
    print('binlog: {log_file}, positon: {log_pos}'.format(log_file=stream.log_file, log_pos=stream.log_pos))
        
---
binlog: mysql-bin.024987, positon: 1931
binlog: mysql-bin.024987, positon: 3059
binlog: mysql-bin.024987, positon: 4363

若不加断点续传,则每次都会从最新的 binlog 最开始进行读取

传送门

Previousbusiness_rulesNextdict_util

Last updated 11 months ago

(断点续传例子)

https://github.com/hcymysql/mysql_repl/tree/main