Files

506 lines
15 KiB
C#
Raw Permalink Normal View History

2025-07-20 03:41:39 -04:00
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
using FluentAssertions;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.AspNetCore.TestHost;
using Microsoft.Extensions.DependencyInjection;
using SqrtSpace.SpaceTime.AspNetCore;
using Xunit;
namespace SqrtSpace.SpaceTime.Tests.AspNetCore;
/// <summary>
/// Integration tests that drive the <c>api/stream</c> test endpoints through an in-memory
/// <see cref="TestServer"/> configured with SpaceTime streaming enabled.
/// </summary>
public class StreamingMiddlewareTests : IDisposable
{
    // The tests look for camelCase property names ("id") on the wire, so deserialization
    // into the PascalCase TestItem must not be case-sensitive.
    private static readonly JsonSerializerOptions WebJsonOptions = new(JsonSerializerDefaults.Web);

    private readonly TestServer _server;
    private readonly HttpClient _client;

    /// <summary>
    /// Builds a fresh test host (xUnit creates one instance of this class per test) with
    /// streaming enabled, a chunk size of 10 and a 1024-byte streaming buffer.
    /// </summary>
    public StreamingMiddlewareTests()
    {
        var builder = new WebHostBuilder()
            .ConfigureServices(services =>
            {
                services.AddSpaceTime(options =>
                {
                    options.EnableStreaming = true;
                    options.DefaultChunkSize = 10;
                    options.StreamingBufferSize = 1024;
                });
                services.AddControllers();
            })
            .Configure(app =>
            {
                app.UseSpaceTime();
                app.UseRouting();
                app.UseEndpoints(endpoints =>
                {
                    endpoints.MapControllers();
                });
            });

        _server = new TestServer(builder);
        _client = _server.CreateClient();
    }

    /// <summary>Releases the HTTP client and the in-memory server.</summary>
    public void Dispose()
    {
        _client.Dispose();
        _server.Dispose();
    }

    /// <summary>The items endpoint answers 200 with chunked transfer encoding and more than one non-blank line.</summary>
    [Fact]
    public async Task StreamingResponse_ChunksData()
    {
        // Act
        using var response = await _client.GetAsync("/api/stream/items?count=100", HttpCompletionOption.ResponseHeadersRead);

        // Assert
        response.StatusCode.Should().Be(HttpStatusCode.OK);
        response.Headers.TransferEncodingChunked.Should().BeTrue();

        // Read chunks
        var chunks = new List<string>();
        using var stream = await response.Content.ReadAsStreamAsync();
        using var reader = new StreamReader(stream);
        string? line;
        while ((line = await reader.ReadLineAsync()) != null)
        {
            if (!string.IsNullOrWhiteSpace(line))
            {
                chunks.Add(line);
            }
        }

        chunks.Should().HaveCountGreaterThan(1);
    }

    /// <summary>Every line that starts with '[' is parsed as a JSON array of items; 100 items must arrive in total.</summary>
    [Fact]
    public async Task SpaceTimeStreaming_WithSqrtNStrategy_OptimalChunking()
    {
        // Act
        using var responseStream = await _client.GetStreamAsync("/api/stream/sqrt-chunked?count=100");
        var items = new List<TestItem>();
        using var reader = new StreamReader(responseStream);
        string? chunk;
        while ((chunk = await reader.ReadLineAsync()) != null)
        {
            if (string.IsNullOrWhiteSpace(chunk) || !chunk.StartsWith('['))
            {
                continue;
            }

            var chunkItems = JsonSerializer.Deserialize<List<TestItem>>(chunk, WebJsonOptions);
            if (chunkItems != null)
            {
                items.AddRange(chunkItems);
            }
        }

        // Assert
        items.Should().HaveCount(100);
        // With sqrt(100) = 10 the endpoint produces ~10 chunks; only the item total is asserted here.
    }

    /// <summary>Counts the lines containing <c>"id":</c> while streaming 10,000 items.</summary>
    [Fact]
    public async Task StreamingResponse_HandlesLargeDataset()
    {
        // Act
        using var response = await _client.GetAsync("/api/stream/large?count=10000", HttpCompletionOption.ResponseHeadersRead);

        // Assert
        response.StatusCode.Should().Be(HttpStatusCode.OK);

        var itemCount = 0;
        using var stream = await response.Content.ReadAsStreamAsync();
        using var reader = new StreamReader(stream);
        string? line;
        while ((line = await reader.ReadLineAsync()) != null)
        {
            if (line.Contains("\"id\":"))
            {
                itemCount++;
            }
        }

        itemCount.Should().Be(10000);
    }

    /// <summary>
    /// Reads the backpressure endpoint as a deliberately slow client (50 ms pause per line) and
    /// checks that 50 lines arrive spread over more than one second.
    /// </summary>
    [Fact]
    public async Task StreamingResponse_WithBackpressure_ThrottlesCorrectly()
    {
        // Act
        using var responseStream = await _client.GetStreamAsync("/api/stream/backpressure?count=50");

        // A monotonic stopwatch avoids wall-clock adjustments skewing the measured spread.
        var clock = System.Diagnostics.Stopwatch.StartNew();
        var receiveTimes = new List<TimeSpan>();
        using var reader = new StreamReader(responseStream);
        string? line;
        while ((line = await reader.ReadLineAsync()) != null)
        {
            if (!string.IsNullOrWhiteSpace(line))
            {
                receiveTimes.Add(clock.Elapsed);
                await Task.Delay(50); // Simulate slow client
            }
        }

        // Assert
        receiveTimes.Should().HaveCount(50);

        // Verify throttling worked (items should be spread over time)
        var duration = receiveTimes.Last() - receiveTimes.First();
        duration.TotalMilliseconds.Should().BeGreaterThan(1000);
    }

    /// <summary>
    /// Cancels the request token after the headers have arrived; reading the body may then throw
    /// <see cref="OperationCanceledException"/>, which is tolerated.
    /// </summary>
    [Fact]
    public async Task StreamingResponse_ClientDisconnect_CleansUpResources()
    {
        // Arrange
        using var cts = new System.Threading.CancellationTokenSource();
        using var request = new HttpRequestMessage(HttpMethod.Get, "/api/stream/cancellable?count=1000");

        // Act
        using var response = await _client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cts.Token);

        // Cancel after receiving headers
        cts.Cancel();

        // Try to read - should fail gracefully
        try
        {
            using var stream = await response.Content.ReadAsStreamAsync();
            using var reader = new StreamReader(stream);
            await reader.ReadToEndAsync();
        }
        catch (OperationCanceledException)
        {
            // Expected
        }

        // Assert - server should handle cancellation gracefully
        response.StatusCode.Should().Be(HttpStatusCode.OK);
    }

    /// <summary>
    /// Requests gzip, checks that the response declares it, and verifies the decompressed body
    /// contains item JSON.
    /// </summary>
    [Fact]
    public async Task StreamingWithCompression_CompressesChunks()
    {
        // Arrange
        using var request = new HttpRequestMessage(HttpMethod.Get, "/api/stream/compressed?count=100");
        request.Headers.Add("Accept-Encoding", "gzip");

        // Act
        using var response = await _client.SendAsync(request);

        // Assert
        response.StatusCode.Should().Be(HttpStatusCode.OK);
        response.Content.Headers.ContentEncoding.Should().Contain("gzip");

        // The client created by TestServer does not decompress automatically, so a body that
        // is declared as gzip has to be inflated here before it can be inspected as text.
        using var compressed = await response.Content.ReadAsStreamAsync();
        using var gzip = new System.IO.Compression.GZipStream(compressed, System.IO.Compression.CompressionMode.Decompress);
        using var reader = new StreamReader(gzip, Encoding.UTF8);
        var content = await reader.ReadToEndAsync();

        content.Should().Contain("\"id\":");
    }

    /// <summary>
    /// Parses the mixed endpoint's framing (1 type byte, 4-byte length, payload) and expects at
    /// least one JSON frame (type 0) and one binary frame (any other type).
    /// </summary>
    [Fact]
    public async Task MixedContent_StreamsJsonAndBinary()
    {
        // Act
        using var responseStream = await _client.GetStreamAsync("/api/stream/mixed");

        // Buffer the whole body asynchronously, then parse from memory. This avoids synchronous
        // reads on the response stream and lets end-of-data be detected by position instead of
        // by catching EndOfStreamException.
        using var buffered = new MemoryStream();
        await responseStream.CopyToAsync(buffered);
        buffered.Position = 0;

        using var reader = new BinaryReader(buffered);
        var results = new List<object>();
        while (buffered.Position < buffered.Length)
        {
            var type = reader.ReadByte(); // 0 = JSON, 1 = Binary
            var length = reader.ReadInt32();
            var data = reader.ReadBytes(length);
            data.Length.Should().Be(length, "a frame payload must not be truncated");

            if (type == 0)
            {
                results.Add(Encoding.UTF8.GetString(data));
            }
            else
            {
                results.Add(data);
            }
        }

        // Assert
        results.Should().HaveCountGreaterThan(0);
        results.Should().Contain(r => r is string);
        results.Should().Contain(r => r is byte[]);
    }

    /// <summary>
    /// Reads items until a line containing <c>"error":</c> appears; expects 10 items before it.
    /// </summary>
    [Fact]
    public async Task StreamingResponse_WithErrors_HandlesGracefully()
    {
        // Act
        using var responseStream = await _client.GetStreamAsync("/api/stream/with-errors?count=20&errorAt=10");
        var items = new List<TestItem>();
        var errorOccurred = false;
        using var reader = new StreamReader(responseStream);
        string? line;
        while ((line = await reader.ReadLineAsync()) != null)
        {
            if (line.Contains("\"error\":"))
            {
                errorOccurred = true;
                break;
            }

            if (string.IsNullOrWhiteSpace(line) || !line.StartsWith('{'))
            {
                continue;
            }

            try
            {
                var item = JsonSerializer.Deserialize<TestItem>(line, WebJsonOptions);
                if (item != null)
                {
                    items.Add(item);
                }
            }
            catch (JsonException)
            {
                // A line that is not a complete JSON object is skipped; the count assertion
                // below still fails the test if items went missing.
            }
        }

        // Assert
        items.Should().HaveCount(10); // Should have items before error
        errorOccurred.Should().BeTrue();
    }

    /// <summary>Checks that the three <c>X-Stream-*</c> metric headers are present and positive.</summary>
    [Fact]
    public async Task StreamingMetrics_TracksPerformance()
    {
        // Act
        using var response = await _client.GetAsync("/api/stream/items?count=100");
        await response.Content.ReadAsStringAsync();

        // Assert
        response.Headers.Should().ContainKey("X-Stream-Duration-Ms");
        response.Headers.Should().ContainKey("X-Stream-Chunks");
        response.Headers.Should().ContainKey("X-Stream-Bytes");

        // Header values are machine-readable, so parse them culture-independently.
        var invariant = System.Globalization.CultureInfo.InvariantCulture;
        var duration = int.Parse(response.Headers.GetValues("X-Stream-Duration-Ms").First(), invariant);
        var chunks = int.Parse(response.Headers.GetValues("X-Stream-Chunks").First(), invariant);
        var bytes = long.Parse(response.Headers.GetValues("X-Stream-Bytes").First(), invariant);

        duration.Should().BeGreaterThan(0);
        chunks.Should().BeGreaterThan(0);
        bytes.Should().BeGreaterThan(0);
    }

    /// <summary>Client-side shape of the items returned by the test endpoints.</summary>
    private sealed class TestItem
    {
        public int Id { get; set; }
        public string Name { get; set; } = "";
        public DateTime Created { get; set; }
    }
}
// Test controllers for streaming
/// <summary>
/// Endpoints under <c>api/stream</c> that produce the item sequences, chunked lists and
/// framed binary content consumed by the streaming tests.
/// </summary>
[ApiController]
[Route("api/stream")]
public class StreamTestController : ControllerBase
{
    private const string DurationHeader = "X-Stream-Duration-Ms";
    private const string ChunksHeader = "X-Stream-Chunks";
    private const string BytesHeader = "X-Stream-Bytes";

    /// <summary>
    /// Yields <paramref name="count"/> items, pausing 1 ms after every tenth, and reports
    /// approximate metrics in <c>X-Stream-*</c> response headers.
    /// </summary>
    /// <param name="count">Number of items to produce.</param>
    [HttpGet("items")]
    public async IAsyncEnumerable<TestItem> GetItems([FromQuery] int count = 100)
    {
        var invariant = System.Globalization.CultureInfo.InvariantCulture;
        var stopwatch = System.Diagnostics.Stopwatch.StartNew();

        // These two metrics depend only on `count`, so they are written before the first item
        // is yielded. Headers cannot be modified once the response has started, and the indexer
        // (unlike Add) does not throw if the key is already present.
        Response.Headers[ChunksHeader] = (count / 10).ToString(invariant);
        Response.Headers[BytesHeader] = (count * 50L).ToString(invariant); // Approximate; long avoids int overflow

        for (int i = 1; i <= count; i++)
        {
            yield return new TestItem
            {
                Id = i,
                Name = $"Item {i}",
                Created = DateTime.UtcNow
            };

            if (i % 10 == 0)
            {
                await Task.Delay(1); // Simulate work
            }
        }

        // The duration is only known after the last item. Writing it is attempted only while the
        // headers are still writable; otherwise the assignment would throw and abort the response.
        // NOTE(review): whether the response has started by now depends on how the serializer and
        // middleware flush, which is not visible from this file.
        if (!Response.HasStarted)
        {
            Response.Headers[DurationHeader] = ((int)stopwatch.Elapsed.TotalMilliseconds).ToString(invariant);
        }
    }

    /// <summary>
    /// Yields the items grouped into lists of <c>(int)Math.Sqrt(count)</c> elements, with a
    /// 10 ms pause after each list; the final list may be shorter.
    /// </summary>
    /// <param name="count">Total number of items to produce.</param>
    [HttpGet("sqrt-chunked")]
    [SpaceTimeStreaming(ChunkStrategy = ChunkStrategy.SqrtN)]
    public async IAsyncEnumerable<List<TestItem>> GetSqrtChunked([FromQuery] int count = 100)
    {
        var chunkSize = (int)Math.Sqrt(count);
        var pending = new List<TestItem>();
        for (int i = 1; i <= count; i++)
        {
            pending.Add(new TestItem
            {
                Id = i,
                Name = $"Item {i}",
                Created = DateTime.UtcNow
            });

            if (pending.Count >= chunkSize || i == count)
            {
                // Hand out a copy so the working list can be reused for the next chunk.
                yield return new List<TestItem>(pending);
                pending.Clear();
                await Task.Delay(10);
            }
        }
    }

    /// <summary>Yields <paramref name="count"/> larger items, yielding the thread every 100 items.</summary>
    /// <param name="count">Number of items to produce.</param>
    [HttpGet("large")]
    [SpaceTimeStreaming]
    public async IAsyncEnumerable<TestItem> GetLargeDataset([FromQuery] int count = 10000)
    {
        for (int i = 1; i <= count; i++)
        {
            yield return new TestItem
            {
                Id = i,
                Name = $"Item {i} with some additional data to make it larger",
                Created = DateTime.UtcNow
            };

            if (i % 100 == 0)
            {
                await Task.Yield(); // Allow other work
            }
        }
    }

    /// <summary>Yields <paramref name="count"/> items with a random 10-49 ms delay after each.</summary>
    /// <param name="count">Number of items to produce.</param>
    [HttpGet("backpressure")]
    public async IAsyncEnumerable<TestItem> GetWithBackpressure([FromQuery] int count = 50)
    {
        for (int i = 1; i <= count; i++)
        {
            yield return new TestItem
            {
                Id = i,
                Name = $"Item {i}",
                Created = DateTime.UtcNow
            };

            // Simulate varying processing time
            await Task.Delay(Random.Shared.Next(10, 50));
        }
    }

    /// <summary>
    /// Yields <paramref name="count"/> items 10 ms apart, stopping with an
    /// <see cref="OperationCanceledException"/> once <paramref name="cancellationToken"/> is cancelled.
    /// </summary>
    /// <param name="count">Number of items to produce.</param>
    /// <param name="cancellationToken">Token checked before each item and passed to each delay.</param>
    [HttpGet("cancellable")]
    public async IAsyncEnumerable<TestItem> GetCancellable(
        [FromQuery] int count = 1000,
        [System.Runtime.CompilerServices.EnumeratorCancellation] System.Threading.CancellationToken cancellationToken = default)
    {
        for (int i = 1; i <= count; i++)
        {
            cancellationToken.ThrowIfCancellationRequested();

            yield return new TestItem
            {
                Id = i,
                Name = $"Item {i}",
                Created = DateTime.UtcNow
            };

            await Task.Delay(10, cancellationToken);
        }
    }

    /// <summary>Yields <paramref name="count"/> items whose names contain repeated text.</summary>
    /// <param name="count">Number of items to produce.</param>
    [HttpGet("compressed")]
    [SpaceTimeStreaming]
    public async IAsyncEnumerable<TestItem> GetCompressed([FromQuery] int count = 100)
    {
        for (int i = 1; i <= count; i++)
        {
            yield return new TestItem
            {
                Id = i,
                Name = $"Compressible item {i} with repeated text repeated text repeated text",
                Created = DateTime.UtcNow
            };

            await Task.Yield();
        }
    }

    /// <summary>
    /// Writes ten frames to the response body, alternating binary (odd i, 100 random bytes,
    /// type 1) and JSON (even i, type 0). Each frame is a type byte, a 4-byte length and the
    /// payload, flushed individually with a 10 ms pause in between.
    /// </summary>
    [HttpGet("mixed")]
    public async Task GetMixedContent()
    {
        Response.ContentType = "application/octet-stream";

        for (int i = 1; i <= 10; i++)
        {
            byte frameType;
            byte[] payload;
            if (i % 2 == 0)
            {
                // JSON frame
                var json = JsonSerializer.Serialize(new TestItem { Id = i, Name = $"Item {i}" });
                frameType = 0;
                payload = Encoding.UTF8.GetBytes(json);
            }
            else
            {
                // Binary frame
                frameType = 1;
                payload = new byte[100];
                Random.Shared.NextBytes(payload);
            }

            // The frame is assembled in memory and written asynchronously. Wrapping Response.Body
            // in a BinaryWriter would perform synchronous writes/flushes on it (disallowed by
            // default in ASP.NET Core) and would close the body stream when the writer is disposed.
            var frame = BuildFrame(frameType, payload);
            await Response.Body.WriteAsync(frame.AsMemory());
            await Response.Body.FlushAsync();
            await Task.Delay(10);
        }
    }

    /// <summary>
    /// Yields items until index <paramref name="errorAt"/>, where an object with <c>error</c> and
    /// <c>at</c> members is yielded instead and the sequence ends.
    /// </summary>
    /// <param name="count">Upper bound on the number of elements.</param>
    /// <param name="errorAt">1-based index at which the error object replaces the item.</param>
    [HttpGet("with-errors")]
    public async IAsyncEnumerable<object> GetWithErrors([FromQuery] int count = 20, [FromQuery] int errorAt = 10)
    {
        for (int i = 1; i <= count; i++)
        {
            if (i == errorAt)
            {
                yield return new { error = "Simulated error", at = i };
                yield break;
            }

            yield return new TestItem
            {
                Id = i,
                Name = $"Item {i}",
                Created = DateTime.UtcNow
            };

            await Task.Delay(10);
        }
    }

    /// <summary>
    /// Serializes one frame in BinaryWriter layout: the type byte, the payload length as a
    /// 4-byte integer, then the payload bytes.
    /// </summary>
    private static byte[] BuildFrame(byte frameType, byte[] payload)
    {
        using var buffer = new MemoryStream(payload.Length + sizeof(byte) + sizeof(int));
        using (var writer = new BinaryWriter(buffer, Encoding.UTF8, leaveOpen: true))
        {
            writer.Write(frameType);
            writer.Write(payload.Length);
            writer.Write(payload);
        }

        return buffer.ToArray();
    }

    /// <summary>Item payload returned by the streaming test endpoints.</summary>
    public class TestItem
    {
        public int Id { get; set; }
        public string Name { get; set; } = "";
        public DateTime Created { get; set; }
    }
}