binlog

package module
v0.0.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 10, 2026 License: GPL-2.0 Imports: 20 Imported by: 0

README

mysql_binlog_utils

Some utilities for mysql binlog

WARNING: these utilities are tested for mysql 5.5.33, but are not tested for mysql 5.6+

Dump binlog from pos

DumpBinlogFromPos(srcFilePath string, startPos int, targetFilePath string)

This function will dump binlog (at srcFilePath) from pos (startPos), and the output is at targetFilePath

If the startPos is 0 or 4, the whole binlog will be dump. Otherwise, the source binlog header (including FORMAT_DESCRIPTION_EVENT & ROTATE_EVENT & PREVIOUS_GTIDS_LOG_EVENT) will be dump as the target binlog header, and then the source data will be dump from startPos

This will make target binlog complete and available to replay.

Rotate relay log

RotateRelayLog(relayLogPath string, endPos int)

This function will add a rotate event to a relay log (at relayLogPath), after the position (endPos), and truncate the data after the position (endPos)

WARNING: after manually rotate relay log, DONOT forget to update relay-log.index

Fake master server

NewFakeMasterServer(port int, unusedServerId int, characterSet int, keepAliveWhenFinish bool, baseDir string)

WARNING: only support mysql 5.5.x

This fake master server is very helpful if you want to replay some binlog files to a mysql instance, and you're afraid of mysqlbinlog (http://bugs.mysql.com/bug.php?id=33048, for example)

Mysql replication is more reliable way to replay binlog, what we need is :

  1. server := NewFakeMasterServer(...)
  2. server.Start()
  3. In target mysql instance, change master to the fake server, and start slave
  4. the server will be closed when done or error
  5. you can abort the server by server.Abort()

####Some other features:

  1. start slave until is also supported
  2. large packet (>= 1<<24-1 bytes) is supported
  3. multiple binlog files are supported, fake server will act as a real replication master (rotate to the next when one is finished)

####Arguments:

Argument _
port the fake server port
unusedServerId the fake server id, should not be duplicate with any other mysql instance
characterSet the fake server character set id, should be the same with target mysql instance's. You can get the id by SELECT id, collation_name FROM information_schema.collations ORDER BY id
keepAliveWhenFinish when false, the fake server will quit when all binlogs are replayed. when true, the fake server will wait for more binlogs.
baseDir where the binlog files are located

Get unexecuted binlog files by gtid

GetUnexecutedBinlogFilesByGtid(binlogDir string, binlogBaseName string, executedGtidDesc string) (ret []string, err error)

This function will search binlog files in binlogDir, for the one contains a event whose gtid is not contained in executedGtidDesc

The scenario is in mysql replication, if mysql master is broken, mysql slave can call GetUnexecutedBinlogFilesByGtid to search in mysql master binlog dir, for the binlog files which need to be replay in slave

Get unexecuted binlog pos by gtid

GetUnexecutedBinlogPosByGtid(binlogFilePath string, executedGtidDesc string) (pos uint, err error)

This function will search first binlog event pos, which is not contained in executedGtidDesc, in binlog file (binlogFilePath)

Or return EOF error if has no unexecuted binlog event


####Pull requests and issues are warmly welcome

Documentation

Index

Constants

View Source
const (
	UNKNOWN_EVENT            = 0
	START_EVENT_V3           = 1
	QUERY_EVENT              = 2
	STOP_EVENT               = 3
	ROTATE_EVENT             = 4
	INTVAR_EVENT             = 5
	LOAD_EVENT               = 6
	SLAVE_EVENT              = 7
	CREATE_FILE_EVENT        = 8
	APPEND_BLOCK_EVENT       = 9
	EXEC_LOAD_EVENT          = 10
	DELETE_FILE_EVENT        = 11
	NEW_LOAD_EVENT           = 12
	RAND_EVENT               = 13
	USER_VAR_EVENT           = 14
	FORMAT_DESCRIPTION_EVENT = 15
	XID_EVENT                = 16
	BEGIN_LOAD_QUERY_EVENT   = 17
	EXECUTE_LOAD_QUERY_EVENT = 18
	TABLE_MAP_EVENT          = 19
	PRE_GA_WRITE_ROWS_EVENT  = 20
	PRE_GA_UPDATE_ROWS_EVENT = 21
	PRE_GA_DELETE_ROWS_EVENT = 22
	WRITE_ROWS_EVENT_V1      = 23
	UPDATE_ROWS_EVENT_V1     = 24
	DELETE_ROWS_EVENT_V1     = 25
	INCIDENT_EVENT           = 26
	HEARTBEAT_LOG_EVENT      = 27
	IGNORABLE_LOG_EVENT      = 28
	ROWS_QUERY_LOG_EVENT     = 29
	WRITE_ROWS_EVENT         = 30
	UPDATE_ROWS_EVENT        = 31
	DELETE_ROWS_EVENT        = 32
	GTID_LOG_EVENT           = 33
	ANONYMOUS_GTID_LOG_EVENT = 34
	PREVIOUS_GTIDS_LOG_EVENT = 35
)
View Source
const (
	LOG_EVENT_FIXED_HEADER_LEN = 19
	MAX_ALLOWED_PACKET         = 1024 * 1024 * 1024
)
View Source
const (
	CompressionNone uint64 = 255
	CompressionZSTD uint64 = 0
)
View Source
const (
	STATUS_PREPARE uint8 = iota
	STATUS_BEGIN
	STATUS_COMMIT
	STATUS_ROLLBACK
)

Variables

View Source
var (
	ErrInvalidBinlogHeader = errors.New("invalid binlog file header")
	ErrEventTooSmall       = errors.New("event size too small")
)

Functions

func BinlogIndexPath

func BinlogIndexPath(binlogPath string) (string, error)

func DumpBinlogFromPos

func DumpBinlogFromPos(srcFilePath string, startPos uint, targetFilePath string) error

func DumpUnexecutedBinlogByGtid

func DumpUnexecutedBinlogByGtid(srcFilePath string, executedGtidDesc string, targetFilePath string, includeEventBeforeFirst bool) error

func GenBinlogEventBytes

func GenBinlogEventBytes(fh EventFixedHeader, fd EventFixedData, vd EventVariableData) ([]byte, error)

func GetAllGtidOfBinlogDir

func GetAllGtidOfBinlogDir(binlogDir, binlogBaseName string) (gtidDesc string, err error)

func GetFirstPreviousGtidOfBinlogDir

func GetFirstPreviousGtidOfBinlogDir(binlogDir, binlogBaseName string) (gtidDesc string, err error)

func GetGtidOfBinlog

func GetGtidOfBinlog(binlogPath string) (gtidDesc string, err error)

func GetPreviousGtids

func GetPreviousGtids(binlogPath string) (gtidDesc string, err error)

func GetUnexecutedBinlogFilesByGtid

func GetUnexecutedBinlogFilesByGtid(binlogDir string, binlogBaseName string, executedGtidDesc string, includeEventBeforeFirst bool) (
	ret []string, err error)

func GetUnexecutedBinlogPosByGtid

func GetUnexecutedBinlogPosByGtid(binlogPath string, executedGtidDesc string, includeEventBeforeFirst bool) (pos uint, err error)

func GetUnexecutedBinlogPosByGtidAndAllGtid

func GetUnexecutedBinlogPosByGtidAndAllGtid(binlogPath string, executedGtidDesc string, includeEventBeforeFirst bool) (pos uint, allgtid string, err error)

func NextBinlogName

func NextBinlogName(binlogPath string) (string, error)

func NextBinlogPath

func NextBinlogPath(binlogPath string) (string, error)

func ParseBinlogFile

func ParseBinlogFile(path string, fx func(transaction Transaction) bool) error

func ParseBinlogWithFilter

func ParseBinlogWithFilter(path string, pos int64, filter BinlogFilter, fx func(Transaction) bool) error

func ParseBinlogWithOptions added in v0.0.5

func ParseBinlogWithOptions(path string, opts ParseOptions, fx func(Transaction) bool) error

func SetLogger

func SetLogger(logger Logger)

Types

type BinlogEvent

type BinlogEvent struct {
	Type                     string
	DB                       string
	TB                       string
	Data                     string
	RowCnt                   uint32
	Rows                     [][]interface{}
	ColumnTypes              []int
	ColumnCollationIDs       []uint64
	CompressionType          string
	EventType                byte
	ServerID                 uint32
	Timestamp                uint32
	LogPos                   uint32
	EventSize                uint32
	LastCommitted            int64
	SequenceNumber           int64
	TransactionLength        uint64
	ImmediateCommitTimestamp uint64
	OriginalCommitTimestamp  uint64
}

func ParseBinlogEvent

func ParseBinlogEvent(ev *replication.BinlogEvent) []BinlogEvent

type BinlogFilter

type BinlogFilter struct {
	IncludeGtid      string
	ExcludeGtid      string
	IncludeTables    []string
	ExcludeTables    []string
	StartPos         int
	EndPos           int
	StartDate        time.Time
	EndDate          time.Time
	BigThan          int
	SmallThan        int
	OnlyShowGtid     bool
	OnlyShowDML      bool
	PickTxAllIfMatch bool
	ExcludeBlank     bool
	IncludeBlank     bool
}

type EventFixedData

type EventFixedData struct {
	Bytes []byte
}

type EventFixedHeader

type EventFixedHeader struct {
	Bytes        []byte
	Timestamp    int
	EventType    int
	ServerId     int
	EventLength  uint
	NextPosition int
	Flags        int
}

type EventVariableData

type EventVariableData struct {
	Bytes []byte
}

type Logger

type Logger interface {
	Tracef(fmt string, args ...interface{})
}

type ParseOptions added in v0.0.5

type ParseOptions struct {
	Context    context.Context
	Filter     BinlogFilter
	StartPos   int64
	OnProgress func(ParseProgress) bool
}

type ParseProgress added in v0.0.5

type ParseProgress struct {
	Path     string
	Event    BinlogEvent
	EventPos int64
	NextPos  int64
	FileSize int64
}

type Transaction

type Transaction struct {
	GTID                     string    `json:"gtid"`
	Timestamp                int64     `json:"timestamp"`
	Time                     time.Time `json:"time"`
	StartPos                 int       `json:"startPos"`
	EndPos                   int       `json:"endPos"`
	Size                     int       `json:"size"`
	RowsCount                int       `json:"rowsCount"`
	Status                   uint8     `json:"status"`
	TxStartTime              int64     `json:"txStartTime"`
	TxEndTime                int64     `json:"txEndTime"`
	LastCommitted            int64     `json:"lastCommitted"`
	SequenceNumber           int64     `json:"sequenceNumber"`
	TransactionLength        uint64    `json:"transactionLength,omitempty"`
	ImmediateCommitTimestamp uint64    `json:"immediateCommitTimestamp,omitempty"`
	OriginalCommitTimestamp  uint64    `json:"originalCommitTimestamp,omitempty"`

	Txs []TxDetail `json:"txs"`
	// contains filtered or unexported fields
}

func (Transaction) GetSqlOrigin

func (t Transaction) GetSqlOrigin() []string

type TransactionOutcome added in v0.0.5

type TransactionOutcome string
const (
	TransactionOutcomeCommit     TransactionOutcome = "commit"
	TransactionOutcomeRollback   TransactionOutcome = "rollback"
	TransactionOutcomeAutocommit TransactionOutcome = "autocommit"
	TransactionOutcomeOpen       TransactionOutcome = "open"
	TransactionOutcomeOther      TransactionOutcome = "other"
)

type TransactionSummary added in v0.0.5

type TransactionSummary struct {
	GTID                     string
	SeenTime                 time.Time
	LastEventTime            time.Time
	BeginTime                time.Time
	EndTime                  time.Time
	Duration                 time.Duration
	HasBeginBoundary         bool
	HasEndBoundary           bool
	HasExplicitDuration      bool
	HasDuration              bool
	Outcome                  TransactionOutcome
	StartPos                 int
	EndPos                   int
	Size                     int
	RowsCount                int
	StatementsCount          int
	Tables                   []string
	SampleSQL                string
	LastCommitted            int64
	SequenceNumber           int64
	TransactionLength        uint64
	ImmediateCommitTimestamp uint64
	OriginalCommitTimestamp  uint64
}

func SummarizeTransaction added in v0.0.5

func SummarizeTransaction(tx Transaction) TransactionSummary

type TxDetail

type TxDetail struct {
	StartPos           int             `json:"startPos"`
	EndPos             int             `json:"endPos"`
	RowCount           int             `json:"rowCount"`
	Timestamp          int64           `json:"timestamp"`
	Time               time.Time       `json:"time"`
	Sql                string          `json:"sql"`
	Db                 string          `json:"db"`
	Table              string          `json:"table"`
	SqlType            string          `json:"sqlType"`
	CompressionType    string          `json:"compressionType"`
	Rows               [][]interface{} `json:"rows"`
	ColumnTypes        []int           `json:"columnTypes,omitempty"`
	ColumnCollationIDs []uint64        `json:"columnCollationIds,omitempty"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL