mirror of
https://github.com/Sonarr/Sonarr
synced 2025-12-06 08:28:37 +01:00
Use Happy Eyeballs for HTTP socket address selection
Fixes IPv6 getting disabled if an error happens with a single connection. Based on https://slugcat.systems/post/24-06-16-ipv6-is-hard-happy-eyeballs-dotnet-httpclient/
This commit is contained in:
parent
550cf8d399
commit
d3ada78fb9
4 changed files with 497 additions and 93 deletions
222
src/NzbDrone.Common.Test/Http/HappyEyeballsFixture.cs
Normal file
222
src/NzbDrone.Common.Test/Http/HappyEyeballsFixture.cs
Normal file
|
|
@ -0,0 +1,222 @@
|
|||
using System;
|
||||
using System.Net;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Moq;
|
||||
using NUnit.Framework;
|
||||
using NzbDrone.Common.Http.HappyEyeballs;
|
||||
using NzbDrone.Test.Common;
|
||||
|
||||
namespace NzbDrone.Common.Test.Http
|
||||
{
|
||||
[TestFixture]
|
||||
public class HappyEyeballsFixture : TestBase
|
||||
{
|
||||
private static IPAddress ipv6Address1 = IPAddress.Parse("2001:db8::1");
|
||||
private static IPAddress ipv6Address2 = IPAddress.Parse("2001:db8::2");
|
||||
private static IPAddress ipv4Address1 = IPAddress.Parse("192.0.2.1");
|
||||
private static IPAddress ipv4Address2 = IPAddress.Parse("192.0.2.2");
|
||||
|
||||
private Mock<Func<IPAddress, CancellationToken, Task<IDisposable>>> _connectSocketMock;
|
||||
private Mock<Func<CancellationToken, Task>> _taskDelayMock;
|
||||
private HappyEyeballs<IDisposable> _happyEyeballs;
|
||||
|
||||
[SetUp]
|
||||
public void SetUp()
|
||||
{
|
||||
_connectSocketMock = new Mock<Func<IPAddress, CancellationToken, Task<IDisposable>>>(MockBehavior.Strict);
|
||||
_taskDelayMock = new Mock<Func<CancellationToken, Task>>(MockBehavior.Strict);
|
||||
_happyEyeballs = new HappyEyeballs<IDisposable>(_connectSocketMock.Object, _taskDelayMock.Object);
|
||||
}
|
||||
|
||||
[Test]
|
||||
public void should_throw_exception_when_no_ips_resolved()
|
||||
{
|
||||
var addresses = Array.Empty<IPAddress>();
|
||||
var cancellationToken = CancellationToken.None;
|
||||
|
||||
Assert.ThrowsAsync<ArgumentOutOfRangeException>(async () =>
|
||||
await _happyEyeballs.Connect(addresses, cancellationToken));
|
||||
}
|
||||
|
||||
[Test]
|
||||
public async Task should_connect_successfully_when_valid_addresses_are_provided()
|
||||
{
|
||||
var addresses = new[]
|
||||
{
|
||||
ipv4Address1,
|
||||
ipv6Address1,
|
||||
};
|
||||
|
||||
var cancellationToken = CancellationToken.None;
|
||||
|
||||
var sequence = new MockSequence();
|
||||
_connectSocketMock.InSequence(sequence)
|
||||
.Setup(x => x(ipv6Address1, It.IsAny<CancellationToken>()))
|
||||
.ReturnsAsync(Mock.Of<IDisposable>(MockBehavior.Strict));
|
||||
_taskDelayMock.InSequence(sequence)
|
||||
.Setup(x => x(It.IsAny<CancellationToken>()))
|
||||
.Returns(TaskFromCancellationToken);
|
||||
|
||||
var result = await _happyEyeballs.Connect(addresses, cancellationToken);
|
||||
|
||||
Assert.NotNull(result);
|
||||
|
||||
_connectSocketMock.Verify(x => x(It.IsAny<IPAddress>(), It.IsAny<CancellationToken>()), Times.Once);
|
||||
_taskDelayMock.Verify(x => x(It.IsAny<CancellationToken>()), Times.Once);
|
||||
}
|
||||
|
||||
[Test]
|
||||
public void should_throw_aggregateexception_when_no_addresses_successfully_connect()
|
||||
{
|
||||
var addresses = new[]
|
||||
{
|
||||
ipv4Address1,
|
||||
ipv4Address2,
|
||||
ipv6Address1,
|
||||
ipv6Address2,
|
||||
};
|
||||
|
||||
var cancellationToken = CancellationToken.None;
|
||||
|
||||
var sequence = new MockSequence();
|
||||
_connectSocketMock.InSequence(sequence)
|
||||
.Setup(x => x(ipv6Address1, It.IsAny<CancellationToken>()))
|
||||
.ThrowsAsync(new Exception());
|
||||
_taskDelayMock.InSequence(sequence)
|
||||
.Setup(x => x(It.IsAny<CancellationToken>()))
|
||||
.Returns(TaskFromCancellationToken);
|
||||
_connectSocketMock.InSequence(sequence)
|
||||
.Setup(x => x(ipv4Address1, It.IsAny<CancellationToken>()))
|
||||
.ThrowsAsync(new Exception());
|
||||
_taskDelayMock.InSequence(sequence)
|
||||
.Setup(x => x(It.IsAny<CancellationToken>()))
|
||||
.Returns(TaskFromCancellationToken);
|
||||
_connectSocketMock.InSequence(sequence)
|
||||
.Setup(x => x(ipv6Address2, It.IsAny<CancellationToken>()))
|
||||
.ThrowsAsync(new Exception());
|
||||
_taskDelayMock.InSequence(sequence)
|
||||
.Setup(x => x(It.IsAny<CancellationToken>()))
|
||||
.Returns(TaskFromCancellationToken);
|
||||
_connectSocketMock.InSequence(sequence)
|
||||
.Setup(x => x(ipv4Address2, It.IsAny<CancellationToken>()))
|
||||
.ThrowsAsync(new Exception());
|
||||
|
||||
var ex = Assert.ThrowsAsync<AggregateException>(async () =>
|
||||
await _happyEyeballs.Connect(addresses, cancellationToken));
|
||||
|
||||
Assert.That(ex.InnerExceptions, Has.Count.EqualTo(4));
|
||||
|
||||
_connectSocketMock.Verify(x => x(It.IsAny<IPAddress>(), It.IsAny<CancellationToken>()), Times.Exactly(4));
|
||||
_taskDelayMock.Verify(x => x(It.IsAny<CancellationToken>()), Times.Exactly(3));
|
||||
}
|
||||
|
||||
[Test]
|
||||
public void should_throw_operationcanceledexception_when_canceled()
|
||||
{
|
||||
var addresses = new[]
|
||||
{
|
||||
ipv6Address1,
|
||||
};
|
||||
|
||||
using var cancellationTokenSource = new CancellationTokenSource();
|
||||
var cancellationToken = cancellationTokenSource.Token;
|
||||
|
||||
_connectSocketMock
|
||||
.Setup(x => x(It.IsAny<IPAddress>(), It.IsAny<CancellationToken>()))
|
||||
.Callback((IPAddress _, CancellationToken _) => cancellationTokenSource.Cancel())
|
||||
.ThrowsAsync(new Exception());
|
||||
|
||||
var ex = Assert.ThrowsAsync<OperationCanceledException>(async () =>
|
||||
await _happyEyeballs.Connect(addresses, cancellationToken));
|
||||
|
||||
Assert.That(ex.CancellationToken.IsCancellationRequested, Is.True);
|
||||
|
||||
_connectSocketMock.Verify(x => x(It.IsAny<IPAddress>(), It.IsAny<CancellationToken>()), Times.Once);
|
||||
}
|
||||
|
||||
[Test]
|
||||
public async Task should_connect_to_ipv4_when_ipv6_connection_fails()
|
||||
{
|
||||
var addresses = new[]
|
||||
{
|
||||
ipv4Address1,
|
||||
ipv6Address1,
|
||||
};
|
||||
|
||||
var cancellationToken = CancellationToken.None;
|
||||
|
||||
var sequence = new MockSequence();
|
||||
_connectSocketMock.InSequence(sequence)
|
||||
.Setup(x => x(ipv6Address1, It.IsAny<CancellationToken>()))
|
||||
.ThrowsAsync(new Exception());
|
||||
_taskDelayMock.InSequence(sequence)
|
||||
.Setup(x => x(It.IsAny<CancellationToken>()))
|
||||
.Returns(TaskFromCancellationToken);
|
||||
_connectSocketMock.InSequence(sequence)
|
||||
.Setup(x => x(ipv4Address1, It.IsAny<CancellationToken>()))
|
||||
.ReturnsAsync(Mock.Of<IDisposable>(MockBehavior.Strict));
|
||||
|
||||
var result = await _happyEyeballs.Connect(addresses, cancellationToken);
|
||||
|
||||
Assert.NotNull(result);
|
||||
|
||||
_connectSocketMock.Verify(x => x(It.IsAny<IPAddress>(), It.IsAny<CancellationToken>()), Times.Exactly(2));
|
||||
_taskDelayMock.Verify(x => x(It.IsAny<CancellationToken>()), Times.Once);
|
||||
}
|
||||
|
||||
[Test]
|
||||
public async Task should_dispose_multiple_successful_connections()
|
||||
{
|
||||
var addresses = new[]
|
||||
{
|
||||
ipv4Address1,
|
||||
ipv6Address1,
|
||||
ipv6Address2,
|
||||
};
|
||||
|
||||
var cancellationToken = CancellationToken.None;
|
||||
|
||||
var disposableMock = new Mock<IDisposable>(MockBehavior.Strict);
|
||||
disposableMock.Setup(x => x.Dispose());
|
||||
|
||||
var sequence = new MockSequence();
|
||||
_connectSocketMock.InSequence(sequence)
|
||||
.Setup(x => x(ipv6Address1, It.IsAny<CancellationToken>()))
|
||||
.Returns((IPAddress _, CancellationToken cancel) => ReturnAfterCancellation(cancel, disposableMock.Object));
|
||||
_taskDelayMock.InSequence(sequence)
|
||||
.Setup(x => x(It.IsAny<CancellationToken>()))
|
||||
.Returns(Task.CompletedTask);
|
||||
_connectSocketMock.InSequence(sequence)
|
||||
.Setup(x => x(ipv4Address1, It.IsAny<CancellationToken>()))
|
||||
.Returns((IPAddress _, CancellationToken cancel) => ReturnAfterCancellation(cancel, disposableMock.Object));
|
||||
_taskDelayMock.InSequence(sequence)
|
||||
.Setup(x => x(It.IsAny<CancellationToken>()))
|
||||
.Returns(Task.CompletedTask);
|
||||
_connectSocketMock.InSequence(sequence)
|
||||
.Setup(x => x(ipv6Address2, It.IsAny<CancellationToken>()))
|
||||
.ReturnsAsync(Mock.Of<IDisposable>(MockBehavior.Strict));
|
||||
|
||||
var result = await _happyEyeballs.Connect(addresses, cancellationToken);
|
||||
|
||||
Assert.NotNull(result);
|
||||
|
||||
_connectSocketMock.Verify(x => x(It.IsAny<IPAddress>(), It.IsAny<CancellationToken>()), Times.Exactly(3));
|
||||
_taskDelayMock.Verify(x => x(It.IsAny<CancellationToken>()), Times.Exactly(2));
|
||||
disposableMock.Verify(x => x.Dispose(), Times.Exactly(2));
|
||||
}
|
||||
|
||||
private static async Task<IDisposable> ReturnAfterCancellation(CancellationToken cancellationToken, IDisposable socket)
|
||||
{
|
||||
await TaskFromCancellationToken(cancellationToken).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
|
||||
return socket;
|
||||
}
|
||||
|
||||
private static Task TaskFromCancellationToken(CancellationToken cancellationToken)
|
||||
{
|
||||
var tcs = new TaskCompletionSource();
|
||||
cancellationToken.Register(() => tcs.TrySetCanceled());
|
||||
return tcs.Task;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,9 +1,6 @@
|
|||
using System;
|
||||
using System.IO;
|
||||
using System.Linq;
|
||||
using System.Net;
|
||||
using System.Net.Http;
|
||||
using System.Net.NetworkInformation;
|
||||
using System.Net.Security;
|
||||
using System.Net.Sockets;
|
||||
using System.Text;
|
||||
|
|
@ -12,6 +9,7 @@
|
|||
using NLog;
|
||||
using NzbDrone.Common.Cache;
|
||||
using NzbDrone.Common.Extensions;
|
||||
using NzbDrone.Common.Http.HappyEyeballs;
|
||||
using NzbDrone.Common.Http.Proxy;
|
||||
|
||||
namespace NzbDrone.Common.Http.Dispatchers
|
||||
|
|
@ -20,18 +18,13 @@ public class ManagedHttpDispatcher : IHttpDispatcher
|
|||
{
|
||||
private const string NO_PROXY_KEY = "no-proxy";
|
||||
|
||||
private const int connection_establish_timeout = 2000;
|
||||
private static bool useIPv6 = Socket.OSSupportsIPv6;
|
||||
private static bool hasResolvedIPv6Availability;
|
||||
|
||||
private readonly IHttpProxySettingsProvider _proxySettingsProvider;
|
||||
private readonly ICreateManagedWebProxy _createManagedWebProxy;
|
||||
private readonly ICertificateValidationService _certificateValidationService;
|
||||
private readonly IUserAgentBuilder _userAgentBuilder;
|
||||
private readonly ICached<System.Net.Http.HttpClient> _httpClientCache;
|
||||
private readonly ICached<CredentialCache> _credentialCache;
|
||||
|
||||
private readonly Logger _logger;
|
||||
private readonly HttpHappyEyeballs _httpHappyEyeballs;
|
||||
|
||||
public ManagedHttpDispatcher(IHttpProxySettingsProvider proxySettingsProvider,
|
||||
ICreateManagedWebProxy createManagedWebProxy,
|
||||
|
|
@ -48,7 +41,7 @@ public ManagedHttpDispatcher(IHttpProxySettingsProvider proxySettingsProvider,
|
|||
_httpClientCache = cacheManager.GetCache<System.Net.Http.HttpClient>(typeof(ManagedHttpDispatcher));
|
||||
_credentialCache = cacheManager.GetCache<CredentialCache>(typeof(ManagedHttpDispatcher), "credentialcache");
|
||||
|
||||
_logger = logger;
|
||||
_httpHappyEyeballs = new HttpHappyEyeballs(logger);
|
||||
}
|
||||
|
||||
public async Task<HttpResponse> GetResponseAsync(HttpRequest request, CookieContainer cookies)
|
||||
|
|
@ -164,7 +157,7 @@ protected virtual System.Net.Http.HttpClient CreateHttpClient(HttpProxySettings
|
|||
Credentials = GetCredentialCache(),
|
||||
PreAuthenticate = true,
|
||||
MaxConnectionsPerServer = 12,
|
||||
ConnectCallback = onConnect,
|
||||
ConnectCallback = Socket.OSSupportsIPv6 ? _httpHappyEyeballs.OnConnect : null,
|
||||
SslOptions = new SslClientAuthenticationOptions
|
||||
{
|
||||
RemoteCertificateValidationCallback = _certificateValidationService.ShouldByPassValidationError
|
||||
|
|
@ -254,87 +247,5 @@ private CredentialCache GetCredentialCache()
|
|||
{
|
||||
return _credentialCache.Get("credentialCache", () => new CredentialCache());
|
||||
}
|
||||
|
||||
private bool HasRoutableIPv4Address()
|
||||
{
|
||||
// Get all IPv4 addresses from all interfaces and return true if there are any with non-loopback addresses
|
||||
try
|
||||
{
|
||||
var networkInterfaces = NetworkInterface.GetAllNetworkInterfaces();
|
||||
|
||||
return networkInterfaces.Any(ni =>
|
||||
ni.OperationalStatus == OperationalStatus.Up &&
|
||||
ni.GetIPProperties().UnicastAddresses.Any(ip =>
|
||||
ip.Address.AddressFamily == AddressFamily.InterNetwork &&
|
||||
!IPAddress.IsLoopback(ip.Address)));
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
_logger.Debug(e, "Caught exception while GetAllNetworkInterfaces assuming IPv4 connectivity: {0}", e.Message);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
private async ValueTask<Stream> onConnect(SocketsHttpConnectionContext context, CancellationToken cancellationToken)
|
||||
{
|
||||
// Until .NET supports an implementation of Happy Eyeballs (https://tools.ietf.org/html/rfc8305#section-2), let's make IPv4 fallback work in a simple way.
|
||||
// This issue is being tracked at https://github.com/dotnet/runtime/issues/26177 and expected to be fixed in .NET 6.
|
||||
if (useIPv6)
|
||||
{
|
||||
try
|
||||
{
|
||||
var localToken = cancellationToken;
|
||||
|
||||
if (!hasResolvedIPv6Availability)
|
||||
{
|
||||
// to make things move fast, use a very low timeout for the initial ipv6 attempt.
|
||||
var quickFailCts = new CancellationTokenSource(connection_establish_timeout);
|
||||
var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, quickFailCts.Token);
|
||||
|
||||
localToken = linkedTokenSource.Token;
|
||||
}
|
||||
|
||||
return await attemptConnection(AddressFamily.InterNetworkV6, context, localToken);
|
||||
}
|
||||
catch
|
||||
{
|
||||
// Do not retry IPv6 if a routable IPv4 address is available, otherwise continue to attempt IPv6 connections.
|
||||
var routableIPv4 = HasRoutableIPv4Address();
|
||||
_logger.Info("IPv4 is available: {0}, IPv6 will be {1}", routableIPv4, routableIPv4 ? "disabled" : "left enabled");
|
||||
useIPv6 = !routableIPv4;
|
||||
}
|
||||
finally
|
||||
{
|
||||
hasResolvedIPv6Availability = true;
|
||||
}
|
||||
}
|
||||
|
||||
// fallback to IPv4.
|
||||
return await attemptConnection(AddressFamily.InterNetwork, context, cancellationToken);
|
||||
}
|
||||
|
||||
private static async ValueTask<Stream> attemptConnection(AddressFamily addressFamily, SocketsHttpConnectionContext context, CancellationToken cancellationToken)
|
||||
{
|
||||
// The following socket constructor will create a dual-mode socket on systems where IPV6 is available.
|
||||
var socket = new Socket(addressFamily, SocketType.Stream, ProtocolType.Tcp)
|
||||
{
|
||||
// Turn off Nagle's algorithm since it degrades performance in most HttpClient scenarios.
|
||||
NoDelay = true
|
||||
};
|
||||
|
||||
try
|
||||
{
|
||||
await socket.ConnectAsync(context.DnsEndPoint, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
// The stream should take the ownership of the underlying socket,
|
||||
// closing it when it's disposed.
|
||||
return new NetworkStream(socket, ownsSocket: true);
|
||||
}
|
||||
catch
|
||||
{
|
||||
socket.Dispose();
|
||||
throw;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
182
src/NzbDrone.Common/Http/HappyEyeballs/HappyEyeballs.cs
Normal file
182
src/NzbDrone.Common/Http/HappyEyeballs/HappyEyeballs.cs
Normal file
|
|
@ -0,0 +1,182 @@
|
|||
/*
|
||||
Until .NET implements Happy Eyeballs natively, use third-party implementation from
|
||||
https://slugcat.systems/post/24-06-16-ipv6-is-hard-happy-eyeballs-dotnet-httpclient/#the-implementation
|
||||
This issue is being tracked at https://github.com/dotnet/runtime/issues/26177.
|
||||
|
||||
Below is a slightly modified Happy Eyeballs implementation from the post above.
|
||||
We’ve factored out HTTP-specific implementation into HttpHappyEyeballs class to
|
||||
make testing easier.
|
||||
*/
|
||||
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Diagnostics;
|
||||
using System.Linq;
|
||||
using System.Net;
|
||||
using System.Net.Sockets;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace NzbDrone.Common.Http.HappyEyeballs;
|
||||
|
||||
public class HappyEyeballs<TSocket>
|
||||
where TSocket : IDisposable
|
||||
{
|
||||
private readonly Func<IPAddress, CancellationToken, Task<TSocket>> _connectSocket;
|
||||
private readonly Func<CancellationToken, Task> _taskDelay;
|
||||
|
||||
public HappyEyeballs(
|
||||
Func<IPAddress, CancellationToken, Task<TSocket>> connectSocket,
|
||||
Func<CancellationToken, Task> taskDelay)
|
||||
{
|
||||
_connectSocket = connectSocket;
|
||||
_taskDelay = taskDelay;
|
||||
}
|
||||
|
||||
public async ValueTask<TSocket> Connect(
|
||||
IPAddress[] addresses,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
ArgumentOutOfRangeException.ThrowIfNegativeOrZero(addresses.Length);
|
||||
|
||||
var ips = SortInterleaved(addresses);
|
||||
return await ParallelTask(
|
||||
ips.Length,
|
||||
(i, cancel) => _connectSocket(ips[i], cancel),
|
||||
cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private IPAddress[] SortInterleaved(IPAddress[] addresses)
|
||||
{
|
||||
// Interleave returned addresses so that they are IPv6 -> IPv4 -> IPv6 -> IPv4.
|
||||
// Assuming we have multiple addresses of the same type that is.
|
||||
// As described in the RFC.
|
||||
var ipv6 = addresses.Where(x => x.AddressFamily == AddressFamily.InterNetworkV6).ToArray();
|
||||
var ipv4 = addresses.Where(x => x.AddressFamily == AddressFamily.InterNetwork).ToArray();
|
||||
|
||||
var commonLength = Math.Min(ipv6.Length, ipv4.Length);
|
||||
|
||||
var result = new IPAddress[addresses.Length];
|
||||
for (var i = 0; i < commonLength; i++)
|
||||
{
|
||||
result[i * 2] = ipv6[i];
|
||||
result[1 + (i * 2)] = ipv4[i];
|
||||
}
|
||||
|
||||
if (ipv4.Length > ipv6.Length)
|
||||
{
|
||||
ipv4.AsSpan(commonLength).CopyTo(result.AsSpan(commonLength * 2));
|
||||
}
|
||||
else if (ipv6.Length > ipv4.Length)
|
||||
{
|
||||
ipv6.AsSpan(commonLength).CopyTo(result.AsSpan(commonLength * 2));
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
private async Task<TSocket> ParallelTask(
|
||||
int totalTasks,
|
||||
Func<int, CancellationToken, Task<TSocket>> taskBuilder,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
ArgumentOutOfRangeException.ThrowIfNegativeOrZero(totalTasks);
|
||||
|
||||
using var successCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
|
||||
|
||||
var taskIndex = 0;
|
||||
var tasks = new List<Task<TSocket>>();
|
||||
var innerExceptions = new List<Exception>();
|
||||
|
||||
// The general loop here is as follows:
|
||||
// 1. Add a new task for the next IP to try.
|
||||
// 2. Wait until any task completes OR the delay happens.
|
||||
// If an error occurs, we stop checking that task and continue checking the next.
|
||||
// Every iteration we add another task, until we're full on them.
|
||||
// We keep looping until we have SUCCESS, or we run out of attempt tasks entirely.
|
||||
Task<TSocket> successTask = null;
|
||||
while (taskIndex < totalTasks || tasks.Count > 0)
|
||||
{
|
||||
if (cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
if (taskIndex < totalTasks)
|
||||
{
|
||||
// We have to queue another task this iteration.
|
||||
var newTask = taskBuilder(taskIndex, successCts.Token);
|
||||
tasks.Add(newTask);
|
||||
taskIndex++;
|
||||
}
|
||||
|
||||
var whenAnyDone = Task.WhenAny(tasks);
|
||||
Task<TSocket> completedTask;
|
||||
|
||||
if (taskIndex < totalTasks)
|
||||
{
|
||||
using var delayCts = CancellationTokenSource.CreateLinkedTokenSource(successCts.Token);
|
||||
|
||||
// If we have another one to queue, wait for a timeout instead of *just* waiting for a connection task.
|
||||
var timeoutTask = _taskDelay(delayCts.Token);
|
||||
var whenAnyOrTimeout = await Task.WhenAny(whenAnyDone, timeoutTask).ConfigureAwait(false);
|
||||
if (whenAnyOrTimeout != whenAnyDone)
|
||||
{
|
||||
// Timeout finished. Go to next iteration so we queue another one.
|
||||
continue;
|
||||
}
|
||||
|
||||
// Ensure that we dispose the internal timer associated with Task.Delay.
|
||||
await delayCts.CancelAsync().ConfigureAwait(false);
|
||||
await timeoutTask.ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
|
||||
|
||||
completedTask = whenAnyDone.Result;
|
||||
}
|
||||
else
|
||||
{
|
||||
completedTask = await whenAnyDone.ConfigureAwait(false);
|
||||
}
|
||||
|
||||
tasks.Remove(completedTask);
|
||||
|
||||
if (completedTask.IsCompletedSuccessfully)
|
||||
{
|
||||
// We did it. We have success.
|
||||
successTask = completedTask;
|
||||
break;
|
||||
}
|
||||
else if (completedTask.IsFaulted)
|
||||
{
|
||||
innerExceptions.AddRange(completedTask.Exception!.InnerExceptions);
|
||||
}
|
||||
}
|
||||
|
||||
// Cancel and wait for all pending tasks.
|
||||
await successCts.CancelAsync().ConfigureAwait(false);
|
||||
await Task.WhenAll(tasks.Cast<Task>()).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
|
||||
|
||||
// Make sure that we don't get multiple sockets completing at once.
|
||||
//
|
||||
// Also for cancellation, e.g. if delay task completes before
|
||||
// socket connection in the task loop, and we receive cancellation
|
||||
// at the same time (thus stopping the loop). In this case, prefer
|
||||
// throwing an exception instead of returning a successful task.
|
||||
foreach (var task in tasks)
|
||||
{
|
||||
if (task.IsCompletedSuccessfully)
|
||||
{
|
||||
task.Result.Dispose();
|
||||
}
|
||||
}
|
||||
|
||||
if (successTask == null)
|
||||
{
|
||||
// We didn't get a single successful connection. Well heck.
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
Debug.Assert(innerExceptions.Count > 0);
|
||||
throw new AggregateException(innerExceptions);
|
||||
}
|
||||
|
||||
return successTask.Result;
|
||||
}
|
||||
}
|
||||
89
src/NzbDrone.Common/Http/HappyEyeballs/HttpHappyEyeballs.cs
Normal file
89
src/NzbDrone.Common/Http/HappyEyeballs/HttpHappyEyeballs.cs
Normal file
|
|
@ -0,0 +1,89 @@
|
|||
using System;
|
||||
using System.IO;
|
||||
using System.Net;
|
||||
using System.Net.Http;
|
||||
using System.Net.Sockets;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using NLog;
|
||||
|
||||
namespace NzbDrone.Common.Http.HappyEyeballs;
|
||||
|
||||
public class HttpHappyEyeballs
|
||||
{
|
||||
private const int ConnectionAttemptDelay = 250;
|
||||
|
||||
private readonly Logger _logger;
|
||||
|
||||
public HttpHappyEyeballs(Logger logger)
|
||||
{
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
public async ValueTask<Stream> OnConnect(
|
||||
SocketsHttpConnectionContext context,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
var endPoint = context.DnsEndPoint;
|
||||
|
||||
var ipHostEntry = await Dns.GetHostEntryAsync(endPoint.Host, endPoint.AddressFamily, cancellationToken).ConfigureAwait(false);
|
||||
var resolvedAddresses = ipHostEntry.AddressList;
|
||||
if (resolvedAddresses.Length == 0)
|
||||
{
|
||||
throw new WebException(
|
||||
$"The remote name {endPoint.Host} could not be resolved",
|
||||
WebExceptionStatus.NameResolutionFailure);
|
||||
}
|
||||
|
||||
var happyEyeballs = CreateHappyEyeballs(endPoint);
|
||||
var socket = await happyEyeballs.Connect(resolvedAddresses, cancellationToken).ConfigureAwait(false);
|
||||
_logger.Debug("Successfully connected {DnsEndPoint} to address: {RemoteEndPoint}", endPoint, socket.RemoteEndPoint);
|
||||
return new NetworkStream(socket, ownsSocket: true);
|
||||
}
|
||||
|
||||
private HappyEyeballs<Socket> CreateHappyEyeballs(DnsEndPoint endPoint)
|
||||
{
|
||||
return new HappyEyeballs<Socket>(
|
||||
(ipAddress, cancel) => ConnectSocket(ipAddress, endPoint, cancel),
|
||||
cancel => TaskDelay(endPoint, cancel));
|
||||
}
|
||||
|
||||
private async Task TaskDelay(DnsEndPoint endPoint, CancellationToken cancellationToken)
|
||||
{
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
var timeSpan = TimeSpan.FromMilliseconds(ConnectionAttemptDelay);
|
||||
_logger.Debug("Waiting on {DnsEndPoint} connection attempt delay for {TimeSpan}", endPoint, timeSpan);
|
||||
await Task.Delay(timeSpan, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private async Task<Socket> ConnectSocket(
|
||||
IPAddress ipAddress,
|
||||
DnsEndPoint endPoint,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
var remoteEP = new IPEndPoint(ipAddress, endPoint.Port);
|
||||
|
||||
// The following socket constructor will create a dual-mode socket on
|
||||
// systems where IPv6 is available.
|
||||
var socket = new Socket(SocketType.Stream, ProtocolType.Tcp)
|
||||
{
|
||||
// Turn off Nagle's algorithm since it degrades performance in most
|
||||
// HttpClient scenarios.
|
||||
NoDelay = true
|
||||
};
|
||||
|
||||
_logger.Debug("Trying Happy Eyeballs connection to {IPEndPoint} for host {DnsEndPoint}", ipAddress, endPoint);
|
||||
try
|
||||
{
|
||||
await socket.ConnectAsync(remoteEP, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
socket.Dispose();
|
||||
_logger.Debug(e, "Happy Eyeballs connection to {IPEndPoint} for host {DnsEndPoint} failed", ipAddress, endPoint);
|
||||
throw;
|
||||
}
|
||||
|
||||
return socket;
|
||||
}
|
||||
}
|
||||
Loading…
Reference in a new issue