add project
This commit is contained in:
345
Services/BinlogReader.cs
Normal file
345
Services/BinlogReader.cs
Normal file
@ -0,0 +1,345 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.IO;
|
||||
using System.Text;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using MySqlConnector;
|
||||
using DatabaseSnapshotsService.Models;
|
||||
|
||||
namespace DatabaseSnapshotsService.Services
|
||||
{
|
||||
public class BinlogReader
|
||||
{
|
||||
private readonly BinlogReaderConfig _config;
|
||||
private readonly EventStore _eventStore;
|
||||
private MySqlConnection _connection;
|
||||
private bool _isConnected;
|
||||
private bool _isReading;
|
||||
private CancellationTokenSource _cancellationTokenSource;
|
||||
|
||||
public event EventHandler<BinlogEvent> EventReceived;
|
||||
public event EventHandler<string> LogMessage;
|
||||
|
||||
public BinlogReader(BinlogReaderConfig config, EventStore eventStore)
|
||||
{
|
||||
_config = config;
|
||||
_eventStore = eventStore;
|
||||
}
|
||||
|
||||
public async Task<bool> ConnectAsync()
|
||||
{
|
||||
try
|
||||
{
|
||||
LogMessage?.Invoke(this, $"Attempting to connect to {_config.Host}:{_config.Port}");
|
||||
|
||||
var connectionString = $"Server={_config.Host};Port={_config.Port};User ID={_config.Username};Password={_config.Password};";
|
||||
_connection = new MySqlConnection(connectionString);
|
||||
|
||||
await _connection.OpenAsync();
|
||||
_isConnected = true;
|
||||
|
||||
LogMessage?.Invoke(this, $"Connected to MySQL at {_config.Host}:{_config.Port}");
|
||||
return true;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
LogMessage?.Invoke(this, $"Connection failed: {ex.Message}");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
public async Task StartReadingAsync(string binlogFile = null, long position = 4)
|
||||
{
|
||||
if (!_isConnected)
|
||||
{
|
||||
throw new InvalidOperationException("Not connected to MySQL");
|
||||
}
|
||||
|
||||
_cancellationTokenSource = new CancellationTokenSource();
|
||||
_isReading = true;
|
||||
|
||||
try
|
||||
{
|
||||
LogMessage?.Invoke(this, $"Starting binlog read from position {position}");
|
||||
|
||||
// Get current binlog status
|
||||
await ReadBinlogStatusAsync();
|
||||
|
||||
// Read binlog events
|
||||
await ReadBinlogEventsAsync(binlogFile, position);
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
LogMessage?.Invoke(this, "Binlog reading stopped");
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
LogMessage?.Invoke(this, $"Error reading binlog: {ex.Message}");
|
||||
}
|
||||
finally
|
||||
{
|
||||
_isReading = false;
|
||||
}
|
||||
}
|
||||
|
||||
public void StopReading()
|
||||
{
|
||||
_cancellationTokenSource?.Cancel();
|
||||
_isReading = false;
|
||||
}
|
||||
|
||||
public void Disconnect()
|
||||
{
|
||||
StopReading();
|
||||
_connection?.Close();
|
||||
_connection?.Dispose();
|
||||
_isConnected = false;
|
||||
}
|
||||
|
||||
private async Task ReadBinlogStatusAsync()
|
||||
{
|
||||
using var command = _connection.CreateCommand();
|
||||
command.CommandText = "SHOW MASTER STATUS";
|
||||
|
||||
using var reader = await command.ExecuteReaderAsync();
|
||||
if (await reader.ReadAsync())
|
||||
{
|
||||
var file = reader.GetString("File");
|
||||
var position = reader.GetInt64("Position");
|
||||
|
||||
LogMessage?.Invoke(this, $"Current binlog: {file} at position {position}");
|
||||
|
||||
// Create a status event and store it
|
||||
var statusEvent = new DatabaseEvent
|
||||
{
|
||||
Type = "status",
|
||||
Table = "system",
|
||||
Operation = "binlog_status",
|
||||
Data = $"SHOW MASTER STATUS - File: {file}, Position: {position}",
|
||||
BinlogPosition = (long)position,
|
||||
ServerId = _config.ServerId
|
||||
};
|
||||
|
||||
await _eventStore.StoreEventAsync(statusEvent);
|
||||
|
||||
// Create a binlog event for the status
|
||||
var evt = new BinlogEvent
|
||||
{
|
||||
Timestamp = DateTimeOffset.UtcNow,
|
||||
EventType = BinlogEventType.QUERY_EVENT,
|
||||
LogPosition = (uint)position,
|
||||
EventSize = 0,
|
||||
Flags = 0,
|
||||
EventData = Encoding.UTF8.GetBytes($"SHOW MASTER STATUS - File: {file}, Position: {position}"),
|
||||
RawPacket = new byte[0]
|
||||
};
|
||||
|
||||
EventReceived?.Invoke(this, evt);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ReadBinlogEventsAsync(string binlogFile, long position)
|
||||
{
|
||||
try
|
||||
{
|
||||
// Get the binlog file to read from
|
||||
var targetFile = binlogFile;
|
||||
if (string.IsNullOrEmpty(targetFile))
|
||||
{
|
||||
targetFile = await GetCurrentBinlogFileAsync();
|
||||
}
|
||||
|
||||
LogMessage?.Invoke(this, $"Reading binlog events from {targetFile} starting at position {position}");
|
||||
|
||||
using var command = _connection.CreateCommand();
|
||||
command.CommandText = $"SHOW BINLOG EVENTS IN '{targetFile}' FROM {position}";
|
||||
|
||||
LogMessage?.Invoke(this, $"Executing query: {command.CommandText}");
|
||||
|
||||
using var reader = await command.ExecuteReaderAsync();
|
||||
var eventCount = 0;
|
||||
|
||||
LogMessage?.Invoke(this, "Starting to read binlog events...");
|
||||
|
||||
while (await reader.ReadAsync())
|
||||
{
|
||||
if (_cancellationTokenSource.Token.IsCancellationRequested)
|
||||
break;
|
||||
|
||||
var logName = reader.GetString("Log_name");
|
||||
var logPos = reader.GetInt64("Pos");
|
||||
var eventType = reader.GetString("Event_type");
|
||||
var serverId = reader.GetInt32("Server_id");
|
||||
var endLogPos = reader.GetInt64("End_log_pos");
|
||||
var info = reader.GetString("Info");
|
||||
|
||||
// Only log every 100 events to reduce console output
|
||||
eventCount++;
|
||||
if (eventCount % 100 == 0)
|
||||
{
|
||||
LogMessage?.Invoke(this, $"Processed {eventCount} events...");
|
||||
}
|
||||
|
||||
// Parse event type
|
||||
var binlogEventType = ParseEventType(eventType);
|
||||
|
||||
// Create and store database event
|
||||
var databaseEvent = new DatabaseEvent
|
||||
{
|
||||
Type = "binlog",
|
||||
Table = ExtractTableName(info),
|
||||
Operation = ExtractOperation(eventType, info),
|
||||
Data = info,
|
||||
BinlogPosition = logPos,
|
||||
ServerId = serverId
|
||||
};
|
||||
|
||||
await _eventStore.StoreEventAsync(databaseEvent);
|
||||
|
||||
// Create binlog event for real-time processing
|
||||
var evt = new BinlogEvent
|
||||
{
|
||||
Timestamp = DateTimeOffset.UtcNow,
|
||||
EventType = binlogEventType,
|
||||
LogPosition = (uint)logPos,
|
||||
EventSize = (uint)(endLogPos - logPos),
|
||||
Flags = 0,
|
||||
EventData = Encoding.UTF8.GetBytes(info),
|
||||
RawPacket = new byte[0]
|
||||
};
|
||||
|
||||
EventReceived?.Invoke(this, evt);
|
||||
}
|
||||
|
||||
LogMessage?.Invoke(this, $"Completed reading binlog events. Total events processed: {eventCount}");
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
LogMessage?.Invoke(this, $"Error reading binlog events: {ex.Message}");
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<string> GetCurrentBinlogFileAsync()
|
||||
{
|
||||
using var command = _connection.CreateCommand();
|
||||
command.CommandText = "SHOW MASTER STATUS";
|
||||
|
||||
using var reader = await command.ExecuteReaderAsync();
|
||||
if (await reader.ReadAsync())
|
||||
{
|
||||
return reader.GetString("File");
|
||||
}
|
||||
|
||||
throw new Exception("Could not determine current binlog file");
|
||||
}
|
||||
|
||||
private string ExtractTableName(string info)
|
||||
{
|
||||
// Try to extract table name from various event types
|
||||
if (info.Contains("table_id:"))
|
||||
{
|
||||
var match = System.Text.RegularExpressions.Regex.Match(info, @"table_id: \d+ \(([^)]+)\)");
|
||||
if (match.Success)
|
||||
{
|
||||
return match.Groups[1].Value;
|
||||
}
|
||||
}
|
||||
|
||||
// For query events, try to extract table name from SQL
|
||||
if (info.Contains("INSERT INTO") || info.Contains("UPDATE") || info.Contains("DELETE FROM"))
|
||||
{
|
||||
var match = System.Text.RegularExpressions.Regex.Match(info, @"(?:INSERT INTO|UPDATE|DELETE FROM)\s+(\w+)");
|
||||
if (match.Success)
|
||||
{
|
||||
return match.Groups[1].Value;
|
||||
}
|
||||
}
|
||||
|
||||
return "unknown";
|
||||
}
|
||||
|
||||
private string ExtractOperation(string eventType, string info)
|
||||
{
|
||||
return eventType.ToUpper() switch
|
||||
{
|
||||
"WRITE_ROWS_V1" => "insert",
|
||||
"UPDATE_ROWS_V1" => "update",
|
||||
"DELETE_ROWS_V1" => "delete",
|
||||
"QUERY" => "query",
|
||||
"ANNOTATE_ROWS" => "query",
|
||||
_ => eventType.ToLower()
|
||||
};
|
||||
}
|
||||
|
||||
private BinlogEventType ParseEventType(string eventType)
|
||||
{
|
||||
return eventType.ToUpper() switch
|
||||
{
|
||||
"QUERY" => BinlogEventType.QUERY_EVENT,
|
||||
"XID" => BinlogEventType.XID_EVENT,
|
||||
"GTID" => BinlogEventType.GTID_EVENT,
|
||||
"TABLE_MAP" => BinlogEventType.TABLE_MAP_EVENT,
|
||||
"WRITE_ROWS_V1" => BinlogEventType.WRITE_ROWS_EVENT_V1,
|
||||
"UPDATE_ROWS_V1" => BinlogEventType.UPDATE_ROWS_EVENT_V1,
|
||||
"DELETE_ROWS_V1" => BinlogEventType.DELETE_ROWS_EVENT_V1,
|
||||
"ANNOTATE_ROWS" => BinlogEventType.QUERY_EVENT, // Treat as query event
|
||||
"ROTATE" => BinlogEventType.ROTATE_EVENT,
|
||||
"FORMAT_DESCRIPTION" => BinlogEventType.FORMAT_DESCRIPTION_EVENT,
|
||||
_ => BinlogEventType.UNKNOWN_EVENT
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
public class BinlogEvent
|
||||
{
|
||||
public DateTimeOffset Timestamp { get; set; }
|
||||
public BinlogEventType EventType { get; set; }
|
||||
public uint LogPosition { get; set; }
|
||||
public uint EventSize { get; set; }
|
||||
public ushort Flags { get; set; }
|
||||
public byte[] EventData { get; set; }
|
||||
public byte[] RawPacket { get; set; }
|
||||
}
|
||||
|
||||
public enum BinlogEventType : byte
|
||||
{
|
||||
UNKNOWN_EVENT = 0x00,
|
||||
START_EVENT_V3 = 0x01,
|
||||
QUERY_EVENT = 0x02,
|
||||
STOP_EVENT = 0x03,
|
||||
ROTATE_EVENT = 0x04,
|
||||
INTVAR_EVENT = 0x05,
|
||||
LOAD_EVENT = 0x06,
|
||||
SLAVE_EVENT = 0x07,
|
||||
CREATE_FILE_EVENT = 0x08,
|
||||
APPEND_BLOCK_EVENT = 0x09,
|
||||
EXEC_LOAD_EVENT = 0x0A,
|
||||
DELETE_FILE_EVENT = 0x0B,
|
||||
NEW_LOAD_EVENT = 0x0C,
|
||||
RAND_EVENT = 0x0D,
|
||||
USER_VAR_EVENT = 0x0E,
|
||||
FORMAT_DESCRIPTION_EVENT = 0x0F,
|
||||
XID_EVENT = 0x10,
|
||||
BEGIN_LOAD_QUERY_EVENT = 0x11,
|
||||
EXECUTE_LOAD_QUERY_EVENT = 0x12,
|
||||
TABLE_MAP_EVENT = 0x13,
|
||||
WRITE_ROWS_EVENT_V0 = 0x14,
|
||||
UPDATE_ROWS_EVENT_V0 = 0x15,
|
||||
DELETE_ROWS_EVENT_V0 = 0x16,
|
||||
WRITE_ROWS_EVENT_V1 = 0x17,
|
||||
UPDATE_ROWS_EVENT_V1 = 0x18,
|
||||
DELETE_ROWS_EVENT_V1 = 0x19,
|
||||
INCIDENT_EVENT = 0x1A,
|
||||
HEARTBEAT_EVENT = 0x1B,
|
||||
IGNORABLE_EVENT = 0x1C,
|
||||
ROWS_QUERY_EVENT = 0x1D,
|
||||
WRITE_ROWS_EVENT_V2 = 0x1E,
|
||||
UPDATE_ROWS_EVENT_V2 = 0x1F,
|
||||
DELETE_ROWS_EVENT_V2 = 0x20,
|
||||
GTID_EVENT = 0x21,
|
||||
ANONYMOUS_GTID_EVENT = 0x22,
|
||||
PREVIOUS_GTIDS_EVENT = 0x23
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user