using System; using System.Collections.Generic; using System.IO; using System.Text; using System.Threading; using System.Threading.Tasks; using MySqlConnector; using DatabaseSnapshotsService.Models; namespace DatabaseSnapshotsService.Services { public class BinlogReader { private readonly BinlogReaderConfig _config; private readonly EventStore _eventStore; private MySqlConnection _connection; private bool _isConnected; private bool _isReading; private CancellationTokenSource _cancellationTokenSource; public event EventHandler EventReceived; public event EventHandler LogMessage; public BinlogReader(BinlogReaderConfig config, EventStore eventStore) { _config = config; _eventStore = eventStore; } public async Task ConnectAsync() { try { LogMessage?.Invoke(this, $"Attempting to connect to {_config.Host}:{_config.Port}"); var connectionString = $"Server={_config.Host};Port={_config.Port};User ID={_config.Username};Password={_config.Password};"; _connection = new MySqlConnection(connectionString); await _connection.OpenAsync(); _isConnected = true; LogMessage?.Invoke(this, $"Connected to MySQL at {_config.Host}:{_config.Port}"); return true; } catch (Exception ex) { LogMessage?.Invoke(this, $"Connection failed: {ex.Message}"); return false; } } public async Task StartReadingAsync(string binlogFile = null, long position = 4) { if (!_isConnected) { throw new InvalidOperationException("Not connected to MySQL"); } _cancellationTokenSource = new CancellationTokenSource(); _isReading = true; try { LogMessage?.Invoke(this, $"Starting binlog read from position {position}"); // Get current binlog status await ReadBinlogStatusAsync(); // Read binlog events await ReadBinlogEventsAsync(binlogFile, position); } catch (OperationCanceledException) { LogMessage?.Invoke(this, "Binlog reading stopped"); } catch (Exception ex) { LogMessage?.Invoke(this, $"Error reading binlog: {ex.Message}"); } finally { _isReading = false; } } public void StopReading() { _cancellationTokenSource?.Cancel(); _isReading = false; } public void Disconnect() { StopReading(); _connection?.Close(); _connection?.Dispose(); _isConnected = false; } private async Task ReadBinlogStatusAsync() { using var command = _connection.CreateCommand(); command.CommandText = "SHOW MASTER STATUS"; using var reader = await command.ExecuteReaderAsync(); if (await reader.ReadAsync()) { var file = reader.GetString("File"); var position = reader.GetInt64("Position"); LogMessage?.Invoke(this, $"Current binlog: {file} at position {position}"); // Create a status event and store it var statusEvent = new DatabaseEvent { Type = "status", Table = "system", Operation = "binlog_status", Data = $"SHOW MASTER STATUS - File: {file}, Position: {position}", BinlogPosition = (long)position, ServerId = _config.ServerId }; await _eventStore.StoreEventAsync(statusEvent); // Create a binlog event for the status var evt = new BinlogEvent { Timestamp = DateTimeOffset.UtcNow, EventType = BinlogEventType.QUERY_EVENT, LogPosition = (uint)position, EventSize = 0, Flags = 0, EventData = Encoding.UTF8.GetBytes($"SHOW MASTER STATUS - File: {file}, Position: {position}"), RawPacket = new byte[0] }; EventReceived?.Invoke(this, evt); } } private async Task ReadBinlogEventsAsync(string binlogFile, long position) { try { // Get the binlog file to read from var targetFile = binlogFile; if (string.IsNullOrEmpty(targetFile)) { targetFile = await GetCurrentBinlogFileAsync(); } LogMessage?.Invoke(this, $"Reading binlog events from {targetFile} starting at position {position}"); using var command = _connection.CreateCommand(); command.CommandText = $"SHOW BINLOG EVENTS IN '{targetFile}' FROM {position}"; LogMessage?.Invoke(this, $"Executing query: {command.CommandText}"); using var reader = await command.ExecuteReaderAsync(); var eventCount = 0; LogMessage?.Invoke(this, "Starting to read binlog events..."); while (await reader.ReadAsync()) { if (_cancellationTokenSource.Token.IsCancellationRequested) break; var logName = reader.GetString("Log_name"); var logPos = reader.GetInt64("Pos"); var eventType = reader.GetString("Event_type"); var serverId = reader.GetInt32("Server_id"); var endLogPos = reader.GetInt64("End_log_pos"); var info = reader.GetString("Info"); // Only log every 100 events to reduce console output eventCount++; if (eventCount % 100 == 0) { LogMessage?.Invoke(this, $"Processed {eventCount} events..."); } // Parse event type var binlogEventType = ParseEventType(eventType); // Create and store database event var databaseEvent = new DatabaseEvent { Type = "binlog", Table = ExtractTableName(info), Operation = ExtractOperation(eventType, info), Data = info, BinlogPosition = logPos, ServerId = serverId }; await _eventStore.StoreEventAsync(databaseEvent); // Create binlog event for real-time processing var evt = new BinlogEvent { Timestamp = DateTimeOffset.UtcNow, EventType = binlogEventType, LogPosition = (uint)logPos, EventSize = (uint)(endLogPos - logPos), Flags = 0, EventData = Encoding.UTF8.GetBytes(info), RawPacket = new byte[0] }; EventReceived?.Invoke(this, evt); } LogMessage?.Invoke(this, $"Completed reading binlog events. Total events processed: {eventCount}"); } catch (Exception ex) { LogMessage?.Invoke(this, $"Error reading binlog events: {ex.Message}"); throw; } } private async Task GetCurrentBinlogFileAsync() { using var command = _connection.CreateCommand(); command.CommandText = "SHOW MASTER STATUS"; using var reader = await command.ExecuteReaderAsync(); if (await reader.ReadAsync()) { return reader.GetString("File"); } throw new Exception("Could not determine current binlog file"); } private string ExtractTableName(string info) { // Try to extract table name from various event types if (info.Contains("table_id:")) { var match = System.Text.RegularExpressions.Regex.Match(info, @"table_id: \d+ \(([^)]+)\)"); if (match.Success) { return match.Groups[1].Value; } } // For query events, try to extract table name from SQL if (info.Contains("INSERT INTO") || info.Contains("UPDATE") || info.Contains("DELETE FROM")) { var match = System.Text.RegularExpressions.Regex.Match(info, @"(?:INSERT INTO|UPDATE|DELETE FROM)\s+(\w+)"); if (match.Success) { return match.Groups[1].Value; } } return "unknown"; } private string ExtractOperation(string eventType, string info) { return eventType.ToUpper() switch { "WRITE_ROWS_V1" => "insert", "UPDATE_ROWS_V1" => "update", "DELETE_ROWS_V1" => "delete", "QUERY" => "query", "ANNOTATE_ROWS" => "query", _ => eventType.ToLower() }; } private BinlogEventType ParseEventType(string eventType) { return eventType.ToUpper() switch { "QUERY" => BinlogEventType.QUERY_EVENT, "XID" => BinlogEventType.XID_EVENT, "GTID" => BinlogEventType.GTID_EVENT, "TABLE_MAP" => BinlogEventType.TABLE_MAP_EVENT, "WRITE_ROWS_V1" => BinlogEventType.WRITE_ROWS_EVENT_V1, "UPDATE_ROWS_V1" => BinlogEventType.UPDATE_ROWS_EVENT_V1, "DELETE_ROWS_V1" => BinlogEventType.DELETE_ROWS_EVENT_V1, "ANNOTATE_ROWS" => BinlogEventType.QUERY_EVENT, // Treat as query event "ROTATE" => BinlogEventType.ROTATE_EVENT, "FORMAT_DESCRIPTION" => BinlogEventType.FORMAT_DESCRIPTION_EVENT, _ => BinlogEventType.UNKNOWN_EVENT }; } } public class BinlogEvent { public DateTimeOffset Timestamp { get; set; } public BinlogEventType EventType { get; set; } public uint LogPosition { get; set; } public uint EventSize { get; set; } public ushort Flags { get; set; } public byte[] EventData { get; set; } public byte[] RawPacket { get; set; } } public enum BinlogEventType : byte { UNKNOWN_EVENT = 0x00, START_EVENT_V3 = 0x01, QUERY_EVENT = 0x02, STOP_EVENT = 0x03, ROTATE_EVENT = 0x04, INTVAR_EVENT = 0x05, LOAD_EVENT = 0x06, SLAVE_EVENT = 0x07, CREATE_FILE_EVENT = 0x08, APPEND_BLOCK_EVENT = 0x09, EXEC_LOAD_EVENT = 0x0A, DELETE_FILE_EVENT = 0x0B, NEW_LOAD_EVENT = 0x0C, RAND_EVENT = 0x0D, USER_VAR_EVENT = 0x0E, FORMAT_DESCRIPTION_EVENT = 0x0F, XID_EVENT = 0x10, BEGIN_LOAD_QUERY_EVENT = 0x11, EXECUTE_LOAD_QUERY_EVENT = 0x12, TABLE_MAP_EVENT = 0x13, WRITE_ROWS_EVENT_V0 = 0x14, UPDATE_ROWS_EVENT_V0 = 0x15, DELETE_ROWS_EVENT_V0 = 0x16, WRITE_ROWS_EVENT_V1 = 0x17, UPDATE_ROWS_EVENT_V1 = 0x18, DELETE_ROWS_EVENT_V1 = 0x19, INCIDENT_EVENT = 0x1A, HEARTBEAT_EVENT = 0x1B, IGNORABLE_EVENT = 0x1C, ROWS_QUERY_EVENT = 0x1D, WRITE_ROWS_EVENT_V2 = 0x1E, UPDATE_ROWS_EVENT_V2 = 0x1F, DELETE_ROWS_EVENT_V2 = 0x20, GTID_EVENT = 0x21, ANONYMOUS_GTID_EVENT = 0x22, PREVIOUS_GTIDS_EVENT = 0x23 } }