This commit is contained in:
Ivan Trubach 2025-11-29 07:37:05 +02:00 committed by GitHub
commit 25997f2a0b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 497 additions and 93 deletions

View file

@ -0,0 +1,222 @@
using System;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Moq;
using NUnit.Framework;
using NzbDrone.Common.Http.HappyEyeballs;
using NzbDrone.Test.Common;
namespace NzbDrone.Common.Test.Http
{
[TestFixture]
public class HappyEyeballsFixture : TestBase
{
private static IPAddress ipv6Address1 = IPAddress.Parse("2001:db8::1");
private static IPAddress ipv6Address2 = IPAddress.Parse("2001:db8::2");
private static IPAddress ipv4Address1 = IPAddress.Parse("192.0.2.1");
private static IPAddress ipv4Address2 = IPAddress.Parse("192.0.2.2");
private Mock<Func<IPAddress, CancellationToken, Task<IDisposable>>> _connectSocketMock;
private Mock<Func<CancellationToken, Task>> _taskDelayMock;
private HappyEyeballs<IDisposable> _happyEyeballs;
[SetUp]
public void SetUp()
{
_connectSocketMock = new Mock<Func<IPAddress, CancellationToken, Task<IDisposable>>>(MockBehavior.Strict);
_taskDelayMock = new Mock<Func<CancellationToken, Task>>(MockBehavior.Strict);
_happyEyeballs = new HappyEyeballs<IDisposable>(_connectSocketMock.Object, _taskDelayMock.Object);
}
[Test]
public void should_throw_exception_when_no_ips_resolved()
{
var addresses = Array.Empty<IPAddress>();
var cancellationToken = CancellationToken.None;
Assert.ThrowsAsync<ArgumentOutOfRangeException>(async () =>
await _happyEyeballs.Connect(addresses, cancellationToken));
}
[Test]
public async Task should_connect_successfully_when_valid_addresses_are_provided()
{
var addresses = new[]
{
ipv4Address1,
ipv6Address1,
};
var cancellationToken = CancellationToken.None;
var sequence = new MockSequence();
_connectSocketMock.InSequence(sequence)
.Setup(x => x(ipv6Address1, It.IsAny<CancellationToken>()))
.ReturnsAsync(Mock.Of<IDisposable>(MockBehavior.Strict));
_taskDelayMock.InSequence(sequence)
.Setup(x => x(It.IsAny<CancellationToken>()))
.Returns(TaskFromCancellationToken);
var result = await _happyEyeballs.Connect(addresses, cancellationToken);
Assert.NotNull(result);
_connectSocketMock.Verify(x => x(It.IsAny<IPAddress>(), It.IsAny<CancellationToken>()), Times.Once);
_taskDelayMock.Verify(x => x(It.IsAny<CancellationToken>()), Times.Once);
}
[Test]
public void should_throw_aggregateexception_when_no_addresses_successfully_connect()
{
var addresses = new[]
{
ipv4Address1,
ipv4Address2,
ipv6Address1,
ipv6Address2,
};
var cancellationToken = CancellationToken.None;
var sequence = new MockSequence();
_connectSocketMock.InSequence(sequence)
.Setup(x => x(ipv6Address1, It.IsAny<CancellationToken>()))
.ThrowsAsync(new Exception());
_taskDelayMock.InSequence(sequence)
.Setup(x => x(It.IsAny<CancellationToken>()))
.Returns(TaskFromCancellationToken);
_connectSocketMock.InSequence(sequence)
.Setup(x => x(ipv4Address1, It.IsAny<CancellationToken>()))
.ThrowsAsync(new Exception());
_taskDelayMock.InSequence(sequence)
.Setup(x => x(It.IsAny<CancellationToken>()))
.Returns(TaskFromCancellationToken);
_connectSocketMock.InSequence(sequence)
.Setup(x => x(ipv6Address2, It.IsAny<CancellationToken>()))
.ThrowsAsync(new Exception());
_taskDelayMock.InSequence(sequence)
.Setup(x => x(It.IsAny<CancellationToken>()))
.Returns(TaskFromCancellationToken);
_connectSocketMock.InSequence(sequence)
.Setup(x => x(ipv4Address2, It.IsAny<CancellationToken>()))
.ThrowsAsync(new Exception());
var ex = Assert.ThrowsAsync<AggregateException>(async () =>
await _happyEyeballs.Connect(addresses, cancellationToken));
Assert.That(ex.InnerExceptions, Has.Count.EqualTo(4));
_connectSocketMock.Verify(x => x(It.IsAny<IPAddress>(), It.IsAny<CancellationToken>()), Times.Exactly(4));
_taskDelayMock.Verify(x => x(It.IsAny<CancellationToken>()), Times.Exactly(3));
}
[Test]
public void should_throw_operationcanceledexception_when_canceled()
{
var addresses = new[]
{
ipv6Address1,
};
using var cancellationTokenSource = new CancellationTokenSource();
var cancellationToken = cancellationTokenSource.Token;
_connectSocketMock
.Setup(x => x(It.IsAny<IPAddress>(), It.IsAny<CancellationToken>()))
.Callback((IPAddress _, CancellationToken _) => cancellationTokenSource.Cancel())
.ThrowsAsync(new Exception());
var ex = Assert.ThrowsAsync<OperationCanceledException>(async () =>
await _happyEyeballs.Connect(addresses, cancellationToken));
Assert.That(ex.CancellationToken.IsCancellationRequested, Is.True);
_connectSocketMock.Verify(x => x(It.IsAny<IPAddress>(), It.IsAny<CancellationToken>()), Times.Once);
}
[Test]
public async Task should_connect_to_ipv4_when_ipv6_connection_fails()
{
var addresses = new[]
{
ipv4Address1,
ipv6Address1,
};
var cancellationToken = CancellationToken.None;
var sequence = new MockSequence();
_connectSocketMock.InSequence(sequence)
.Setup(x => x(ipv6Address1, It.IsAny<CancellationToken>()))
.ThrowsAsync(new Exception());
_taskDelayMock.InSequence(sequence)
.Setup(x => x(It.IsAny<CancellationToken>()))
.Returns(TaskFromCancellationToken);
_connectSocketMock.InSequence(sequence)
.Setup(x => x(ipv4Address1, It.IsAny<CancellationToken>()))
.ReturnsAsync(Mock.Of<IDisposable>(MockBehavior.Strict));
var result = await _happyEyeballs.Connect(addresses, cancellationToken);
Assert.NotNull(result);
_connectSocketMock.Verify(x => x(It.IsAny<IPAddress>(), It.IsAny<CancellationToken>()), Times.Exactly(2));
_taskDelayMock.Verify(x => x(It.IsAny<CancellationToken>()), Times.Once);
}
[Test]
public async Task should_dispose_multiple_successful_connections()
{
var addresses = new[]
{
ipv4Address1,
ipv6Address1,
ipv6Address2,
};
var cancellationToken = CancellationToken.None;
var disposableMock = new Mock<IDisposable>(MockBehavior.Strict);
disposableMock.Setup(x => x.Dispose());
var sequence = new MockSequence();
_connectSocketMock.InSequence(sequence)
.Setup(x => x(ipv6Address1, It.IsAny<CancellationToken>()))
.Returns((IPAddress _, CancellationToken cancel) => ReturnAfterCancellation(cancel, disposableMock.Object));
_taskDelayMock.InSequence(sequence)
.Setup(x => x(It.IsAny<CancellationToken>()))
.Returns(Task.CompletedTask);
_connectSocketMock.InSequence(sequence)
.Setup(x => x(ipv4Address1, It.IsAny<CancellationToken>()))
.Returns((IPAddress _, CancellationToken cancel) => ReturnAfterCancellation(cancel, disposableMock.Object));
_taskDelayMock.InSequence(sequence)
.Setup(x => x(It.IsAny<CancellationToken>()))
.Returns(Task.CompletedTask);
_connectSocketMock.InSequence(sequence)
.Setup(x => x(ipv6Address2, It.IsAny<CancellationToken>()))
.ReturnsAsync(Mock.Of<IDisposable>(MockBehavior.Strict));
var result = await _happyEyeballs.Connect(addresses, cancellationToken);
Assert.NotNull(result);
_connectSocketMock.Verify(x => x(It.IsAny<IPAddress>(), It.IsAny<CancellationToken>()), Times.Exactly(3));
_taskDelayMock.Verify(x => x(It.IsAny<CancellationToken>()), Times.Exactly(2));
disposableMock.Verify(x => x.Dispose(), Times.Exactly(2));
}
private static async Task<IDisposable> ReturnAfterCancellation(CancellationToken cancellationToken, IDisposable socket)
{
await TaskFromCancellationToken(cancellationToken).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
return socket;
}
private static Task TaskFromCancellationToken(CancellationToken cancellationToken)
{
var tcs = new TaskCompletionSource();
cancellationToken.Register(() => tcs.TrySetCanceled());
return tcs.Task;
}
}
}

View file

@ -1,9 +1,6 @@
using System;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Net.NetworkInformation;
using System.Net.Security;
using System.Net.Sockets;
using System.Text;
@ -12,6 +9,7 @@
using NLog;
using NzbDrone.Common.Cache;
using NzbDrone.Common.Extensions;
using NzbDrone.Common.Http.HappyEyeballs;
using NzbDrone.Common.Http.Proxy;
namespace NzbDrone.Common.Http.Dispatchers
@ -20,18 +18,13 @@ public class ManagedHttpDispatcher : IHttpDispatcher
{
private const string NO_PROXY_KEY = "no-proxy";
private const int connection_establish_timeout = 2000;
private static bool useIPv6 = Socket.OSSupportsIPv6;
private static bool hasResolvedIPv6Availability;
private readonly IHttpProxySettingsProvider _proxySettingsProvider;
private readonly ICreateManagedWebProxy _createManagedWebProxy;
private readonly ICertificateValidationService _certificateValidationService;
private readonly IUserAgentBuilder _userAgentBuilder;
private readonly ICached<System.Net.Http.HttpClient> _httpClientCache;
private readonly ICached<CredentialCache> _credentialCache;
private readonly Logger _logger;
private readonly HttpHappyEyeballs _httpHappyEyeballs;
public ManagedHttpDispatcher(IHttpProxySettingsProvider proxySettingsProvider,
ICreateManagedWebProxy createManagedWebProxy,
@ -48,7 +41,7 @@ public ManagedHttpDispatcher(IHttpProxySettingsProvider proxySettingsProvider,
_httpClientCache = cacheManager.GetCache<System.Net.Http.HttpClient>(typeof(ManagedHttpDispatcher));
_credentialCache = cacheManager.GetCache<CredentialCache>(typeof(ManagedHttpDispatcher), "credentialcache");
_logger = logger;
_httpHappyEyeballs = new HttpHappyEyeballs(logger);
}
public async Task<HttpResponse> GetResponseAsync(HttpRequest request, CookieContainer cookies)
@ -164,7 +157,7 @@ protected virtual System.Net.Http.HttpClient CreateHttpClient(HttpProxySettings
Credentials = GetCredentialCache(),
PreAuthenticate = true,
MaxConnectionsPerServer = 12,
ConnectCallback = onConnect,
ConnectCallback = Socket.OSSupportsIPv6 ? _httpHappyEyeballs.OnConnect : null,
SslOptions = new SslClientAuthenticationOptions
{
RemoteCertificateValidationCallback = _certificateValidationService.ShouldByPassValidationError
@ -254,87 +247,5 @@ private CredentialCache GetCredentialCache()
{
return _credentialCache.Get("credentialCache", () => new CredentialCache());
}
private bool HasRoutableIPv4Address()
{
// Get all IPv4 addresses from all interfaces and return true if there are any with non-loopback addresses
try
{
var networkInterfaces = NetworkInterface.GetAllNetworkInterfaces();
return networkInterfaces.Any(ni =>
ni.OperationalStatus == OperationalStatus.Up &&
ni.GetIPProperties().UnicastAddresses.Any(ip =>
ip.Address.AddressFamily == AddressFamily.InterNetwork &&
!IPAddress.IsLoopback(ip.Address)));
}
catch (Exception e)
{
_logger.Debug(e, "Caught exception while GetAllNetworkInterfaces assuming IPv4 connectivity: {0}", e.Message);
return true;
}
}
private async ValueTask<Stream> onConnect(SocketsHttpConnectionContext context, CancellationToken cancellationToken)
{
// Until .NET supports an implementation of Happy Eyeballs (https://tools.ietf.org/html/rfc8305#section-2), let's make IPv4 fallback work in a simple way.
// This issue is being tracked at https://github.com/dotnet/runtime/issues/26177 and expected to be fixed in .NET 6.
if (useIPv6)
{
try
{
var localToken = cancellationToken;
if (!hasResolvedIPv6Availability)
{
// to make things move fast, use a very low timeout for the initial ipv6 attempt.
var quickFailCts = new CancellationTokenSource(connection_establish_timeout);
var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, quickFailCts.Token);
localToken = linkedTokenSource.Token;
}
return await attemptConnection(AddressFamily.InterNetworkV6, context, localToken);
}
catch
{
// Do not retry IPv6 if a routable IPv4 address is available, otherwise continue to attempt IPv6 connections.
var routableIPv4 = HasRoutableIPv4Address();
_logger.Info("IPv4 is available: {0}, IPv6 will be {1}", routableIPv4, routableIPv4 ? "disabled" : "left enabled");
useIPv6 = !routableIPv4;
}
finally
{
hasResolvedIPv6Availability = true;
}
}
// fallback to IPv4.
return await attemptConnection(AddressFamily.InterNetwork, context, cancellationToken);
}
private static async ValueTask<Stream> attemptConnection(AddressFamily addressFamily, SocketsHttpConnectionContext context, CancellationToken cancellationToken)
{
// The following socket constructor will create a dual-mode socket on systems where IPV6 is available.
var socket = new Socket(addressFamily, SocketType.Stream, ProtocolType.Tcp)
{
// Turn off Nagle's algorithm since it degrades performance in most HttpClient scenarios.
NoDelay = true
};
try
{
await socket.ConnectAsync(context.DnsEndPoint, cancellationToken).ConfigureAwait(false);
// The stream should take the ownership of the underlying socket,
// closing it when it's disposed.
return new NetworkStream(socket, ownsSocket: true);
}
catch
{
socket.Dispose();
throw;
}
}
}
}

View file

@ -0,0 +1,182 @@
/*
Until .NET implements Happy Eyeballs natively, use third-party implementation from
https://slugcat.systems/post/24-06-16-ipv6-is-hard-happy-eyeballs-dotnet-httpclient/#the-implementation
This issue is being tracked at https://github.com/dotnet/runtime/issues/26177.
Below is a slightly modified Happy Eyeballs implementation from the post above.
Weve factored out HTTP-specific implementation into HttpHappyEyeballs class to
make testing easier.
*/
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
namespace NzbDrone.Common.Http.HappyEyeballs;
public class HappyEyeballs<TSocket>
where TSocket : IDisposable
{
private readonly Func<IPAddress, CancellationToken, Task<TSocket>> _connectSocket;
private readonly Func<CancellationToken, Task> _taskDelay;
public HappyEyeballs(
Func<IPAddress, CancellationToken, Task<TSocket>> connectSocket,
Func<CancellationToken, Task> taskDelay)
{
_connectSocket = connectSocket;
_taskDelay = taskDelay;
}
public async ValueTask<TSocket> Connect(
IPAddress[] addresses,
CancellationToken cancellationToken)
{
ArgumentOutOfRangeException.ThrowIfNegativeOrZero(addresses.Length);
var ips = SortInterleaved(addresses);
return await ParallelTask(
ips.Length,
(i, cancel) => _connectSocket(ips[i], cancel),
cancellationToken).ConfigureAwait(false);
}
private IPAddress[] SortInterleaved(IPAddress[] addresses)
{
// Interleave returned addresses so that they are IPv6 -> IPv4 -> IPv6 -> IPv4.
// Assuming we have multiple addresses of the same type that is.
// As described in the RFC.
var ipv6 = addresses.Where(x => x.AddressFamily == AddressFamily.InterNetworkV6).ToArray();
var ipv4 = addresses.Where(x => x.AddressFamily == AddressFamily.InterNetwork).ToArray();
var commonLength = Math.Min(ipv6.Length, ipv4.Length);
var result = new IPAddress[addresses.Length];
for (var i = 0; i < commonLength; i++)
{
result[i * 2] = ipv6[i];
result[1 + (i * 2)] = ipv4[i];
}
if (ipv4.Length > ipv6.Length)
{
ipv4.AsSpan(commonLength).CopyTo(result.AsSpan(commonLength * 2));
}
else if (ipv6.Length > ipv4.Length)
{
ipv6.AsSpan(commonLength).CopyTo(result.AsSpan(commonLength * 2));
}
return result;
}
private async Task<TSocket> ParallelTask(
int totalTasks,
Func<int, CancellationToken, Task<TSocket>> taskBuilder,
CancellationToken cancellationToken)
{
ArgumentOutOfRangeException.ThrowIfNegativeOrZero(totalTasks);
using var successCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
var taskIndex = 0;
var tasks = new List<Task<TSocket>>();
var innerExceptions = new List<Exception>();
// The general loop here is as follows:
// 1. Add a new task for the next IP to try.
// 2. Wait until any task completes OR the delay happens.
// If an error occurs, we stop checking that task and continue checking the next.
// Every iteration we add another task, until we're full on them.
// We keep looping until we have SUCCESS, or we run out of attempt tasks entirely.
Task<TSocket> successTask = null;
while (taskIndex < totalTasks || tasks.Count > 0)
{
if (cancellationToken.IsCancellationRequested)
{
break;
}
if (taskIndex < totalTasks)
{
// We have to queue another task this iteration.
var newTask = taskBuilder(taskIndex, successCts.Token);
tasks.Add(newTask);
taskIndex++;
}
var whenAnyDone = Task.WhenAny(tasks);
Task<TSocket> completedTask;
if (taskIndex < totalTasks)
{
using var delayCts = CancellationTokenSource.CreateLinkedTokenSource(successCts.Token);
// If we have another one to queue, wait for a timeout instead of *just* waiting for a connection task.
var timeoutTask = _taskDelay(delayCts.Token);
var whenAnyOrTimeout = await Task.WhenAny(whenAnyDone, timeoutTask).ConfigureAwait(false);
if (whenAnyOrTimeout != whenAnyDone)
{
// Timeout finished. Go to next iteration so we queue another one.
continue;
}
// Ensure that we dispose the internal timer associated with Task.Delay.
await delayCts.CancelAsync().ConfigureAwait(false);
await timeoutTask.ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
completedTask = whenAnyDone.Result;
}
else
{
completedTask = await whenAnyDone.ConfigureAwait(false);
}
tasks.Remove(completedTask);
if (completedTask.IsCompletedSuccessfully)
{
// We did it. We have success.
successTask = completedTask;
break;
}
else if (completedTask.IsFaulted)
{
innerExceptions.AddRange(completedTask.Exception!.InnerExceptions);
}
}
// Cancel and wait for all pending tasks.
await successCts.CancelAsync().ConfigureAwait(false);
await Task.WhenAll(tasks.Cast<Task>()).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
// Make sure that we don't get multiple sockets completing at once.
//
// Also for cancellation, e.g. if delay task completes before
// socket connection in the task loop, and we receive cancellation
// at the same time (thus stopping the loop). In this case, prefer
// throwing an exception instead of returning a successful task.
foreach (var task in tasks)
{
if (task.IsCompletedSuccessfully)
{
task.Result.Dispose();
}
}
if (successTask == null)
{
// We didn't get a single successful connection. Well heck.
cancellationToken.ThrowIfCancellationRequested();
Debug.Assert(innerExceptions.Count > 0);
throw new AggregateException(innerExceptions);
}
return successTask.Result;
}
}

View file

@ -0,0 +1,89 @@
using System;
using System.IO;
using System.Net;
using System.Net.Http;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using NLog;
namespace NzbDrone.Common.Http.HappyEyeballs;
public class HttpHappyEyeballs
{
private const int ConnectionAttemptDelay = 250;
private readonly Logger _logger;
public HttpHappyEyeballs(Logger logger)
{
_logger = logger;
}
public async ValueTask<Stream> OnConnect(
SocketsHttpConnectionContext context,
CancellationToken cancellationToken)
{
var endPoint = context.DnsEndPoint;
var ipHostEntry = await Dns.GetHostEntryAsync(endPoint.Host, endPoint.AddressFamily, cancellationToken).ConfigureAwait(false);
var resolvedAddresses = ipHostEntry.AddressList;
if (resolvedAddresses.Length == 0)
{
throw new WebException(
$"The remote name {endPoint.Host} could not be resolved",
WebExceptionStatus.NameResolutionFailure);
}
var happyEyeballs = CreateHappyEyeballs(endPoint);
var socket = await happyEyeballs.Connect(resolvedAddresses, cancellationToken).ConfigureAwait(false);
_logger.Debug("Successfully connected {DnsEndPoint} to address: {RemoteEndPoint}", endPoint, socket.RemoteEndPoint);
return new NetworkStream(socket, ownsSocket: true);
}
private HappyEyeballs<Socket> CreateHappyEyeballs(DnsEndPoint endPoint)
{
return new HappyEyeballs<Socket>(
(ipAddress, cancel) => ConnectSocket(ipAddress, endPoint, cancel),
cancel => TaskDelay(endPoint, cancel));
}
private async Task TaskDelay(DnsEndPoint endPoint, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
var timeSpan = TimeSpan.FromMilliseconds(ConnectionAttemptDelay);
_logger.Debug("Waiting on {DnsEndPoint} connection attempt delay for {TimeSpan}", endPoint, timeSpan);
await Task.Delay(timeSpan, cancellationToken).ConfigureAwait(false);
}
private async Task<Socket> ConnectSocket(
IPAddress ipAddress,
DnsEndPoint endPoint,
CancellationToken cancellationToken)
{
var remoteEP = new IPEndPoint(ipAddress, endPoint.Port);
// The following socket constructor will create a dual-mode socket on
// systems where IPv6 is available.
var socket = new Socket(SocketType.Stream, ProtocolType.Tcp)
{
// Turn off Nagle's algorithm since it degrades performance in most
// HttpClient scenarios.
NoDelay = true
};
_logger.Debug("Trying Happy Eyeballs connection to {IPEndPoint} for host {DnsEndPoint}", ipAddress, endPoint);
try
{
await socket.ConnectAsync(remoteEP, cancellationToken).ConfigureAwait(false);
}
catch (Exception e)
{
socket.Dispose();
_logger.Debug(e, "Happy Eyeballs connection to {IPEndPoint} for host {DnsEndPoint} failed", ipAddress, endPoint);
throw;
}
return socket;
}
}