Files
DatabaseSnapshots/Services/EventStore.cs
GuilhermeStrice 1108bf3ef6 add project
2025-07-09 19:24:12 +01:00

140 lines
5.1 KiB
C#

using System.Text;
using System.Text.Json;
using DatabaseSnapshotsService.Models;
namespace DatabaseSnapshotsService.Services
{
/// <summary>
/// Append-only, file-backed store for <see cref="DatabaseEvent"/> records.
/// Each event is written as one JSON document per line to an <c>events_*.json</c> file under the
/// configured path, and a comma-separated entry is appended to a matching <c>.idx</c> file in the
/// <c>index</c> sub-directory.
/// </summary>
/// <remarks>
/// Writes are serialized through a private lock. The store keeps a <see cref="StreamWriter"/> open
/// on the current event file; call <see cref="Dispose"/> to release it.
/// </remarks>
public class EventStore : IDisposable
{
    // Glob used to discover event files; matches both naming schemes produced by this class.
    private const string EventFilePattern = "events_*.json";

    // A UTF-8 StreamWriter may emit its byte-order mark when it is closed even if nothing was
    // written, so a file no longer than the preamble is treated as empty.
    private static readonly int Utf8PreambleLength = Encoding.UTF8.GetPreamble().Length;

    private readonly EventStoreConfig _config;
    private readonly string _eventsPath;
    private readonly string _indexPath;
    private readonly object _writeLock = new object();
    private long _currentEventId = 0;
    private string _currentEventFile = string.Empty;
    private StreamWriter? _currentWriter;
    private long _currentFileSize = 0;
    private bool _disposed;

    /// <summary>
    /// Creates the store, ensures the event and index directories exist, restores the last used
    /// event ID from the files on disk and opens a fresh event file for appending.
    /// </summary>
    /// <param name="config">Store configuration; <c>Path</c> is the events directory.</param>
    /// <exception cref="ArgumentNullException"><paramref name="config"/> is <c>null</c>.</exception>
    public EventStore(EventStoreConfig config)
    {
        ArgumentNullException.ThrowIfNull(config);

        _config = config;
        _eventsPath = Path.GetFullPath(config.Path);
        _indexPath = Path.Combine(_eventsPath, "index");

        // Ensure directories exist
        Directory.CreateDirectory(_eventsPath);
        Directory.CreateDirectory(_indexPath);

        // Restore the ID counter before the new (still empty) event file is created.
        LoadNextEventId();
        InitializeCurrentFile();
    }

    /// <summary>
    /// Assigns the next ID, a UTC Unix timestamp and a checksum to <paramref name="evt"/>, appends
    /// it to the current event file (rotating first when the size limit is exceeded) and records
    /// it in the index.
    /// </summary>
    /// <param name="evt">The event to persist; its Id, Timestamp and Checksum are overwritten.</param>
    /// <returns>A task whose result is the ID assigned to the event.</returns>
    /// <remarks>
    /// The work is synchronous; the returned task is already completed. Failures are reported
    /// through the task rather than thrown directly, as they were when the method was <c>async</c>.
    /// </remarks>
    public Task<long> StoreEventAsync(DatabaseEvent evt)
    {
        try
        {
            return Task.FromResult(StoreEvent(evt));
        }
        catch (Exception ex)
        {
            // Preserve the "exception surfaces on await" contract of the former async method.
            return Task.FromException<long>(ex);
        }
    }

    /// <summary>
    /// Scans every event file on disk and returns the highest event ID found, or 0 when there is none.
    /// </summary>
    /// <remarks>
    /// For each file the last line that deserializes successfully is used, so a torn or malformed
    /// trailing line does not hide the IDs written before it. Files are opened with
    /// <see cref="FileShare.ReadWrite"/> so the file currently open for writing can be read too.
    /// </remarks>
    /// <returns>The highest persisted event ID, or 0.</returns>
    public async Task<long> GetLastEventIdAsync()
    {
        long lastId = 0;
        foreach (var file in Directory.GetFiles(_eventsPath, EventFilePattern))
        {
            var lines = await ReadSharedLinesAsync(file).ConfigureAwait(false);
            lastId = Math.Max(lastId, FindLastEventId(lines));
        }

        return lastId;
    }

    /// <summary>
    /// Closes the current event file (removing it if nothing was written to it) and marks the
    /// store as disposed. Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        lock (_writeLock)
        {
            if (_disposed)
            {
                return;
            }

            _disposed = true;
            CloseCurrentFile();
        }

        GC.SuppressFinalize(this);
    }

    // Synchronous core of StoreEventAsync; everything that touches shared state runs under the lock.
    private long StoreEvent(DatabaseEvent evt)
    {
        ArgumentNullException.ThrowIfNull(evt);

        lock (_writeLock)
        {
            if (_disposed)
            {
                throw new ObjectDisposedException(nameof(EventStore));
            }

            evt.Id = ++_currentEventId;
            evt.Timestamp = DateTimeOffset.UtcNow.ToUnixTimeSeconds();
            evt.Checksum = CalculateEventChecksum(evt);

            // Check if we need to rotate the file
            if (_currentFileSize > _config.MaxFileSize || _currentWriter == null)
            {
                RotateEventFile();
            }

            // Fail loudly instead of silently dropping the event when no writer is available.
            var writer = _currentWriter
                ?? throw new InvalidOperationException("Event file writer is not initialized.");

            var json = JsonSerializer.Serialize(evt);
            writer.WriteLine(json);
            writer.Flush();

            // Track bytes rather than UTF-16 chars so the limit reflects the real file size.
            _currentFileSize += Encoding.UTF8.GetByteCount(json) + Environment.NewLine.Length;

            UpdateEventIndex(evt);
            return evt.Id;
        }
    }

    // Synchronous twin of GetLastEventIdAsync, used by the constructor to avoid blocking on a task.
    private long GetLastEventId()
    {
        long lastId = 0;
        foreach (var file in Directory.GetFiles(_eventsPath, EventFilePattern))
        {
            lastId = Math.Max(lastId, FindLastEventId(ReadSharedLines(file)));
        }

        return lastId;
    }

    // Opens a file for reading while allowing other handles to keep reading and writing it.
    private static FileStream OpenForSharedRead(string file, bool useAsync)
    {
        return new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096, useAsync);
    }

    // Reads all lines of a file through a shared-read handle.
    private static List<string> ReadSharedLines(string file)
    {
        using var reader = new StreamReader(OpenForSharedRead(file, useAsync: false), Encoding.UTF8);
        var lines = new List<string>();
        while (reader.ReadLine() is { } line)
        {
            lines.Add(line);
        }

        return lines;
    }

    // Asynchronous variant of ReadSharedLines.
    private static async Task<List<string>> ReadSharedLinesAsync(string file)
    {
        using var reader = new StreamReader(OpenForSharedRead(file, useAsync: true), Encoding.UTF8);
        var lines = new List<string>();
        while (await reader.ReadLineAsync().ConfigureAwait(false) is { } line)
        {
            lines.Add(line);
        }

        return lines;
    }

    // Walks the lines from the end and returns the ID of the first one that parses as an event; 0 if none does.
    private static long FindLastEventId(IReadOnlyList<string> lines)
    {
        for (var i = lines.Count - 1; i >= 0; i--)
        {
            var line = lines[i];
            if (string.IsNullOrWhiteSpace(line))
            {
                continue;
            }

            try
            {
                var parsed = JsonSerializer.Deserialize<DatabaseEvent>(line);
                if (parsed != null)
                {
                    return parsed.Id;
                }
            }
            catch (JsonException)
            {
                // Malformed or partially written line; keep looking at earlier lines.
            }
        }

        return 0;
    }

    // Seeds the ID counter with the last persisted ID; the next stored event gets that value + 1.
    private void LoadNextEventId()
    {
        _currentEventId = GetLastEventId();
    }

    // Opens (in append mode) the event file named after the current Unix timestamp.
    private void InitializeCurrentFile()
    {
        var timestamp = DateTimeOffset.UtcNow.ToUnixTimeSeconds();
        _currentEventFile = Path.Combine(_eventsPath, $"events_{timestamp}.json");
        _currentWriter = new StreamWriter(_currentEventFile, true, Encoding.UTF8);

        // The file may already exist (append mode), so start from its real length, not 0.
        _currentFileSize = _currentWriter.BaseStream.Length;
    }

    // Closes the current file and starts a new one named after the current UTC time and event ID.
    private void RotateEventFile()
    {
        CloseCurrentFile();

        _currentEventFile = Path.Combine(_eventsPath, $"events_{DateTime.UtcNow:yyyyMMdd_HHmmss}_{_currentEventId}.json");
        _currentWriter = new StreamWriter(_currentEventFile, append: false, Encoding.UTF8);
        _currentFileSize = 0;
    }

    // Disposes the current writer and deletes its file when it holds no events.
    private void CloseCurrentFile()
    {
        if (_currentWriter == null)
        {
            return;
        }

        _currentWriter.Dispose();
        // Clear the field so a failed reopen leaves "no writer" rather than a closed one.
        _currentWriter = null;

        if (string.IsNullOrEmpty(_currentEventFile) || !File.Exists(_currentEventFile))
        {
            return;
        }

        if (new FileInfo(_currentEventFile).Length <= Utf8PreambleLength)
        {
            File.Delete(_currentEventFile);
        }
    }

    // Returns the lowercase hex SHA-256 of the event's concatenated fields (Checksum itself excluded).
    private string CalculateEventChecksum(DatabaseEvent evt)
    {
        // The field order and the absence of separators are part of the persisted format; do not change.
        var data = $"{evt.Id}{evt.Timestamp}{evt.Type}{evt.Table}{evt.Operation}{evt.Data}{evt.BinlogPosition}{evt.ServerId}";
        var hash = System.Security.Cryptography.SHA256.HashData(Encoding.UTF8.GetBytes(data));
        return Convert.ToHexString(hash).ToLowerInvariant();
    }

    // Appends "id,timestamp,table,operation" to the .idx file that mirrors the current event file.
    private void UpdateEventIndex(DatabaseEvent evt)
    {
        var indexFile = Path.Combine(_indexPath, Path.GetFileNameWithoutExtension(_currentEventFile) + ".idx");
        var indexEntry = $"{evt.Id},{evt.Timestamp},{evt.Table},{evt.Operation}";
        File.AppendAllText(indexFile, indexEntry + Environment.NewLine);
    }
}
}