using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using SqrtSpace.SpaceTime.Core;
namespace SqrtSpace.SpaceTime.Pipeline;
/// <summary>
/// Memory-efficient data pipeline with √n buffering
/// </summary>
public class SpaceTimePipeline : ISpaceTimePipeline
{
// NOTE(review): generic type arguments appear to have been stripped from this file
// (e.g. the element type of the list and the category type of the logger below) — confirm
// against the original source before compiling.

// Stages in the order they were added; appended to by the Add*Stage methods.
private readonly List _stages;
// Logger used to report execution failures; also handed to every stage on construction.
private readonly ILogger> _logger;
// Pipeline-wide settings; also stored on the context of every execution.
private readonly PipelineConfiguration _configuration;
// Pipeline-owned cancellation source; its token is linked with the caller's token on every execution.
private readonly CancellationTokenSource _cancellationTokenSource;
// Created as SemaphoreSlim(1, 1): only one ExecuteAsync/ExecuteStreamingAsync call runs at a time.
private readonly SemaphoreSlim _executionLock;
// Current lifecycle state; stages may only be added while it is PipelineState.Created.
private PipelineState _state;
// Pipeline name supplied to the constructor; copied into each execution context.
public string Name { get; }
// Read-only view of the current lifecycle state.
public PipelineState State => _state;
/// <summary>
/// Creates an empty pipeline in the <c>PipelineState.Created</c> state.
/// </summary>
/// <param name="name">Pipeline name; must not be null.</param>
/// <param name="logger">Logger for the pipeline and its stages; must not be null.</param>
/// <param name="configuration">
/// Optional pipeline settings; a default <c>PipelineConfiguration</c> is created when null.
/// </param>
/// <exception cref="ArgumentNullException">
/// <paramref name="name"/> or <paramref name="logger"/> is null.
/// </exception>
// NOTE(review): the generic arguments of ILogger and List look stripped here — confirm against the original source.
public SpaceTimePipeline(
string name,
ILogger> logger,
PipelineConfiguration? configuration = null)
{
Name = name ?? throw new ArgumentNullException(nameof(name));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_configuration = configuration ?? new PipelineConfiguration();
_stages = new List();
_cancellationTokenSource = new CancellationTokenSource();
// One permit: executions are serialized.
_executionLock = new SemaphoreSlim(1, 1);
_state = PipelineState.Created;
}
/// <summary>
/// Appends a <c>TransformStage</c> built from <paramref name="transform"/> to the end of the pipeline.
/// </summary>
/// <param name="stageName">Name passed to the new stage.</param>
/// <param name="transform">Delegate passed to the new stage.</param>
/// <param name="configuration">
/// Optional stage settings; a default <c>StageConfiguration</c> is created when null.
/// </param>
/// <returns>This pipeline instance, so calls can be chained.</returns>
/// <exception cref="InvalidOperationException">
/// The pipeline is no longer in the <c>PipelineState.Created</c> state.
/// </exception>
// NOTE(review): the method's generic parameters and the Func type arguments look stripped — confirm against the original source.
public ISpaceTimePipeline AddStage(
string stageName,
Func> transform,
StageConfiguration? configuration = null)
{
// Stages can only be registered before the first execution changes the state.
if (_state != PipelineState.Created)
throw new InvalidOperationException("Cannot add stages after pipeline has started");
var stage = new TransformStage(
stageName,
transform,
configuration ?? new StageConfiguration(),
_logger);
_stages.Add(stage);
return this;
}
/// <summary>
/// Appends a <c>BatchTransformStage</c> built from <paramref name="batchTransform"/> to the end of the pipeline.
/// </summary>
/// <param name="stageName">Name passed to the new stage.</param>
/// <param name="batchTransform">Delegate passed to the new stage.</param>
/// <param name="configuration">
/// Optional stage settings; a default <c>StageConfiguration</c> is created when null.
/// </param>
/// <returns>This pipeline instance, so calls can be chained.</returns>
/// <exception cref="InvalidOperationException">
/// The pipeline is no longer in the <c>PipelineState.Created</c> state.
/// </exception>
// NOTE(review): the method's generic parameters and the Func type arguments look stripped — confirm against the original source.
public ISpaceTimePipeline AddBatchStage(
string stageName,
Func, CancellationToken, Task>> batchTransform,
StageConfiguration? configuration = null)
{
// Stages can only be registered before the first execution changes the state.
if (_state != PipelineState.Created)
throw new InvalidOperationException("Cannot add stages after pipeline has started");
var stage = new BatchTransformStage(
stageName,
batchTransform,
configuration ?? new StageConfiguration(),
_logger);
_stages.Add(stage);
return this;
}
/// <summary>
/// Appends a <c>FilterStage</c> built from <paramref name="predicate"/> to the end of the pipeline.
/// </summary>
/// <param name="stageName">Name passed to the new stage.</param>
/// <param name="predicate">Delegate passed to the new stage.</param>
/// <param name="configuration">
/// Optional stage settings; a default <c>StageConfiguration</c> is created when null.
/// </param>
/// <returns>This pipeline instance, so calls can be chained.</returns>
/// <exception cref="InvalidOperationException">
/// The pipeline is no longer in the <c>PipelineState.Created</c> state.
/// </exception>
// NOTE(review): the method's generic parameters and the Func type arguments look stripped — confirm against the original source.
public ISpaceTimePipeline AddFilterStage(
string stageName,
Func predicate,
StageConfiguration? configuration = null)
{
// Stages can only be registered before the first execution changes the state.
if (_state != PipelineState.Created)
throw new InvalidOperationException("Cannot add stages after pipeline has started");
var stage = new FilterStage(
stageName,
predicate,
configuration ?? new StageConfiguration(),
_logger);
_stages.Add(stage);
return this;
}
/// <summary>
/// Appends a <c>CheckpointStage</c> that uses <paramref name="checkpointManager"/> to the end of the pipeline.
/// </summary>
/// <param name="stageName">Name passed to the new stage.</param>
/// <param name="checkpointManager">Checkpoint manager passed to the new stage.</param>
/// <param name="configuration">
/// Optional stage settings; a default <c>StageConfiguration</c> is created when null.
/// </param>
/// <returns>This pipeline instance, so calls can be chained.</returns>
/// <exception cref="InvalidOperationException">
/// The pipeline is no longer in the <c>PipelineState.Created</c> state.
/// </exception>
// NOTE(review): generic type arguments look stripped from this signature — confirm against the original source.
public ISpaceTimePipeline AddCheckpointStage(
string stageName,
ICheckpointManager checkpointManager,
StageConfiguration? configuration = null)
{
// Stages can only be registered before the first execution changes the state.
if (_state != PipelineState.Created)
throw new InvalidOperationException("Cannot add stages after pipeline has started");
var stage = new CheckpointStage(
stageName,
checkpointManager,
configuration ?? new StageConfiguration(),
_logger);
_stages.Add(stage);
return this;
}
/// <summary>
/// Runs a single item through the pipeline by delegating to the collection overload
/// with a one-element array.
/// </summary>
/// <param name="input">The item to process.</param>
/// <param name="cancellationToken">Token forwarded to the collection overload.</param>
/// <returns>The result produced by the collection overload.</returns>
public async Task<PipelineResult<TOutput>> ExecuteAsync(
TInput input,
CancellationToken cancellationToken = default)
{
    return await ExecuteAsync(new[] { input }, cancellationToken);
}
/// <summary>
/// Runs all <paramref name="inputs"/> through the pipeline and returns the collected outputs.
/// </summary>
/// <param name="inputs">Items handed to <c>FeedInputsAsync</c> for the first stage channel.</param>
/// <param name="cancellationToken">
/// Cancels the wait for the execution lock and, linked with the pipeline's own token, the run itself.
/// </param>
/// <returns>
/// A result carrying the outputs, processed count, duration and success flag. Exceptions raised
/// during the run are logged and reported through <c>Success</c>/<c>Error</c> rather than rethrown;
/// only a cancelled wait for the execution lock propagates to the caller.
/// </returns>
public async Task<PipelineResult<TOutput>> ExecuteAsync(
IEnumerable<TInput> inputs,
CancellationToken cancellationToken = default)
{
    await _executionLock.WaitAsync(cancellationToken);
    try
    {
        _state = PipelineState.Running;
        var startTime = DateTime.UtcNow;
        // Monotonic timer for Duration; wall-clock subtraction can jump if the system clock is adjusted.
        var stopwatch = System.Diagnostics.Stopwatch.StartNew();
        var result = new PipelineResult<TOutput>();

        // The run stops when either the caller or the pipeline itself requests cancellation.
        using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(
            cancellationToken,
            _cancellationTokenSource.Token);

        var context = new PipelineExecutionContext
        {
            PipelineName = Name,
            ExecutionId = Guid.NewGuid().ToString(),
            StartTime = startTime,
            Configuration = _configuration,
            CancellationToken = linkedCts.Token
        };

        try
        {
            var channels = BuildStageChannels();
            var stageTasks = StartStageProcessors(channels, context);

            // Start draining the output channel BEFORE feeding and awaiting the stages.
            // The channels are bounded: if nothing reads the last channel, it fills up, the
            // final stage blocks on its write and Task.WhenAll below never completes.
            var collectTask = CollectOutputsAsync(channels.Last().Reader, linkedCts.Token);

            await FeedInputsAsync(inputs, channels.First().Writer, context);
            await Task.WhenAll(stageTasks);

            var outputs = await collectTask;
            result.Outputs = outputs;
            result.Success = true;
            result.ProcessedCount = outputs.Count;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Pipeline execution failed");
            result.Success = false;
            result.Error = ex;
            // Release any stage or collector task still parked on a channel operation;
            // otherwise it would outlive this call, waiting on a channel nobody services.
            linkedCts.Cancel();
        }

        result.Duration = stopwatch.Elapsed;
        _state = result.Success ? PipelineState.Completed : PipelineState.Failed;
        return result;
    }
    finally
    {
        _executionLock.Release();
    }

    // Reads the output channel until it is completed and returns everything that was read.
    static async Task<List<TOutput>> CollectOutputsAsync<TItem>(
        ChannelReader<TItem> reader,
        CancellationToken token)
    {
        var collected = new List<TOutput>();
        await foreach (var item in reader.ReadAllAsync(token))
        {
            collected.Add((TOutput)(object)item!);
        }
        return collected;
    }
}
/// <summary>
/// Streams <paramref name="inputs"/> through the pipeline, yielding each output as soon as it
/// can be read from the last stage channel.
/// </summary>
/// <param name="inputs">Asynchronous source of items written to the first stage channel.</param>
/// <param name="cancellationToken">
/// Cancels the wait for the execution lock and, linked with the pipeline's own token, the run itself.
/// </param>
/// <returns>An asynchronous sequence of the pipeline outputs.</returns>
/// <remarks>
/// If enumeration ends before the pipeline finishes (the consumer stops early, cancellation is
/// requested, or a stage or the feeder fails), the linked token is cancelled so background tasks
/// stop, and the state is set to <c>PipelineState.Failed</c>.
/// </remarks>
public async IAsyncEnumerable<TOutput> ExecuteStreamingAsync(
IAsyncEnumerable<TInput> inputs,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    await _executionLock.WaitAsync(cancellationToken);
    try
    {
        _state = PipelineState.Running;
        using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(
            cancellationToken,
            _cancellationTokenSource.Token);
        var runToken = linkedCts.Token;
        var finished = false;

        // Inner try/finally so the cleanup below runs while linkedCts is still alive
        // (the using declaration disposes it only when the outer try block is left).
        try
        {
            var context = new PipelineExecutionContext
            {
                PipelineName = Name,
                ExecutionId = Guid.NewGuid().ToString(),
                StartTime = DateTime.UtcNow,
                Configuration = _configuration,
                CancellationToken = runToken
            };

            var channels = BuildStageChannels();
            var stageTasks = StartStageProcessors(channels, context);

            // Feed inputs in the background so outputs can be yielded while inputs are still arriving.
            var inputWriter = channels.First().Writer;
            var feederTask = Task.Run(async () =>
            {
                try
                {
                    await foreach (var input in inputs.WithCancellation(runToken))
                    {
                        await inputWriter.WriteAsync(input, runToken);
                    }
                }
                finally
                {
                    // Always complete the writer so downstream readers can finish.
                    inputWriter.Complete();
                }
            }, runToken);

            await foreach (var output in channels.Last().Reader.ReadAllAsync(runToken))
            {
                yield return (TOutput)(object)output;
            }

            // Surface any stage or feeder failure once the output channel has completed.
            await Task.WhenAll(stageTasks.Concat(new[] { feederTask }));
            finished = true;
            _state = PipelineState.Completed;
        }
        finally
        {
            if (!finished)
            {
                // Abnormal exit: do not leave the state at Running, and stop the feeder and
                // stage tasks, which could otherwise stay blocked on the bounded channels.
                _state = PipelineState.Failed;
                linkedCts.Cancel();
            }
        }
    }
    finally
    {
        _executionLock.Release();
    }
}
/// <summary>
/// Builds a statistics snapshot for the pipeline: its name, current state, stage count, the
/// statistics reported by each stage, and totals derived from them.
/// </summary>
/// <returns>The populated <c>PipelineStatistics</c> instance.</returns>
// NOTE(review): the generic arguments of the Task return type and of the list below look stripped — confirm against the original source.
public async Task GetStatisticsAsync()
{
var stats = new PipelineStatistics
{
PipelineName = Name,
State = _state,
StageCount = _stages.Count,
StageStatistics = new List()
};
// Stages are queried one at a time, in the order they were added.
foreach (var stage in _stages)
{
stats.StageStatistics.Add(await stage.GetStatisticsAsync());
}
stats.TotalItemsProcessed = stats.StageStatistics.Sum(s => s.ItemsProcessed);
stats.TotalErrors = stats.StageStatistics.Sum(s => s.Errors);
// Unweighted mean of the per-stage average latencies; zero when there are no stages.
stats.AverageLatency = stats.StageStatistics.Any()
? TimeSpan.FromMilliseconds(stats.StageStatistics.Average(s => s.AverageLatency.TotalMilliseconds))
: TimeSpan.Zero;
return stats;
}
private List> BuildStageChannels()
{
var channels = new List>();
for (int i = 0; i <= _stages.Count; i++)
{
var bufferSize = i < _stages.Count
? _stages[i].Configuration.BufferSize
: _configuration.OutputBufferSize;
// Use √n buffering if not specified
if (bufferSize == 0)
{
bufferSize = SpaceTimeCalculator.CalculateSqrtInterval(_configuration.ExpectedItemCount);
}
var channel = Channel.CreateBounded