From a0e77356a8ae74f98d4da8e70edbe677e21eec8e Mon Sep 17 00:00:00 2001 From: martintmk <103487740+martintmk@users.noreply.github.com> Date: Fri, 8 Sep 2023 16:58:33 +0200 Subject: [PATCH] Delay pipeline disposal when still in use (#1579) --- .../RegistryPipelineComponentBuilder.cs | 9 +- .../Pipeline/ExecutionTrackingComponent.cs | 59 ++++++ .../Pipeline/PipelineComponentFactory.cs | 2 + .../ResiliencePipelineExtensions.cs | 18 +- .../Controller/ScheduledTaskExecutorTests.cs | 4 +- .../ResiliencePipelineRegistryTests.cs | 15 ++ .../ExecutionTrackingComponentTests.cs | 168 ++++++++++++++++++ 7 files changed, 260 insertions(+), 15 deletions(-) create mode 100644 src/Polly.Core/Utils/Pipeline/ExecutionTrackingComponent.cs create mode 100644 test/Polly.Core.Tests/Utils/Pipeline/ExecutionTrackingComponentTests.cs diff --git a/src/Polly.Core/Registry/RegistryPipelineComponentBuilder.cs b/src/Polly.Core/Registry/RegistryPipelineComponentBuilder.cs index e29c89560d..4493631f8c 100644 --- a/src/Polly.Core/Registry/RegistryPipelineComponentBuilder.cs +++ b/src/Polly.Core/Registry/RegistryPipelineComponentBuilder.cs @@ -57,14 +57,17 @@ private Builder CreateBuilder() builder.InstanceName = _instanceName; _configure(builder, context); + var timeProvider = builder.TimeProvider; var telemetry = new ResilienceStrategyTelemetry( new ResilienceTelemetrySource(builder.Name, builder.InstanceName, null), builder.TelemetryListener); return new( - () => PipelineComponentFactory.WithDisposableCallbacks( - builder.BuildPipelineComponent(), - context.DisposeCallbacks), + () => + { + var innerComponent = PipelineComponentFactory.WithDisposableCallbacks(builder.BuildPipelineComponent(), context.DisposeCallbacks); + return PipelineComponentFactory.WithExecutionTracking(innerComponent, timeProvider); + }, context.ReloadTokens, telemetry); } diff --git a/src/Polly.Core/Utils/Pipeline/ExecutionTrackingComponent.cs b/src/Polly.Core/Utils/Pipeline/ExecutionTrackingComponent.cs new file mode 100644 index 0000000000..5a03e7f94f --- /dev/null +++ b/src/Polly.Core/Utils/Pipeline/ExecutionTrackingComponent.cs @@ -0,0 +1,59 @@ +namespace Polly.Utils.Pipeline; + +internal sealed class ExecutionTrackingComponent : PipelineComponent +{ + public static readonly TimeSpan Timeout = TimeSpan.FromSeconds(30); + + public static readonly TimeSpan SleepDelay = TimeSpan.FromSeconds(1); + + private readonly TimeProvider _timeProvider; + private int _pendingExecutions; + + public ExecutionTrackingComponent(PipelineComponent component, TimeProvider timeProvider) + { + Component = component; + _timeProvider = timeProvider; + } + + public PipelineComponent Component { get; } + + public bool HasPendingExecutions => Interlocked.CompareExchange(ref _pendingExecutions, 0, 0) > 0; + + internal override async ValueTask> ExecuteCore( + Func>> callback, + ResilienceContext context, + TState state) + { + Interlocked.Increment(ref _pendingExecutions); + + try + { + return await Component.ExecuteCore(callback, context, state).ConfigureAwait(context.ContinueOnCapturedContext); + } + finally + { + Interlocked.Decrement(ref _pendingExecutions); + } + } + + public override async ValueTask DisposeAsync() + { + var start = _timeProvider.GetTimestamp(); + var stopwatch = Stopwatch.StartNew(); + + // We don't want to introduce locks or any synchronization primitives to main execution path + // so we will do "dummy" retries until there are no more executions. + while (HasPendingExecutions) + { + await _timeProvider.Delay(SleepDelay).ConfigureAwait(false); + + // stryker disable once equality : no means to test this + if (_timeProvider.GetElapsedTime(start) > Timeout) + { + break; + } + } + + await Component.DisposeAsync().ConfigureAwait(false); + } +} diff --git a/src/Polly.Core/Utils/Pipeline/PipelineComponentFactory.cs b/src/Polly.Core/Utils/Pipeline/PipelineComponentFactory.cs index 125f742a10..5c992295fc 100644 --- a/src/Polly.Core/Utils/Pipeline/PipelineComponentFactory.cs +++ b/src/Polly.Core/Utils/Pipeline/PipelineComponentFactory.cs @@ -24,6 +24,8 @@ public static PipelineComponent WithDisposableCallbacks(PipelineComponent compon return new ComponentWithDisposeCallbacks(component, callbacks.ToList()); } + public static PipelineComponent WithExecutionTracking(PipelineComponent component, TimeProvider timeProvider) => new ExecutionTrackingComponent(component, timeProvider); + public static PipelineComponent CreateComposite( IReadOnlyList components, ResilienceStrategyTelemetry telemetry, diff --git a/src/Polly.Testing/ResiliencePipelineExtensions.cs b/src/Polly.Testing/ResiliencePipelineExtensions.cs index 19692182d3..feea4ad59d 100644 --- a/src/Polly.Testing/ResiliencePipelineExtensions.cs +++ b/src/Polly.Testing/ResiliencePipelineExtensions.cs @@ -40,10 +40,10 @@ private static ResiliencePipelineDescriptor GetPipelineDescriptorCore(Pipelin var components = new List(); component.ExpandComponents(components); - var descriptors = components.Select(s => new ResilienceStrategyDescriptor(s.Options, GetStrategyInstance(s))).ToList(); + var descriptors = components.OfType().Select(s => new ResilienceStrategyDescriptor(s.Options, GetStrategyInstance(s))).ToList().AsReadOnly(); return new ResiliencePipelineDescriptor( - descriptors.Where(s => !ShouldSkip(s.StrategyInstance)).ToList().AsReadOnly(), + descriptors, isReloadable: components.Exists(s => s is ReloadableComponent)); } @@ -54,16 +54,9 @@ private static object GetStrategyInstance(PipelineComponent component) return reactiveBridge.Strategy; } - if (component is BridgeComponent nonReactiveBridge) - { - return nonReactiveBridge.Strategy; - } - - return component; + return ((BridgeComponent)component).Strategy; } - private static bool ShouldSkip(object instance) => instance is ReloadableComponent || instance is ComponentWithDisposeCallbacks; - private static void ExpandComponents(this PipelineComponent component, List components) { if (component is CompositeComponent pipeline) @@ -78,6 +71,11 @@ private static void ExpandComponents(this PipelineComponent component, List builder.AddTimeout(TimeSpan.FromSeconds(1))); + pipeline.Component.Should().BeOfType().Subject.Component.Should().BeOfType(); + + var genericPipeline = registry.GetOrAddPipeline(id, builder => builder.AddTimeout(TimeSpan.FromSeconds(1))); + pipeline.Component.Should().BeOfType().Subject.Component.Should().BeOfType(); + } + [Fact] public void GetOrAddPipeline_Generic_Ok() { diff --git a/test/Polly.Core.Tests/Utils/Pipeline/ExecutionTrackingComponentTests.cs b/test/Polly.Core.Tests/Utils/Pipeline/ExecutionTrackingComponentTests.cs new file mode 100644 index 0000000000..2cebbffc5e --- /dev/null +++ b/test/Polly.Core.Tests/Utils/Pipeline/ExecutionTrackingComponentTests.cs @@ -0,0 +1,168 @@ +using System; +using System.Threading.Tasks; +using Microsoft.Extensions.Time.Testing; +using Polly.Utils.Pipeline; + +namespace Polly.Core.Tests.Utils.Pipeline; + +public class ExecutionTrackingComponentTests +{ + private readonly FakeTimeProvider _timeProvider = new(); + + [Fact] + public async Task DisposeAsync_PendingOperations_Delayed() + { + using var assert = new ManualResetEvent(false); + using var executing = new ManualResetEvent(false); + + await using var inner = new Inner + { + OnExecute = () => + { + executing.Set(); + assert.WaitOne(); + } + }; + + var component = new ExecutionTrackingComponent(inner, _timeProvider); + var execution = Task.Run(() => new ResiliencePipeline(component, Polly.Utils.DisposeBehavior.Allow).Execute(() => { })); + executing.WaitOne(); + + var disposeTask = component.DisposeAsync().AsTask(); + _timeProvider.Advance(ExecutionTrackingComponent.SleepDelay); + inner.Disposed.Should().BeFalse(); + assert.Set(); + + _timeProvider.Advance(ExecutionTrackingComponent.SleepDelay); + await execution; + + _timeProvider.Advance(ExecutionTrackingComponent.SleepDelay); + await disposeTask; + + inner.Disposed.Should().BeTrue(); + } + + [Fact] + public async Task HasPendingExecutions_Ok() + { + using var assert = new ManualResetEvent(false); + using var executing = new ManualResetEvent(false); + + await using var inner = new Inner + { + OnExecute = () => + { + executing.Set(); + assert.WaitOne(); + } + }; + + await using var component = new ExecutionTrackingComponent(inner, _timeProvider); + var execution = Task.Run(() => new ResiliencePipeline(component, Polly.Utils.DisposeBehavior.Allow).Execute(() => { })); + executing.WaitOne(); + + component.HasPendingExecutions.Should().BeTrue(); + assert.Set(); + await execution; + + component.HasPendingExecutions.Should().BeFalse(); + } + + [Fact] + public async Task DisposeAsync_Timeout_Ok() + { + using var assert = new ManualResetEvent(false); + using var executing = new ManualResetEvent(false); + + await using var inner = new Inner + { + OnExecute = () => + { + executing.Set(); + assert.WaitOne(); + } + }; + + var component = new ExecutionTrackingComponent(inner, _timeProvider); + var execution = Task.Run(() => new ResiliencePipeline(component, Polly.Utils.DisposeBehavior.Allow).Execute(() => { })); + executing.WaitOne(); + + var disposeTask = component.DisposeAsync().AsTask(); + inner.Disposed.Should().BeFalse(); + _timeProvider.Advance(ExecutionTrackingComponent.Timeout - TimeSpan.FromSeconds(1)); + inner.Disposed.Should().BeFalse(); + _timeProvider.Advance(TimeSpan.FromSeconds(1)); + _timeProvider.Advance(TimeSpan.FromSeconds(1)); + await disposeTask; + inner.Disposed.Should().BeTrue(); + + assert.Set(); + await execution; + } + + [Fact] + public async Task DisposeAsync_WhenRunningMultipleTasks_Ok() + { + var tasks = new ConcurrentQueue(); + await using var inner = new Inner + { + OnExecute = () => + { + var ev = new ManualResetEvent(false); + tasks.Enqueue(ev); + ev.WaitOne(); + } + }; + + var component = new ExecutionTrackingComponent(inner, TimeProvider.System); + var pipeline = new ResiliencePipeline(component, Polly.Utils.DisposeBehavior.Allow); + + for (int i = 0; i < 10; i++) + { + _ = Task.Run(() => pipeline.Execute(() => { })); + } + + while (tasks.Count != 10) + { + await Task.Delay(1); + } + + var disposeTask = component.DisposeAsync().AsTask(); + + while (tasks.Count > 1) + { + tasks.TryDequeue(out var ev).Should().BeTrue(); + ev!.Set(); + ev.Dispose(); + disposeTask.Wait(1).Should().BeFalse(); + inner.Disposed.Should().BeFalse(); + } + + // last one + tasks.TryDequeue(out var last).Should().BeTrue(); + last!.Set(); + last.Dispose(); + await disposeTask; + inner.Disposed.Should().BeTrue(); + } + + private class Inner : PipelineComponent + { + public bool Disposed { get; private set; } + + public override ValueTask DisposeAsync() + { + Disposed = true; + return default; + } + + public Action OnExecute { get; set; } = () => { }; + + internal override async ValueTask> ExecuteCore(Func>> callback, ResilienceContext context, TState state) + { + OnExecute(); + + return await callback(context, state); + } + } +}