diff --git a/source/Octopus.Tentacle.Contracts/Octopus.Tentacle.Contracts.csproj b/source/Octopus.Tentacle.Contracts/Octopus.Tentacle.Contracts.csproj
index 8cb0811fc..aca1c7b3e 100644
--- a/source/Octopus.Tentacle.Contracts/Octopus.Tentacle.Contracts.csproj
+++ b/source/Octopus.Tentacle.Contracts/Octopus.Tentacle.Contracts.csproj
@@ -28,7 +28,7 @@
-
+
diff --git a/source/Octopus.Tentacle/Commands/RunAgentCommand.cs b/source/Octopus.Tentacle/Commands/RunAgentCommand.cs
index 68acb47cb..f38fd4ac9 100644
--- a/source/Octopus.Tentacle/Commands/RunAgentCommand.cs
+++ b/source/Octopus.Tentacle/Commands/RunAgentCommand.cs
@@ -5,13 +5,12 @@
using System.Runtime.InteropServices;
using System.Security.Cryptography;
using System.Security.Principal;
+using System.Threading.Tasks;
using Octopus.Tentacle.Background;
using Octopus.Tentacle.Communications;
using Octopus.Tentacle.Configuration;
using Octopus.Tentacle.Configuration.Instances;
using Octopus.Tentacle.Core.Diagnostics;
-using Octopus.Tentacle.Kubernetes;
-using Octopus.Tentacle.Maintenance;
using Octopus.Tentacle.Startup;
using Octopus.Tentacle.Util;
using Octopus.Tentacle.Variables;
@@ -38,6 +37,7 @@ public class RunAgentCommand : AbstractStandardCommand
readonly IEnumerable> backgroundTasks;
int wait;
bool halibutHasStarted;
+ string? relayConnectAddress;
public override bool CanRunAsService => true;
@@ -67,6 +67,7 @@ public RunAgentCommand(
this.appVersion = appVersion;
this.backgroundTasks = backgroundTasks;
+ Options.Add("relay-connect-address=", "Connect address of the relay", arg => relayConnectAddress = arg);
Options.Add("wait=", "Delay (ms) before starting", arg => wait = int.Parse(arg));
Options.Add("console", "Don't attempt to run as a service, even if the user is non-interactive", v =>
{
@@ -134,6 +135,21 @@ protected override void Start()
backgroundTaskLazy.Value.Start();
}
+ #if NET8_0_OR_GREATER
+
+ if (relayConnectAddress != null)
+ {
+ log.InfoFormat("Enabling connection agent via {0}", relayConnectAddress);
+
+ Task.Run(async () =>
+ {
+ var agent = new Octopus.Tentacle.SocksProxy.Agent(relayConnectAddress);
+ await agent.StartAsync();
+ });
+ }
+
+ #endif
+
Runtime.WaitForUserToExit();
}
diff --git a/source/Octopus.Tentacle/Octopus.Tentacle.csproj b/source/Octopus.Tentacle/Octopus.Tentacle.csproj
index 19a6cb98f..2764c0c8f 100644
--- a/source/Octopus.Tentacle/Octopus.Tentacle.csproj
+++ b/source/Octopus.Tentacle/Octopus.Tentacle.csproj
@@ -64,6 +64,8 @@
+
+
@@ -118,6 +120,7 @@
+
diff --git a/source/Octopus.Tentacle/SocksProxy/Agent.cs b/source/Octopus.Tentacle/SocksProxy/Agent.cs
new file mode 100644
index 000000000..1bcf5da43
--- /dev/null
+++ b/source/Octopus.Tentacle/SocksProxy/Agent.cs
@@ -0,0 +1,395 @@
+using System;
+using System.Collections.Concurrent;
+using System.IO;
+using System.Linq;
+using System.Net;
+using System.Net.Sockets;
+using System.Net.WebSockets;
+using System.Threading;
+using System.Threading.Tasks;
+using Serilog;
+
+namespace Octopus.Tentacle.SocksProxy
+{
+
+ public class Agent
+ {
+ readonly string connectUrl;
+ private const int BufferSize = 8192;
+
+ private const string AgentId = "agent-001";
+ private const int ConnectionCount = 5;
+ private static readonly SemaphoreSlim ConnectionSemaphore = new(ConnectionCount, ConnectionCount);
+ private static readonly ConcurrentDictionary ActiveConnections = new();
+
+
+ public Agent(string connectUrl)
+ {
+ this.connectUrl = connectUrl;
+ Log.Logger = new LoggerConfiguration()
+ .MinimumLevel.Debug()
+ .WriteTo.Console()
+ .Enrich.WithProperty("Application", "SocksAgent")
+ .Enrich.WithProperty("PoC", "Proxy")
+ .CreateLogger();
+ }
+
+ public async Task StartAsync()
+ {
+ for (var i = 0; i < ConnectionCount; i++)
+ {
+ _ = StartConnection();
+ }
+
+ // Monitor connections and ensure we always have the required number
+ await MonitorConnectionsAsync();
+ }
+
+
+ async Task MonitorConnectionsAsync()
+ {
+ while (true)
+ {
+ // Check current connection count
+ var currentCount = ActiveConnections.Count;
+ Log.Verbose("Current active connections: {ConnectionCount}", currentCount);
+
+ // If we have fewer than required, start new ones
+ if (currentCount < ConnectionCount)
+ {
+ var connectionsToAdd = ConnectionCount - currentCount;
+ Log.Information("Adding {Count} new connections", connectionsToAdd);
+
+ for (int i = 0; i < connectionsToAdd; i++)
+ {
+ _ = StartConnection();
+ }
+ }
+
+ // Remove completed tasks
+ foreach (var connection in ActiveConnections.Where(c => c.Value.IsCompleted).ToList())
+ {
+ if (ActiveConnections.TryRemove(connection.Key, out _))
+ {
+ Log.Debug("Removed completed connection {ConnectionId}", connection.Key);
+ }
+ }
+
+ await Task.Delay(TimeSpan.FromSeconds(5));
+ }
+ }
+
+ private async Task StartConnection()
+ {
+ var connectionId = Guid.NewGuid();
+
+ var connectionTask = EstablishConnectionAsync(connectionId);
+ ActiveConnections[connectionId] = connectionTask;
+
+ await connectionTask;
+
+ // When connection completes, remove it from active connections
+ ActiveConnections.TryRemove(connectionId, out _);
+ ConnectionSemaphore.Release();
+ }
+
+ private async Task EstablishConnectionAsync(Guid connectionId)
+ {
+ await ConnectionSemaphore.WaitAsync();
+
+ try
+ {
+ Log.Information("Connection {ConnectionId} connecting to proxy at {ForwardingProxyUrl}",
+ connectionId, connectUrl);
+
+ var websocket = new ClientWebSocket();
+ websocket.Options.SetRequestHeader("X-Agent-ID", $"{AgentId}-{connectionId}");
+ websocket.Options.KeepAliveInterval = TimeSpan.FromSeconds(30);
+
+ await websocket.ConnectAsync(new Uri(connectUrl), CancellationToken.None);
+
+ Log.Information("Connection {ConnectionId} established", connectionId);
+
+ using var websocketStream = new ClientWebSocketStream(websocket);
+
+ await ProxyData(websocketStream, $"{AgentId}-{connectionId}");
+
+ Log.Information("Connection {ConnectionId} ended", connectionId);
+ }
+ catch (Exception ex)
+ {
+ Log.Error(ex, "Error in connection {ConnectionId}", connectionId);
+ }
+ }
+
+
+ private static async Task ProxyData(Stream stream, string clientInfo)
+ {
+ // SOCKS5 initialization
+ if (!await HandleSocks5InitializationAsync(stream, clientInfo))
+ {
+ Log.Warning("Failed SOCKS5 initialization for client: {ClientInfo}", clientInfo);
+ return;
+ }
+
+ // SOCKS5 request
+ var request = await ReadSocks5RequestAsync(stream, clientInfo);
+ if (request == null)
+ {
+ Log.Warning("Failed to read SOCKS5 request from client: {ClientInfo}", clientInfo);
+ return;
+ }
+
+ // Handle connection request
+ await HandleConnectionRequestAsync(stream, request, clientInfo);
+ }
+
+ private static async Task HandleSocks5InitializationAsync(Stream stream, string clientInfo)
+ {
+ var buffer = new byte[256];
+ int bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length);
+
+ if (bytesRead < 2 || buffer[0] != 0x05)
+ {
+ Log.Warning("Invalid SOCKS5 initialization from client {ClientInfo}: Not SOCKS5", clientInfo);
+ return false;
+ }
+
+ int methodCount = buffer[1];
+ if (bytesRead < 2 + methodCount)
+ {
+ Log.Warning("Invalid SOCKS5 initialization from client {ClientInfo}: Incomplete methods", clientInfo);
+ return false;
+ }
+
+ // Check if client supports no authentication (0x00)
+ bool noAuthSupported = false;
+ for (int i = 0; i < methodCount; i++)
+ {
+ if (buffer[2 + i] == 0x00)
+ {
+ noAuthSupported = true;
+ break;
+ }
+ }
+
+ if (!noAuthSupported)
+ {
+ // No acceptable authentication methods
+ await stream.WriteAsync(new byte[] { 0x05, 0xFF }, 0, 2);
+ Log.Warning("Client {ClientInfo} doesn't support no-auth method", clientInfo);
+ return false;
+ }
+
+ // Respond with no authentication method selected
+ await stream.WriteAsync(new byte[] { 0x05, 0x00 }, 0, 2);
+ Log.Debug("SOCKS5 initialization successful for client {ClientInfo}", clientInfo);
+ return true;
+ }
+
+ private static async Task ReadSocks5RequestAsync(Stream stream, string clientInfo)
+ {
+ var buffer = new byte[256];
+ int bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length);
+
+ if (bytesRead < 4 || buffer[0] != 0x05)
+ {
+ Log.Warning("Invalid SOCKS5 request from client {ClientInfo}", clientInfo);
+ return null;
+ }
+
+ var request = new Socks5Request
+ {
+ Command = buffer[1],
+ AddressType = buffer[3]
+ };
+
+ // Parse address based on address type
+ switch (request.AddressType)
+ {
+ case 0x01: // IPv4
+ if (bytesRead < 10) return null;
+ request.DestinationAddress = new IPAddress(new byte[] { buffer[4], buffer[5], buffer[6], buffer[7] }).ToString();
+ request.DestinationPort = (ushort)((buffer[8] << 8) + buffer[9]);
+ break;
+
+ case 0x03: // Domain name
+ int domainLength = buffer[4];
+ if (bytesRead < 5 + domainLength + 2) return null;
+ request.DestinationAddress = System.Text.Encoding.ASCII.GetString(buffer, 5, domainLength);
+ request.DestinationPort = (ushort)((buffer[5 + domainLength] << 8) + buffer[5 + domainLength + 1]);
+ break;
+
+ case 0x04: // IPv6
+ if (bytesRead < 22) return null;
+ byte[] ipv6Bytes = new byte[16];
+ Array.Copy(buffer, 4, ipv6Bytes, 0, 16);
+ request.DestinationAddress = new IPAddress(ipv6Bytes).ToString();
+ request.DestinationPort = (ushort)((buffer[20] << 8) + buffer[21]);
+ break;
+
+ default:
+ Log.Warning("Unsupported address type: {AddressType} from client {ClientInfo}", request.AddressType, clientInfo);
+ await SendSocks5Response(stream, 0x08); // Address type not supported
+ return null;
+ }
+
+ Log.Information("SOCKS5 request from client {ClientInfo}: Command={Command}, Address={Address}, Port={Port}",
+ clientInfo, request.Command, request.DestinationAddress, request.DestinationPort);
+
+ return request;
+ }
+
+ private static async Task HandleConnectionRequestAsync(Stream clientStream, Socks5Request request, string clientInfo)
+ {
+ // Only support CONNECT command (0x01)
+ if (request.Command != 0x01)
+ {
+ Log.Warning("Unsupported SOCKS5 command: {Command} from client {ClientInfo}", request.Command, clientInfo);
+ await SendSocks5Response(clientStream, 0x07); // Command not supported
+ return;
+ }
+
+ // Resolve the destination address
+ IPAddress[] destinationAddresses;
+ try
+ {
+ if (IPAddress.TryParse(request.DestinationAddress, out var ipAddress))
+ {
+ destinationAddresses = new[] { ipAddress };
+ }
+ else
+ {
+ // Perform DNS resolution
+ destinationAddresses = await Dns.GetHostAddressesAsync(request.DestinationAddress!);
+ if (destinationAddresses.Length == 0)
+ {
+ Log.Warning("Could not resolve destination address: {Address} for client {ClientInfo}",
+ request.DestinationAddress, clientInfo);
+ await SendSocks5Response(clientStream, 0x04); // Host unreachable
+ return;
+ }
+ }
+ }
+ catch (Exception ex)
+ {
+ Log.Error(ex, "Error resolving destination address: {Address} for client {ClientInfo}",
+ request.DestinationAddress, clientInfo);
+ await SendSocks5Response(clientStream, 0x04); // Host unreachable
+ return;
+ }
+
+ // Connect to the destination server
+ using var destinationClient = new TcpClient();
+
+ try
+ {
+ await destinationClient.ConnectAsync(destinationAddresses, request.DestinationPort);
+
+ var destinationEndpoint = destinationClient.Client.RemoteEndPoint as IPEndPoint;
+ var destinationInfo = destinationEndpoint?.ToString() ?? "unknown";
+
+ // Send success response
+ //await SendSocks5Response(clientStream, 0x00);
+ await SendSocks5Response(clientStream, 0x00, destinationClient.Client.LocalEndPoint as IPEndPoint);
+
+ // Start relaying data between client and destination
+ Log.Information("Connected to destination {DestAddress}:{DestPort} for client {ClientInfo}",
+ request.DestinationAddress, request.DestinationPort, clientInfo);
+
+ using var destinationStream = destinationClient.GetStream();
+ await Task.WhenAny(
+ ForwardDataAsync(clientStream, destinationStream, $"client({clientInfo}) -> destination({destinationInfo})"),
+ ForwardDataAsync(destinationStream, clientStream, $"destination({destinationInfo}) -> client({clientInfo})")
+ );
+ }
+ catch (Exception ex)
+ {
+ Log.Error(ex, "Error connecting to destination {DestAddress}:{DestPort} for client {ClientInfo}",
+ request.DestinationAddress, request.DestinationPort, clientInfo);
+ await SendSocks5Response(clientStream, 0x05); // Connection refused
+ }
+ }
+
+ private static async Task SendSocks5Response(Stream stream, byte status, IPEndPoint? localEndPoint = null)
+ {
+ // Create response buffer - the size depends on the address type
+ byte[] response;
+ byte addressType = 0x01; // Default to IPv4
+
+ if (localEndPoint != null && localEndPoint.Address.AddressFamily == System.Net.Sockets.AddressFamily.InterNetworkV6)
+ {
+ addressType = 0x04; // IPv6
+ response = new byte[22]; // 4 bytes header + 16 bytes IPv6 address + 2 bytes port
+ }
+ else
+ {
+ addressType = 0x01; // IPv4
+ response = new byte[10]; // 4 bytes header + 4 bytes IPv4 address + 2 bytes port
+ }
+
+ response[0] = 0x05; // SOCKS version
+ response[1] = status; // Status code
+ response[2] = 0x00; // Reserved
+ response[3] = addressType;
+
+ if (localEndPoint != null && status == 0x00)
+ {
+ // Fill in the bound address and port
+ byte[] addressBytes = localEndPoint.Address.GetAddressBytes();
+ Array.Copy(addressBytes, 0, response, 4, addressBytes.Length);
+
+ ushort port = (ushort)localEndPoint.Port;
+
+ // Port is always the last 2 bytes of the response
+ response[response.Length - 2] = (byte)(port >> 8);
+ response[response.Length - 1] = (byte)(port & 0xFF);
+ }
+
+ await stream.WriteAsync(response, 0, response.Length);
+ }
+
+ private static async Task ForwardDataAsync(Stream from, Stream to, string direction)
+ {
+ var sleepTime = TimeSpan.FromSeconds(5);
+ byte[] buffer = new byte[BufferSize];
+
+ try
+ {
+ while (true)
+ {
+ var bytesRead = await from.ReadAsync(buffer, 0, buffer.Length);
+
+ await to.WriteAsync(buffer, 0, bytesRead);
+ Log.Debug("Forward {BytesRead} bytes {Direction} ", bytesRead, direction);
+
+ if (bytesRead == 0)
+ {
+ Log.Debug("No more data to read {Direction}. Sleeping for {SleepTime}", direction, sleepTime);
+ break;
+ }
+ }
+ }
+ catch (IOException)
+ {
+ Log.Debug("IOException Expected when the other stream was closed");
+ }
+ catch (ObjectDisposedException)
+ {
+ Log.Debug("ObjectDisposedException Expected when the other stream was closed");
+ }
+ catch (Exception ex)
+ {
+ Log.Error(ex, "Error relaying data {Direction}", direction);
+ }
+ }
+ }
+
+ class Socks5Request
+ {
+ public byte Command { get; set; }
+ public byte AddressType { get; set; }
+ public string? DestinationAddress { get; set; }
+ public ushort DestinationPort { get; set; }
+ }
+}
\ No newline at end of file
diff --git a/source/Octopus.Tentacle/SocksProxy/ClientWebSocketStream.cs b/source/Octopus.Tentacle/SocksProxy/ClientWebSocketStream.cs
new file mode 100644
index 000000000..df9a380eb
--- /dev/null
+++ b/source/Octopus.Tentacle/SocksProxy/ClientWebSocketStream.cs
@@ -0,0 +1,143 @@
+using System;
+using System.IO;
+using System.Net.WebSockets;
+using System.Threading;
+using System.Threading.Tasks;
+using Serilog;
+
+namespace Octopus.Tentacle.SocksProxy
+{
+
+ ///
+ /// A Stream implementation that wraps a ClientWebSocket to provide standard Stream interface.
+ ///
+ public class ClientWebSocketStream : Stream
+ {
+ private readonly WebSocket _webSocket;
+ private readonly CancellationToken _cancellationToken;
+ private readonly SemaphoreSlim _readSemaphore = new SemaphoreSlim(1, 1);
+ private readonly SemaphoreSlim _writeSemaphore = new SemaphoreSlim(1, 1);
+ private bool _disposed = false;
+
+ public ClientWebSocketStream(WebSocket webSocket)
+ {
+ _webSocket = webSocket ?? throw new ArgumentNullException(nameof(webSocket));
+ _cancellationToken = CancellationToken.None;
+
+ }
+
+ public ClientWebSocketStream(ClientWebSocket webSocket, CancellationToken cancellationToken)
+ {
+ _webSocket = webSocket ?? throw new ArgumentNullException(nameof(webSocket));
+ _cancellationToken = cancellationToken;
+ }
+
+ public override bool CanRead => _webSocket.State == WebSocketState.Open;
+ public override bool CanSeek => false;
+ public override bool CanWrite => _webSocket.State == WebSocketState.Open;
+ public override long Length => throw new NotSupportedException("WebSocket streams do not support seeking");
+
+ public override long Position
+ {
+ get => throw new NotSupportedException("WebSocket streams do not support seeking");
+ set => throw new NotSupportedException("WebSocket streams do not support seeking");
+ }
+
+ public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+ {
+ var effectiveCancellationToken = CombineCancellationTokens(cancellationToken);
+ await _readSemaphore.WaitAsync(effectiveCancellationToken);
+
+ try
+ {
+ var result = await _webSocket.ReceiveAsync(new ArraySegment(buffer, offset, count), effectiveCancellationToken);
+
+ return result.Count;
+ }
+ finally
+ {
+ _readSemaphore.Release();
+ }
+ }
+
+ public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+ {
+ var effectiveCancellationToken = CombineCancellationTokens(cancellationToken);
+ await _writeSemaphore.WaitAsync(effectiveCancellationToken);
+
+ try
+ {
+ #if NET8_0_OR_GREATER
+ await _webSocket.SendAsync(new ArraySegment(buffer, offset, count), WebSocketMessageType.Binary, WebSocketMessageFlags.None, effectiveCancellationToken);
+ #else
+ await _webSocket.SendAsync(new ArraySegment(buffer, offset, count), WebSocketMessageType.Binary, false, effectiveCancellationToken);
+ #endif
+ }
+ finally
+ {
+ _writeSemaphore.Release();
+ }
+ }
+
+ public override int Read(byte[] buffer, int offset, int count)
+ {
+ return ReadAsync(buffer, offset, count).GetAwaiter().GetResult();
+ }
+
+ public override void Write(byte[] buffer, int offset, int count)
+ {
+ WriteAsync(buffer, offset, count).GetAwaiter().GetResult();
+ }
+
+ public override void Flush()
+ {
+ // WebSockets send immediately, so flush is a no-op
+ }
+
+ public override long Seek(long offset, SeekOrigin origin)
+ {
+ throw new NotSupportedException("WebSocket streams do not support seeking");
+ }
+
+ public override void SetLength(long value)
+ {
+ throw new NotSupportedException("WebSocket streams do not support seeking");
+ }
+
+ private CancellationToken CombineCancellationTokens(CancellationToken externalToken)
+ {
+ if (externalToken == CancellationToken.None)
+ return _cancellationToken;
+
+ if (_cancellationToken == CancellationToken.None)
+ return externalToken;
+
+ var source = CancellationTokenSource.CreateLinkedTokenSource(_cancellationToken, externalToken);
+ return source.Token;
+ }
+
+ protected override void Dispose(bool disposing)
+ {
+ if (!_disposed)
+ {
+ if (disposing)
+ {
+ // Close the WebSocket if it's still open
+ if (_webSocket.State == WebSocketState.Open)
+ {
+ Log.Information("Closing WebSocket connection gracefully...");
+ _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Stream disposed", CancellationToken.None)
+ .GetAwaiter().GetResult();
+ }
+
+ _readSemaphore.Dispose();
+ _writeSemaphore.Dispose();
+ }
+
+ _disposed = true;
+ }
+
+ base.Dispose(disposing);
+ }
+ }
+}