using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
using FluentAssertions;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Mvc;
using Microsoft.AspNetCore.TestHost;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using SqrtSpace.SpaceTime.AspNetCore;
using SqrtSpace.SpaceTime.Collections;
using SqrtSpace.SpaceTime.Core;
using SqrtSpace.SpaceTime.EntityFramework;
using SqrtSpace.SpaceTime.Linq;
using Xunit;

namespace SqrtSpace.SpaceTime.Tests.Integration;

// NOTE(review): the generic type arguments in this file were reconstructed from usage
// (DTO names, property types, call sites). The two that cannot be inferred from this file
// alone are marked with their own NOTE(review) comments below.

/// <summary>
/// End-to-end scenarios that drive the test-only controllers declared in this file through an
/// in-process <see cref="TestServer"/>. Each test instance gets its own temp data directory and
/// its own seeded in-memory database.
/// </summary>
public class EndToEndScenarioTests : IDisposable
{
    // ASP.NET Core MVC writes camelCase JSON; Web defaults read it case-insensitively.
    // Default JsonSerializerOptions are case-sensitive and would leave every DTO property unset.
    private static readonly JsonSerializerOptions JsonOptions = new(JsonSerializerDefaults.Web);

    // Seed dates are anchored to a fixed day so every seeded date falls inside calendar year 2023,
    // which is the range the analytics test queries.
    private static readonly DateTime SeedAnchorDate = new(2023, 12, 31);

    // Fixed seed so the seeded data set is the same on every run.
    private const int SeedRandomSeed = 20231231;

    private readonly TestServer _server;
    private readonly HttpClient _client;
    private readonly string _dataDirectory;

    /// <summary>Builds the test host, creates the HTTP client and seeds the database.</summary>
    public EndToEndScenarioTests()
    {
        _dataDirectory = Path.Combine(Path.GetTempPath(), "spacetime_e2e_tests", Guid.NewGuid().ToString());
        Directory.CreateDirectory(_dataDirectory);

        // Computed once, outside the options callback: the callback can run once per scope, and a
        // fresh Guid per invocation would hand every request a different (empty) database.
        var databaseName = $"TestDb_{Guid.NewGuid()}";

        var builder = new WebHostBuilder()
            .ConfigureServices(services =>
            {
                // Configure SpaceTime
                services.AddSpaceTime(options =>
                {
                    options.EnableCheckpointing = true;
                    options.EnableStreaming = true;
                    options.CheckpointDirectory = Path.Combine(_dataDirectory, "checkpoints");
                    options.ExternalStorageDirectory = Path.Combine(_dataDirectory, "external");
                    options.DefaultStrategy = SpaceTimeStrategy.SqrtN;
                });

                // Configure Entity Framework with SpaceTime
                services.AddDbContext<TestDbContext>(options =>
                {
                    options.UseInMemoryDatabase(databaseName);
                    options.UseSpaceTimeOptimizer(opt =>
                    {
                        opt.EnableSqrtNChangeTracking = true;
                        opt.EnableQueryCheckpointing = true;
                        opt.BufferPoolStrategy = BufferPoolStrategy.SqrtN;
                    });
                });

                services.AddControllers();
                services.AddScoped<IDataProcessingService, DataProcessingService>();
            })
            .Configure(app =>
            {
                app.UseSpaceTime();
                app.UseRouting();
                app.UseEndpoints(endpoints =>
                {
                    endpoints.MapControllers();
                });
            });

        _server = new TestServer(builder);
        _client = _server.CreateClient();

        // Seed initial data. xUnit constructors cannot be async; GetAwaiter().GetResult()
        // surfaces the original exception instead of an AggregateException.
        SeedDatabase().GetAwaiter().GetResult();
    }

    /// <summary>Disposes the client and server and removes this instance's temp directory.</summary>
    public void Dispose()
    {
        _client?.Dispose();
        _server?.Dispose();

        if (Directory.Exists(_dataDirectory))
        {
            Directory.Delete(_dataDirectory, true);
        }
    }

    [Fact]
    public async Task CompleteDataImportWorkflow_WithCheckpointing()
    {
        // Arrange
        var importData = new ImportRequest
        {
            FileName = "large_dataset.csv",
            TotalRecords = 10000,
            SimulateFailureAt = 5000
        };

        // Act - First attempt (will fail)
        var response1 = await _client.PostAsync("/api/import/csv", JsonBody(importData));
        response1.StatusCode.Should().Be(HttpStatusCode.InternalServerError);

        // Act - Retry (should resume from checkpoint)
        importData.SimulateFailureAt = null;
        var response2 = await _client.PostAsync("/api/import/csv", JsonBody(importData));

        // Assert
        response2.StatusCode.Should().Be(HttpStatusCode.OK);

        var result = await ReadJsonAsync<ImportResult>(response2);
        result!.TotalProcessed.Should().Be(10000);
        result.ResumedFromCheckpoint.Should().BeTrue();
        result.ProcessingTime.Should().BeGreaterThan(TimeSpan.Zero);
    }

    [Fact]
    public async Task LargeDataExport_WithStreaming()
    {
        // Arrange - the endpoint cannot return more rows than were seeded.
        const int requestedCount = 5000;
        int seededOrders;
        using (var scope = _server.Services.CreateScope())
        {
            seededOrders = await scope.ServiceProvider
                .GetRequiredService<TestDbContext>()
                .Orders
                .CountAsync();
        }

        // Act
        var response = await _client.GetStreamAsync($"/api/export/orders?count={requestedCount}");

        // Read streamed data: only lines that begin a JSON object are parsed.
        var orders = new List<OrderExport>();
        using var reader = new StreamReader(response);
        string? line;
        while ((line = await reader.ReadLineAsync()) != null)
        {
            if (string.IsNullOrWhiteSpace(line) || !line.StartsWith('{'))
            {
                continue;
            }

            var order = JsonSerializer.Deserialize<OrderExport>(line, JsonOptions);
            if (order != null)
            {
                orders.Add(order);
            }
        }

        // Assert
        orders.Should().HaveCount(Math.Min(requestedCount, seededOrders));
        orders.Should().BeInAscendingOrder(o => o.OrderDate);
        orders.All(o => o.TotalAmount > 0).Should().BeTrue();
    }

    [Fact]
    public async Task ComplexAnalytics_WithMemoryOptimization()
    {
        // Act
        var response = await _client.GetAsync("/api/analytics/sales-summary?startDate=2023-01-01&endDate=2023-12-31");

        // Assert
        response.StatusCode.Should().Be(HttpStatusCode.OK);

        var result = await ReadJsonAsync<SalesSummary>(response);
        result!.TotalRevenue.Should().BeGreaterThan(0);
        result.OrderCount.Should().BeGreaterThan(0);
        result.TopProducts.Should().HaveCount(10);
        result.MonthlySales.Should().HaveCount(12);
        result.ProcessingStats.MemoryUsedMB.Should().BeLessThan(50); // Should use < 50MB even for large dataset
    }

    [Fact]
    public async Task BatchProcessing_WithAdaptiveCollections()
    {
        // Arrange
        var batchRequest = new BatchProcessRequest
        {
            Operations = Enumerable.Range(1, 1000)
                .Select(i => new Operation
                {
                    Type = i % 3 == 0 ? "Update" : "Create",
                    Data = new { Id = i, Value = $"Item{i}" }
                })
                .ToList()
        };

        // Act
        var response = await _client.PostAsync("/api/batch/process", JsonBody(batchRequest));

        // Assert
        response.StatusCode.Should().Be(HttpStatusCode.OK);

        var result = await ReadJsonAsync<BatchProcessResult>(response);
        result!.ProcessedCount.Should().Be(1000);
        result.Errors.Should().BeEmpty();
        result.MemoryStats.PeakUsageMB.Should().BeLessThan(20); // Adaptive collections should minimize memory
    }

    [Fact]
    public async Task RealtimeDataProcessing_WithBackpressure()
    {
        // Arrange / Act - Send multiple concurrent requests.
        // The body must be sent as application/json or [FromBody] binding rejects it.
        var processingTasks = Enumerable.Range(0, 5)
            .Select(i => _client.PostAsync(
                $"/api/realtime/process?streamId={i}",
                JsonBody(new { DataPoints = 1000 })))
            .ToList();

        var responses = await Task.WhenAll(processingTasks);

        // Assert
        responses.Should().AllSatisfy(r => r.StatusCode.Should().Be(HttpStatusCode.OK));

        // Verify backpressure worked
        var processingTimes = new List<TimeSpan>();
        foreach (var response in responses)
        {
            var result = await ReadJsonAsync<ProcessingResult>(response);
            processingTimes.Add(result!.Duration);
        }

        // Processing times should vary due to backpressure
        // NOTE(review): each request uses a distinct streamId, so the controller's delay branch
        // is not taken on a first request; confirm that a >100 ms spread is actually guaranteed.
        processingTimes.Max().Subtract(processingTimes.Min()).TotalMilliseconds.Should().BeGreaterThan(100);
    }

    [Fact]
    public async Task DataMigration_WithCheckpointRecovery()
    {
        // Arrange
        var migrationId = Guid.NewGuid().ToString();

        // Act - Start migration
        var startResponse = await _client.PostAsync(
            $"/api/migration/start?id={migrationId}",
            JsonBody(new { SourceTable = "LegacyData", RecordCount = 50000 }));

        startResponse.StatusCode.Should().Be(HttpStatusCode.Accepted);

        // Check status periodically
        // NOTE(review): RunMigration awaits Task.Delay(1) per record, so 50,000 records need at
        // least ~50 s, while this loop polls for ~5 s in total. TODO confirm the intended budget.
        MigrationStatus? status = null;
        for (int attempt = 0; attempt < 10; attempt++)
        {
            await Task.Delay(500);

            var statusResponse = await _client.GetAsync($"/api/migration/status?id={migrationId}");
            status = await ReadJsonAsync<MigrationStatus>(statusResponse);

            if (status is { IsComplete: true })
            {
                break;
            }
        }

        // Assert
        status.Should().NotBeNull();
        status!.IsComplete.Should().BeTrue();
        status.RecordsProcessed.Should().Be(50000);
        status.CheckpointsSaved.Should().BeGreaterThan(0);
    }

    [Fact]
    public async Task FullTextSearch_WithExternalIndex()
    {
        // Arrange - Index documents
        var documents = Enumerable.Range(1, 1000)
            .Select(i => new Document
            {
                Id = i,
                Title = $"Document {i}",
                Content = GenerateLargeText(i),
                Tags = GenerateTags(i)
            })
            .ToList();

        var indexResponse = await _client.PostAsync("/api/search/index", JsonBody(documents));
        indexResponse.StatusCode.Should().Be(HttpStatusCode.OK);

        // Act - Search
        var searchResponse = await _client.GetAsync("/api/search?query=important&limit=10");

        // Assert
        searchResponse.StatusCode.Should().Be(HttpStatusCode.OK);

        var results = await ReadJsonAsync<SearchResults>(searchResponse);
        results!.Items.Should().HaveCount(10);
        results.TotalMatches.Should().BeGreaterThan(10);
        results.SearchTime.Should().BeLessThan(TimeSpan.FromSeconds(1));
        results.MemoryUsedMB.Should().BeLessThan(10); // External index should use minimal memory
    }

    /// <summary>Serializes <paramref name="payload"/> as a UTF-8 <c>application/json</c> request body.</summary>
    private static StringContent JsonBody<T>(T payload) =>
        new(JsonSerializer.Serialize(payload, JsonOptions), Encoding.UTF8, "application/json");

    /// <summary>Reads the response body and deserializes it using the shared web JSON options.</summary>
    private static async Task<T?> ReadJsonAsync<T>(HttpResponseMessage response) =>
        JsonSerializer.Deserialize<T>(await response.Content.ReadAsStringAsync(), JsonOptions);

    /// <summary>
    /// Inserts 100 customers, 50 products, and 5-19 orders (1-9 items each) for the first
    /// 50 customers. All dates fall within the 365 days ending on <see cref="SeedAnchorDate"/>.
    /// </summary>
    private async Task SeedDatabase()
    {
        using var scope = _server.Services.CreateScope();
        var context = scope.ServiceProvider.GetRequiredService<TestDbContext>();
        var random = new Random(SeedRandomSeed);

        // Create test data
        var customers = Enumerable.Range(1, 100)
            .Select(i => new Customer
            {
                Name = $"Customer {i}",
                Email = $"customer{i}@example.com",
                CreatedDate = SeedAnchorDate.AddDays(-random.Next(365))
            })
            .ToList();

        var products = Enumerable.Range(1, 50)
            .Select(i => new Product
            {
                Name = $"Product {i}",
                Price = random.Next(10, 1000),
                Category = $"Category{i % 5}"
            })
            .ToList();

        context.Customers.AddRange(customers);
        context.Products.AddRange(products);
        await context.SaveChangesAsync();

        // Generate orders
        var orders = new List<Order>();
        foreach (var customer in customers.Take(50))
        {
            // Drawn once per customer; drawing it in the loop condition would re-roll the bound
            // on every iteration.
            var orderCount = random.Next(5, 20);
            for (int orderIndex = 0; orderIndex < orderCount; orderIndex++)
            {
                var order = new Order
                {
                    CustomerId = customer.Id,
                    OrderDate = SeedAnchorDate.AddDays(-random.Next(365)),
                    Status = "Completed",
                    Items = new List<OrderItem>()
                };

                var itemCount = random.Next(1, 10);
                for (int itemIndex = 0; itemIndex < itemCount; itemIndex++)
                {
                    var product = products[random.Next(products.Count)];
                    order.Items.Add(new OrderItem
                    {
                        ProductId = product.Id,
                        Quantity = random.Next(1, 5),
                        UnitPrice = product.Price
                    });
                }

                order.TotalAmount = order.Items.Sum(item => item.Quantity * item.UnitPrice);
                orders.Add(order);
            }
        }

        context.Orders.AddRange(orders);
        await context.SaveChangesAsync();
    }

    /// <summary>Builds 100 space-separated words picked from a fixed vocabulary using a seeded generator.</summary>
    private static string GenerateLargeText(int seed)
    {
        var words = new[] { "important", "critical", "data", "analysis", "report", "summary", "detail", "information" };
        var sb = new StringBuilder();
        var random = new Random(seed);

        for (int i = 0; i < 100; i++)
        {
            sb.Append(words[random.Next(words.Length)]);
            sb.Append(' ');
        }

        return sb.ToString();
    }

    /// <summary>Picks 1-3 tags from a fixed list, shuffled with a seeded generator.</summary>
    private static List<string> GenerateTags(int seed)
    {
        var allTags = new[] { "urgent", "review", "approved", "pending", "archived" };
        var random = new Random(seed);
        var tagCount = random.Next(1, 4);

        return allTags.OrderBy(_ => random.Next()).Take(tagCount).ToList();
    }
}

// Controllers for integration tests

/// <summary>Simulated CSV import that saves progress through the request's checkpoint feature.</summary>
[ApiController]
[Route("api/import")]
public class ImportController : ControllerBase
{
    private readonly IDataProcessingService _processingService;

    public ImportController(IDataProcessingService processingService)
    {
        _processingService = processingService;
    }

    /// <summary>
    /// Processes records from the last saved "import-state" position up to
    /// <see cref="ImportRequest.TotalRecords"/>, throwing at
    /// <see cref="ImportRequest.SimulateFailureAt"/> when it is set.
    /// </summary>
    [HttpPost("csv")]
    [EnableCheckpoint(Strategy = CheckpointStrategy.SqrtN)]
    public async Task<IActionResult> ImportCsv([FromBody] ImportRequest request)
    {
        // NOTE(review): feature type name reconstructed (type argument was lost) - confirm.
        var checkpoint = HttpContext.Features.Get<ICheckpointFeature>()!;

        var state = await checkpoint.LoadStateAsync<ImportState>("import-state");
        var startFrom = state?.ProcessedCount ?? 0;
        var processedCount = startFrom;
        var resumed = startFrom > 0;
        var sw = System.Diagnostics.Stopwatch.StartNew();

        for (int i = startFrom; i < request.TotalRecords; i++)
        {
            if (request.SimulateFailureAt.HasValue && i == request.SimulateFailureAt.Value)
            {
                throw new InvalidOperationException("Simulated import failure");
            }

            // Simulate processing
            await _processingService.ProcessRecord(i);
            processedCount++;

            if (checkpoint.ShouldCheckpoint(processedCount))
            {
                await checkpoint.SaveStateAsync("import-state", new ImportState { ProcessedCount = processedCount });
            }
        }

        return Ok(new ImportResult
        {
            TotalProcessed = processedCount,
            ResumedFromCheckpoint = resumed,
            ProcessingTime = sw.Elapsed
        });
    }
}

/// <summary>Streams orders, oldest first, as <see cref="OrderExport"/> rows.</summary>
[ApiController]
[Route("api/export")]
public class ExportController : ControllerBase
{
    private readonly TestDbContext _context;

    public ExportController(TestDbContext context)
    {
        _context = context;
    }

    /// <summary>Yields up to <paramref name="count"/> orders sorted by order date.</summary>
    [HttpGet("orders")]
    [SpaceTimeStreaming(ChunkStrategy = ChunkStrategy.SqrtN)]
    public async IAsyncEnumerable<OrderExport> ExportOrders([FromQuery] int count = 1000)
    {
        // Customer and Items are read below, so they must be loaded with the query; without
        // Include, Customer is null and Items is empty on a freshly created context.
        var query = _context.Orders
            .Include(o => o.Customer)
            .Include(o => o.Items)
            .OrderBy(o => o.OrderDate)
            .Take(count);

        await foreach (var batch in query.BatchBySqrtNAsync())
        {
            foreach (var order in batch)
            {
                yield return new OrderExport
                {
                    OrderId = order.Id,
                    CustomerName = order.Customer.Name,
                    OrderDate = order.OrderDate,
                    TotalAmount = order.TotalAmount,
                    ItemCount = order.Items.Count
                };
            }
        }
    }
}

/// <summary>Aggregates order data for a date range into a <see cref="SalesSummary"/>.</summary>
[ApiController]
[Route("api/analytics")]
public class AnalyticsController : ControllerBase
{
    private readonly TestDbContext _context;

    public AnalyticsController(TestDbContext context)
    {
        _context = context;
    }

    /// <summary>
    /// Summarizes orders whose date lies in [<paramref name="startDate"/>, <paramref name="endDate"/>]:
    /// revenue, count, average, top 10 products by sales, and per-month totals.
    /// </summary>
    [HttpGet("sales-summary")]
    public async Task<IActionResult> GetSalesSummary([FromQuery] DateTime startDate, [FromQuery] DateTime endDate)
    {
        var memoryBefore = GC.GetTotalMemory(false);

        // Use SpaceTime optimizations for large aggregations.
        // Items are included because the top-products aggregation walks each order's items.
        var orders = await _context.Orders
            .Include(o => o.Items)
            .Where(o => o.OrderDate >= startDate && o.OrderDate <= endDate)
            .ToListWithSqrtNMemoryAsync();

        var summary = new SalesSummary
        {
            TotalRevenue = orders.Sum(o => o.TotalAmount),
            OrderCount = orders.Count,
            // Average throws on an empty sequence; report 0 when nothing matched the range.
            AverageOrderValue = orders.Count > 0 ? orders.Average(o => o.TotalAmount) : 0m,
            TopProducts = GetTopProducts(orders),
            MonthlySales = GetMonthlySales(orders),
            ProcessingStats = new ProcessingStats
            {
                MemoryUsedMB = (GC.GetTotalMemory(false) - memoryBefore) / (1024.0 * 1024.0),
                RecordsProcessed = orders.Count
            }
        };

        return Ok(summary);
    }

    /// <summary>Sums quantity x unit price per product and returns the 10 largest totals.</summary>
    private static List<ProductSummary> GetTopProducts(List<Order> orders)
    {
        var productSales = new AdaptiveDictionary<int, decimal>();

        foreach (var order in orders)
        {
            foreach (var item in order.Items)
            {
                var lineTotal = item.Quantity * item.UnitPrice;
                productSales[item.ProductId] = productSales.TryGetValue(item.ProductId, out var running)
                    ? running + lineTotal
                    : lineTotal;
            }
        }

        return productSales
            .OrderByDescending(kvp => kvp.Value)
            .Take(10)
            .Select(kvp => new ProductSummary
            {
                ProductId = kvp.Key,
                TotalSales = kvp.Value
            })
            .ToList();
    }

    /// <summary>Groups orders by (year, month) and returns the totals in chronological order.</summary>
    private static List<MonthlySale> GetMonthlySales(List<Order> orders)
    {
        return orders
            .GroupByExternal(o => new { o.OrderDate.Year, o.OrderDate.Month })
            .Select(g => new MonthlySale
            {
                Year = g.Key.Year,
                Month = g.Key.Month,
                Total = g.Sum(o => o.TotalAmount)
            })
            .OrderBy(m => m.Year)
            .ThenBy(m => m.Month)
            .ToList();
    }
}

/// <summary>Runs a list of simulated operations in batches and reports memory readings.</summary>
[ApiController]
[Route("api/batch")]
public class BatchController : ControllerBase
{
    /// <summary>
    /// Processes every operation in the request, collecting failures as messages instead of
    /// aborting, and samples <see cref="GC.GetTotalMemory(bool)"/> after each batch.
    /// </summary>
    [HttpPost("process")]
    public async Task<IActionResult> ProcessBatch([FromBody] BatchProcessRequest request)
    {
        var results = new AdaptiveList<object>();
        var errors = new List<string>();
        var memoryStart = GC.GetTotalMemory(false);
        var peakMemory = memoryStart;

        foreach (var batch in request.Operations.BatchBySqrtN())
        {
            foreach (var operation in batch)
            {
                try
                {
                    var result = await ProcessOperation(operation);
                    results.Add(result);
                }
                catch (Exception ex)
                {
                    errors.Add($"Operation failed: {ex.Message}");
                }
            }

            peakMemory = Math.Max(peakMemory, GC.GetTotalMemory(false));
        }

        return Ok(new BatchProcessResult
        {
            ProcessedCount = results.Count,
            Errors = errors,
            MemoryStats = new MemoryStats
            {
                StartUsageMB = memoryStart / (1024.0 * 1024.0),
                PeakUsageMB = peakMemory / (1024.0 * 1024.0)
            }
        });
    }

    private static async Task<object> ProcessOperation(Operation operation)
    {
        await Task.Delay(1); // Simulate processing
        return new { Success = true, Id = Guid.NewGuid() };
    }
}

/// <summary>Simulated stream processing that delays requests arriving within 100 ms of the previous one for the same stream.</summary>
[ApiController]
[Route("api/realtime")]
public class RealtimeController : ControllerBase
{
    // Last-seen timestamp per stream id; static, so it is shared by all requests in the process.
    private static readonly AdaptiveDictionary<string, DateTime> _streamTimestamps = new();

    /// <summary>Processes <see cref="StreamData.DataPoints"/> simulated points and reports the elapsed time.</summary>
    [HttpPost("process")]
    public async Task<IActionResult> ProcessStream([FromQuery] string streamId, [FromBody] StreamData data)
    {
        var start = DateTime.UtcNow;

        // Apply backpressure based on stream rate
        if (_streamTimestamps.TryGetValue(streamId, out var lastTime))
        {
            var elapsed = (DateTime.UtcNow - lastTime).TotalMilliseconds;
            if (elapsed < 100) // Less than 100ms since last request
            {
                await Task.Delay(TimeSpan.FromMilliseconds(100 - elapsed));
            }
        }

        _streamTimestamps[streamId] = DateTime.UtcNow;

        // Process data points
        var processed = 0;
        foreach (var point in Enumerable.Range(1, data.DataPoints))
        {
            await ProcessDataPoint(point);
            processed++;
        }

        return Ok(new ProcessingResult
        {
            ProcessedCount = processed,
            Duration = DateTime.UtcNow - start
        });
    }

    private static async Task ProcessDataPoint(int point)
    {
        await Task.Delay(1);
    }
}

/// <summary>Starts simulated migrations in the background and exposes their status.</summary>
[ApiController]
[Route("api/migration")]
public class MigrationController : ControllerBase
{
    // Status per migration id; static, so it is shared by all requests in the process.
    private static readonly AdaptiveDictionary<string, MigrationStatus> _migrations = new();

    /// <summary>Registers a status entry for <paramref name="id"/>, starts the migration without awaiting it, and returns 202.</summary>
    [HttpPost("start")]
    public async Task<IActionResult> StartMigration([FromQuery] string id, [FromBody] MigrationRequest request)
    {
        var status = new MigrationStatus
        {
            Id = id,
            TotalRecords = request.RecordCount,
            StartTime = DateTime.UtcNow
        };

        _migrations[id] = status;

        // Start migration in background
        _ = Task.Run(async () => await RunMigration(id, request));

        // No asynchronous work is awaited on the request path; kept Task-returning for the
        // existing signature.
        await Task.CompletedTask;
        return Accepted(new { MigrationId = id });
    }

    /// <summary>Returns the status registered for <paramref name="id"/>, or 404 when there is none.</summary>
    [HttpGet("status")]
    public IActionResult GetStatus([FromQuery] string id)
    {
        if (_migrations.TryGetValue(id, out var status))
        {
            return Ok(status);
        }

        return NotFound();
    }

    /// <summary>Simulates per-record work, updating the shared status and creating checkpoints when the manager asks for one.</summary>
    private async Task RunMigration(string id, MigrationRequest request)
    {
        var checkpointManager = new CheckpointManager(
            Path.Combine(Path.GetTempPath(), "migrations", id),
            strategy: CheckpointStrategy.SqrtN);

        var status = _migrations[id];

        for (int i = 0; i < request.RecordCount; i++)
        {
            // Simulate migration work
            await Task.Delay(1);
            status.RecordsProcessed++;

            if (checkpointManager.ShouldCheckpoint())
            {
                await checkpointManager.CreateCheckpointAsync(new { Processed = i });
                status.CheckpointsSaved++;
            }
        }

        status.IsComplete = true;
        status.EndTime = DateTime.UtcNow;
    }
}

/// <summary>Stores documents in external storage and searches them through an in-memory inverted index.</summary>
[ApiController]
[Route("api/search")]
public class SearchController : ControllerBase
{
    // NOTE(review): the type argument on ExternalStorage was lost in extraction; <Document> is
    // inferred from the values written and read below - confirm against the library.
    private static readonly ExternalStorage<Document> _documentStorage = new(Path.Combine(Path.GetTempPath(), "search_index"));

    // Lower-cased token -> ids of the documents containing it.
    private static readonly AdaptiveDictionary<string, List<int>> _invertedIndex = new();

    /// <summary>Writes each document to storage and adds its content words, title words and tags to the index.</summary>
    [HttpPost("index")]
    public async Task<IActionResult> IndexDocuments([FromBody] List<Document> documents)
    {
        foreach (var doc in documents)
        {
            // Store document
            await _documentStorage.WriteAsync($"doc_{doc.Id}", doc);

            // Build inverted index. Tokens are lower-cased before de-duplication so that case
            // variants of one word cannot add the same document id twice under one key.
            var tokens = doc.Content.Split(' ', StringSplitOptions.RemoveEmptyEntries)
                .Concat(doc.Title.Split(' ', StringSplitOptions.RemoveEmptyEntries))
                .Concat(doc.Tags)
                .Select(token => token.ToLowerInvariant())
                .Distinct();

            foreach (var key in tokens)
            {
                if (!_invertedIndex.TryGetValue(key, out var postings))
                {
                    postings = new List<int>();
                    _invertedIndex[key] = postings;
                }

                postings.Add(doc.Id);
            }
        }

        return Ok(new { IndexedCount = documents.Count });
    }

    /// <summary>
    /// Looks up every space-separated term of <paramref name="query"/> in the index and returns
    /// up to <paramref name="limit"/> distinct matching documents plus timing and memory readings.
    /// </summary>
    [HttpGet]
    public async Task<IActionResult> Search([FromQuery] string query, [FromQuery] int limit = 10)
    {
        var start = DateTime.UtcNow;
        var memoryBefore = GC.GetTotalMemory(false);

        var searchTerms = query.ToLowerInvariant().Split(' ', StringSplitOptions.RemoveEmptyEntries);
        var matchingIds = new AdaptiveList<int>();

        foreach (var term in searchTerms)
        {
            if (_invertedIndex.TryGetValue(term, out var ids))
            {
                matchingIds.AddRange(ids);
            }
        }

        var uniqueIds = matchingIds.DistinctExternal().Take(limit).ToList();
        var results = new List<Document>();

        foreach (var id in uniqueIds)
        {
            var doc = await _documentStorage.ReadAsync($"doc_{id}");
            if (doc != null)
            {
                results.Add(doc);
            }
        }

        return Ok(new SearchResults
        {
            Items = results,
            TotalMatches = matchingIds.Count,
            SearchTime = DateTime.UtcNow - start,
            MemoryUsedMB = (GC.GetTotalMemory(false) - memoryBefore) / (1024.0 * 1024.0)
        });
    }
}

// Service interfaces and implementations

/// <summary>Per-record processing hook used by <see cref="ImportController"/>.</summary>
public interface IDataProcessingService
{
    Task ProcessRecord(int recordId);
}

/// <summary>Test implementation that only waits briefly per record.</summary>
public class DataProcessingService : IDataProcessingService
{
    public async Task ProcessRecord(int recordId)
    {
        // Simulate processing
        await Task.Delay(1);
    }
}

// DTOs

public class ImportRequest
{
    public string FileName { get; set; } = "";
    public int TotalRecords { get; set; }
    public int? SimulateFailureAt { get; set; }
}

public class ImportResult
{
    public int TotalProcessed { get; set; }
    public bool ResumedFromCheckpoint { get; set; }
    public TimeSpan ProcessingTime { get; set; }
}

public class ImportState
{
    public int ProcessedCount { get; set; }
}

public class OrderExport
{
    public int OrderId { get; set; }
    public string CustomerName { get; set; } = "";
    public DateTime OrderDate { get; set; }
    public decimal TotalAmount { get; set; }
    public int ItemCount { get; set; }
}

public class SalesSummary
{
    public decimal TotalRevenue { get; set; }
    public int OrderCount { get; set; }
    public decimal AverageOrderValue { get; set; }
    public List<ProductSummary> TopProducts { get; set; } = new();
    public List<MonthlySale> MonthlySales { get; set; } = new();
    public ProcessingStats ProcessingStats { get; set; } = new();
}

public class ProductSummary
{
    public int ProductId { get; set; }
    public decimal TotalSales { get; set; }
}

public class MonthlySale
{
    public int Year { get; set; }
    public int Month { get; set; }
    public decimal Total { get; set; }
}

public class ProcessingStats
{
    public double MemoryUsedMB { get; set; }
    public int RecordsProcessed { get; set; }
}

public class BatchProcessRequest
{
    public List<Operation> Operations { get; set; } = new();
}

public class Operation
{
    public string Type { get; set; } = "";
    public object Data { get; set; } = new();
}

public class BatchProcessResult
{
    public int ProcessedCount { get; set; }
    public List<string> Errors { get; set; } = new();
    public MemoryStats MemoryStats { get; set; } = new();
}

public class MemoryStats
{
    public double StartUsageMB { get; set; }
    public double PeakUsageMB { get; set; }
}

public class StreamData
{
    public int DataPoints { get; set; }
}

public class ProcessingResult
{
    public int ProcessedCount { get; set; }
    public TimeSpan Duration { get; set; }
}

public class MigrationRequest
{
    public string SourceTable { get; set; } = "";
    public int RecordCount { get; set; }
}

public class MigrationStatus
{
    public string Id { get; set; } = "";
    public int TotalRecords { get; set; }
    public int RecordsProcessed { get; set; }
    public int CheckpointsSaved { get; set; }
    public bool IsComplete { get; set; }
    public DateTime StartTime { get; set; }
    public DateTime? EndTime { get; set; }
}

public class Document
{
    public int Id { get; set; }
    public string Title { get; set; } = "";
    public string Content { get; set; } = "";
    public List<string> Tags { get; set; } = new();
}

public class SearchResults
{
    public List<Document> Items { get; set; } = new();
    public int TotalMatches { get; set; }
    public TimeSpan SearchTime { get; set; }
    public double MemoryUsedMB { get; set; }
}

// Test DB Context

/// <summary>EF Core context for the test entities: customers, products, orders and order items.</summary>
public class TestDbContext : DbContext
{
    public TestDbContext(DbContextOptions<TestDbContext> options) : base(options)
    {
    }

    // Initialized with null! because EF Core assigns DbSet properties when the context is constructed.
    public DbSet<Customer> Customers { get; set; } = null!;
    public DbSet<Product> Products { get; set; } = null!;
    public DbSet<Order> Orders { get; set; } = null!;
    public DbSet<OrderItem> OrderItems { get; set; } = null!;

    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        modelBuilder.Entity<Order>()
            .HasOne(o => o.Customer)
            .WithMany()
            .HasForeignKey(o => o.CustomerId);

        modelBuilder.Entity<OrderItem>()
            .HasOne(oi => oi.Product)
            .WithMany()
            .HasForeignKey(oi => oi.ProductId);
    }
}

public class Customer
{
    public int Id { get; set; }
    public string Name { get; set; } = "";
    public string Email { get; set; } = "";
    public DateTime CreatedDate { get; set; }
}

public class Product
{
    public int Id { get; set; }
    public string Name { get; set; } = "";
    public decimal Price { get; set; }
    public string Category { get; set; } = "";
}

public class Order
{
    public int Id { get; set; }
    public int CustomerId { get; set; }
    public Customer Customer { get; set; } = null!;
    public DateTime OrderDate { get; set; }
    public decimal TotalAmount { get; set; }
    public string Status { get; set; } = "";
    public List<OrderItem> Items { get; set; } = new();
}

public class OrderItem
{
    public int Id { get; set; }
    public int OrderId { get; set; }
    public int ProductId { get; set; }
    public Product Product { get; set; } = null!;
    public int Quantity { get; set; }
    public decimal UnitPrice { get; set; }
}