using CommandLine;
using StackExchange.Redis;
using System;
using System.Collections.Generic;
using System.Linq;
using RedisManager.Utils;

namespace RedisManager.Commands
{
    /// <summary>
    /// Command-line options for the <c>xadd</c> verb, which appends an entry to a stream.
    /// </summary>
    [Verb("xadd", HelpText = "Add an entry to a stream.")]
    public class XAddOptions
    {
        /// <summary>Name of the configured Redis instance to connect to.</summary>
        [Option('i', "instance", Required = true, HelpText = "Instance name.")]
        public string Instance { get; set; }

        /// <summary>Key of the stream to append to.</summary>
        [Value(0, MetaName = "stream", Required = true, HelpText = "Stream key.")]
        public string Stream { get; set; }

        /// <summary>Entry ID passed to XADD; defaults to <c>*</c>.</summary>
        [Option("id", Required = false, HelpText = "Entry ID (default: *)")]
        public string Id { get; set; } = "*";

        /// <summary>
        /// Field/value pairs, each written as <c>field:value</c>. Only the first colon
        /// separates the field from the value, so values may themselves contain colons.
        /// </summary>
        // NOTE(review): the help text describes comma-separated pairs, but no Separator is
        // configured on this option - confirm which input format is actually intended.
        [Option("fields", Required = true, HelpText = "Field-value pairs as field1:value1,field2:value2...")]
        public IEnumerable<string> Fields { get; set; }

        /// <summary>When set, the result is printed as a table.</summary>
        [Option("table", Required = false, HelpText = "Output as table.")]
        public bool Table { get; set; }
    }

    /// <summary>
    /// Command-line options for the <c>xrange</c> verb, which reads stream entries in an ID range.
    /// </summary>
    [Verb("xrange", HelpText = "Get entries from a stream within a range.")]
    public class XRangeOptions
    {
        /// <summary>Name of the configured Redis instance to connect to.</summary>
        [Option('i', "instance", Required = true, HelpText = "Instance name.")]
        public string Instance { get; set; }

        /// <summary>Key of the stream to read.</summary>
        [Value(0, MetaName = "stream", Required = true, HelpText = "Stream key.")]
        public string Stream { get; set; }

        /// <summary>Start ID passed to XRANGE; defaults to <c>-</c>.</summary>
        [Option("start", Required = false, HelpText = "Start ID (default: -)")]
        public string Start { get; set; } = "-";

        /// <summary>End ID passed to XRANGE; defaults to <c>+</c>.</summary>
        [Option("end", Required = false, HelpText = "End ID (default: +)")]
        public string End { get; set; } = "+";

        /// <summary>Optional COUNT argument; omitted from the command when not supplied.</summary>
        [Option("count", Required = false, HelpText = "Maximum number of entries")]
        public int? Count { get; set; }

        /// <summary>When set, table output is requested (currently only produces a warning).</summary>
        [Option("table", Required = false, HelpText = "Output as table.")]
        public bool Table { get; set; }
    }

    /// <summary>
    /// Command-line options for the <c>xlen</c> verb, which reports the length of a stream.
    /// </summary>
    [Verb("xlen", HelpText = "Get the length of a stream.")]
    public class XLenOptions
    {
        /// <summary>Name of the configured Redis instance to connect to.</summary>
        [Option('i', "instance", Required = true, HelpText = "Instance name.")]
        public string Instance { get; set; }

        /// <summary>Key of the stream to measure.</summary>
        [Value(0, MetaName = "stream", Required = true, HelpText = "Stream key.")]
        public string Stream { get; set; }

        /// <summary>When set, the result is printed as a table.</summary>
        [Option("table", Required = false, HelpText = "Output as table.")]
        public bool Table { get; set; }
    }

    /// <summary>
    /// Command-line options for the <c>xdel</c> verb, which removes entries from a stream.
    /// </summary>
    [Verb("xdel", HelpText = "Remove entries from a stream.")]
    public class XDelOptions
    {
        /// <summary>Name of the configured Redis instance to connect to.</summary>
        [Option('i', "instance", Required = true, HelpText = "Instance name.")]
        public string Instance { get; set; }

        /// <summary>Key of the stream to delete from.</summary>
        [Value(0, MetaName = "stream", Required = true, HelpText = "Stream key.")]
        public string Stream { get; set; }

        /// <summary>One or more entry IDs to delete.</summary>
        [Value(1, MetaName = "ids", Min = 1, Required = true, HelpText = "Entry IDs to delete.")]
        public IEnumerable<string> Ids { get; set; }

        /// <summary>When set, the result is printed as a table.</summary>
        [Option("table", Required = false, HelpText = "Output as table.")]
        public bool Table { get; set; }
    }

    /// <summary>
    /// Handlers for the stream verbs. Each handler resolves the instance, opens a
    /// connection, sends the raw Redis command through <c>Execute</c> and prints the reply.
    /// </summary>
    public static class StreamCommands
    {
        /// <summary>
        /// Sends XADD for the given stream, ID and field/value pairs and prints the reply.
        /// </summary>
        /// <param name="opts">Parsed <c>xadd</c> options.</param>
        /// <param name="config">Configuration used to resolve the instance name.</param>
        /// <returns>
        /// 0 after the command has been sent and its reply printed; 1 when the field list is
        /// empty or contains an item that is not in <c>field:value</c> form.
        /// </returns>
        public static int RunXAdd(XAddOptions opts, Config config)
        {
            // Materialize once: the sequence is walked for argument building and again
            // for the table output below.
            var fields = (opts.Fields ?? Enumerable.Empty<string>()).ToList();

            var args = new List<object> { opts.Stream, opts.Id };
            foreach (var field in fields)
            {
                // Limit of 2 keeps any further ':' characters inside the value.
                var parts = field.Split(':', 2);
                if (parts.Length != 2)
                {
                    // Skipping a malformed item would store a partial entry without
                    // telling the user, so reject the whole command instead.
                    Console.Error.WriteLine($"Invalid field '{field}': expected field:value.");
                    return 1;
                }

                args.Add(parts[0]);
                args.Add(parts[1]);
            }

            if (fields.Count == 0)
            {
                Console.Error.WriteLine("At least one field:value pair is required.");
                return 1;
            }

            var instance = RedisUtils.GetInstance(config, opts.Instance);
            using var redis = RedisUtils.ConnectRedis(instance);
            var db = redis.GetDatabase();

            // Use XADD command directly
            var id = db.Execute("XADD", args.ToArray());

            if (opts.Table)
            {
                RedisUtils.PrintTable(
                    new[] { "Stream", "ID", "Fields" },
                    new List<string[]> { new[] { opts.Stream, id.ToString(), string.Join(", ", fields) } });
            }
            else
            {
                Console.WriteLine(Output.Green(id.ToString()));
            }

            return 0;
        }

        /// <summary>
        /// Sends XRANGE for the given stream and ID range and prints one line per entry.
        /// </summary>
        /// <param name="opts">Parsed <c>xrange</c> options.</param>
        /// <param name="config">Configuration used to resolve the instance name.</param>
        /// <returns>Always 0.</returns>
        public static int RunXRange(XRangeOptions opts, Config config)
        {
            var instance = RedisUtils.GetInstance(config, opts.Instance);
            using var redis = RedisUtils.ConnectRedis(instance);
            var db = redis.GetDatabase();

            // Use XRANGE command directly
            var args = new List<object> { opts.Stream, opts.Start, opts.End };
            if (opts.Count.HasValue)
            {
                args.Add("COUNT");
                args.Add(opts.Count.Value);
            }

            var result = db.Execute("XRANGE", args.ToArray());

            if (opts.Table)
            {
                Console.WriteLine(Output.Yellow("Table output not supported for XRANGE yet"));
                return 0;
            }

            // ToString() on an array reply only summarizes the element count, so the
            // entries are unpacked and printed individually.
            var entries = result.IsNull ? null : (RedisResult[])result;
            if (entries == null || entries.Length == 0)
            {
                Console.WriteLine(Output.Yellow("(empty)"));
                return 0;
            }

            foreach (var entry in entries)
            {
                Console.WriteLine(Output.Green(FormatStreamEntry(entry)));
            }

            return 0;
        }

        /// <summary>
        /// Sends XLEN for the given stream and prints the reply.
        /// </summary>
        /// <param name="opts">Parsed <c>xlen</c> options.</param>
        /// <param name="config">Configuration used to resolve the instance name.</param>
        /// <returns>Always 0.</returns>
        public static int RunXLen(XLenOptions opts, Config config)
        {
            var instance = RedisUtils.GetInstance(config, opts.Instance);
            using var redis = RedisUtils.ConnectRedis(instance);
            var db = redis.GetDatabase();

            // Use XLEN command directly
            var length = db.Execute("XLEN", opts.Stream);

            if (opts.Table)
            {
                RedisUtils.PrintTable(
                    new[] { "Stream", "Length" },
                    new List<string[]> { new[] { opts.Stream, length.ToString() } });
            }
            else
            {
                Console.WriteLine(Output.Green(length.ToString()));
            }

            return 0;
        }

        /// <summary>
        /// Sends XDEL for the given stream and entry IDs and prints the reply.
        /// </summary>
        /// <param name="opts">Parsed <c>xdel</c> options.</param>
        /// <param name="config">Configuration used to resolve the instance name.</param>
        /// <returns>Always 0.</returns>
        public static int RunXDel(XDelOptions opts, Config config)
        {
            var instance = RedisUtils.GetInstance(config, opts.Instance);
            using var redis = RedisUtils.ConnectRedis(instance);
            var db = redis.GetDatabase();

            // Use XDEL command directly
            var args = new List<object> { opts.Stream };
            // IDs are appended as plain strings so that every element of the argument
            // array is an individual command argument.
            args.AddRange(opts.Ids);

            var count = db.Execute("XDEL", args.ToArray());

            if (opts.Table)
            {
                RedisUtils.PrintTable(
                    new[] { "Stream", "Deleted Count" },
                    new List<string[]> { new[] { opts.Stream, count.ToString() } });
            }
            else
            {
                Console.WriteLine(Output.Green($"Deleted {count} entry(ies)"));
            }

            return 0;
        }

        /// <summary>
        /// Renders one XRANGE reply element as <c>id field=value, field=value</c>.
        /// </summary>
        /// <param name="entry">
        /// A reply element; expected to be a two-element array of the entry ID followed by
        /// a flat array of alternating field names and values.
        /// </param>
        /// <returns>
        /// The formatted line, just the ID when no field array is present, or the element's
        /// own string form when it cannot be unpacked.
        /// </returns>
        private static string FormatStreamEntry(RedisResult entry)
        {
            var parts = (RedisResult[])entry;
            if (parts == null || parts.Length == 0)
            {
                return entry.ToString();
            }

            var id = parts[0].ToString();
            if (parts.Length < 2 || parts[1].IsNull)
            {
                return id;
            }

            var flat = (RedisResult[])parts[1];
            var pairs = new List<string>();
            // Step by two; a trailing unpaired element (not expected) is ignored.
            for (var i = 0; flat != null && i + 1 < flat.Length; i += 2)
            {
                pairs.Add($"{flat[i]}={flat[i + 1]}");
            }

            return pairs.Count == 0 ? id : $"{id} {string.Join(", ", pairs)}";
        }
    }
}