Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
- name: Setup .NET Core
uses: actions/setup-dotnet@v1
with:
dotnet-version: 6.0.100
dotnet-version: 9.0.303
- name: Build with dotnet
run: dotnet build --configuration Release
- name: Test
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/windows.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
- name: Setup .NET Core
uses: actions/setup-dotnet@v1
with:
dotnet-version: 6.0.100
dotnet-version: 9.0.303
- name: Build with dotnet
run: dotnet build --configuration Release
- name: Test
Expand Down
50 changes: 22 additions & 28 deletions Sox.EchoServer/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,17 @@ public class Program
{
private static WebSocketServer _server;

private static readonly ManualResetEventSlim _serverWaitHandle;
private static readonly ManualResetEventSlim ServerWaitHandle;

private static readonly IPAddress _ipAddress = IPAddress.Parse("127.0.0.1");
private static readonly IPAddress IpAddress = IPAddress.Parse("127.0.0.1");

private static int MessageCount;
private static int _messageCount;

private static readonly object locker = new();
private static readonly object Locker = new();

static Program()
{
_serverWaitHandle = new ManualResetEventSlim();
ServerWaitHandle = new ManualResetEventSlim();
}

static void Main(string[] args)
Expand All @@ -41,16 +41,15 @@ public static int StartServer()
_server.OnConnection += OnConnect;
_server.OnDisconnection += OnDisconnect;
_server.OnTextMessage += OnTextMessage;
_server.OnBinaryMessage += OnBinaryMessage;
_server.OnError += OnError;
_server.OnFrame += OnFrame;

try
{
Console.WriteLine($"Starting Sox server...");
Console.WriteLine("Starting Sox server...");
_ = _server.Start();
Console.WriteLine($"Sox server listening on {_server.Protocol.ToString().ToLower()}://{_server.IpAddress}:{_server.Port}");
_serverWaitHandle.Wait();
ServerWaitHandle.Wait();
return 0;
}
catch (Exception e)
Expand All @@ -64,29 +63,29 @@ public static int StartServer()
private static WebSocketServer CreateServer(string protocol = "ws") => protocol switch
{
"ws" => new WebSocketServer(
ipAddress: _ipAddress,
ipAddress: IpAddress,
port: 8888),
"wss" => new WebSocketServer(
ipAddress: _ipAddress,
ipAddress: IpAddress,
port: 443,
x509Certificate: new X509Certificate2($"sox.pfx", "sox")),
x509Certificate: X509CertificateLoader.LoadCertificateFromFile("sox.pfx")),
_ => throw new NotSupportedException($"{protocol} not supported")
};

private static async Task StopServer()
{
await _server.Stop();
_serverWaitHandle.Set();
ServerWaitHandle.Set();
}

private static async void OnSigTerm(AssemblyLoadContext ctx)
private static void OnSigTerm(AssemblyLoadContext ctx)
{
await StopServer();
StopServer().Wait();
}

private static async void OnSigTerm(object sender, ConsoleCancelEventArgs e)
private static void OnSigTerm(object sender, ConsoleCancelEventArgs e)
{
await StopServer();
StopServer().Wait();
}

private static void OnConnect(object sender, OnConnectionEventArgs eventArgs)
Expand All @@ -101,23 +100,18 @@ private static void OnDisconnect(object sender, OnDisconnectionEventArgs eventAr
Console.WriteLine($"{eventArgs.Connection.Id} disconnected! | {s.ConnectionCount}");
}

private static async void OnTextMessage(object sender, OnTextMessageEventArgs eventArgs)
private static void OnTextMessage(object sender, OnTextMessageEventArgs eventArgs)
{
var connection = eventArgs.Connection;
var message = eventArgs.Payload;
lock (locker)

lock (Locker)
{
Interlocked.Increment(ref MessageCount);
Console.WriteLine($"{connection.Id} sent {message} (message #{MessageCount})");
Interlocked.Increment(ref _messageCount);
Console.WriteLine($"{connection.Id} sent {message} (message #{_messageCount})");
}
await connection.Send($"{connection.Id} sent {message}");
}

private static void OnBinaryMessage(object sender, OnBinaryMessageEventArgs eventArgs)
{
var connection = eventArgs.Connection;
var message = eventArgs.Payload;
Console.WriteLine($"{connection.Id} sent {message}");
connection.Send($"{connection.Id} sent {message}").Wait();
}

private static void OnError(object sender, OnErrorEventArgs eventArgs)
Expand All @@ -127,7 +121,7 @@ private static void OnError(object sender, OnErrorEventArgs eventArgs)

private static void OnFrame(object sender, OnFrameEventArgs eventArgs)
{
Console.WriteLine($"CID: {eventArgs.Connection.Id} | Received WS Frame ({eventArgs.Frame.OpCode}) | : Plength - {eventArgs.Frame.PayloadLength:N0}");
// Console.WriteLine($"CID: {eventArgs.Connection.Id} | Received Frame ({eventArgs.Frame.OpCode}) | : Plength - {eventArgs.Frame.PayloadLength:N0}");
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion Sox.EchoServer/Sox.EchoServer.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<TargetFramework>net9.0</TargetFramework>
</PropertyGroup>

<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
Expand Down
2 changes: 1 addition & 1 deletion Sox.Tests/Sox.Tests.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<TargetFramework>net9.0</TargetFramework>

<IsPackable>false</IsPackable>
</PropertyGroup>
Expand Down
4 changes: 4 additions & 0 deletions Sox/Server/State/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ public sealed class Connection : IDisposable

private readonly int _maxMessageBytes;

private bool _disposed;

/// <summary>
/// Contruct a connection
/// </summary>
Expand Down Expand Up @@ -193,11 +195,13 @@ public void Dispose()

private void Dispose(bool disposing)
{
if (_disposed) return;
if (disposing)
{
_stream?.Dispose();
_pinger?.Dispose();
}
_disposed = true;
}

private async void Ping(object sender, ElapsedEventArgs elapsedEventArgs)
Expand Down
103 changes: 61 additions & 42 deletions Sox/Server/WebSocketServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using Sox.Websocket.Rfc6455.Messaging;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
Expand Down Expand Up @@ -91,9 +92,9 @@ public long ConnectionCount
{
get
{
lock (_locker)
lock (_connectionLock)
{
return _connectionCount;
return _connections.Count;
}
}
}
Expand All @@ -106,17 +107,17 @@ public long ConnectionCount
/// <summary>
/// The period of time before a connection will timeout on read
/// </summary>
public readonly int ConnectionReadTimeoutMs = 0;
public readonly int ConnectionReadTimeoutMs;

private CancellationTokenSource _cancellationTokenSource;

private TcpListener _server;

private readonly ConcurrentDictionary<string, Connection> _connections = new ConcurrentDictionary<string, Connection>();

private readonly object _locker = new object();
private readonly object _connectionLock = new();

private long _connectionCount = 0;
private bool _disposed;

/// <summary>
/// Default constructor
Expand Down Expand Up @@ -166,11 +167,8 @@ public async Task Start()
}
catch (Exception ex)
{
if (!(ex is ObjectDisposedException) && (ex is SocketException))
{
Console.WriteLine(ex);
client?.Close();
}
Console.WriteLine(ex);
client?.Close();
}
}
}
Expand All @@ -179,33 +177,27 @@ public async Task Start()
public async Task Stop()
{
_cancellationTokenSource.Cancel();
var connectionIds = _connections.Keys.ToList();
List<string> connectionIds;
lock (_connectionLock)
{
connectionIds = _connections.Keys.ToList();
}
foreach (var id in connectionIds)
{
await CloseConnection(_connections[id], CloseStatusCode.GoingAway);
Connection conn;
lock (_connectionLock)
{
conn = _connections.GetValueOrDefault(id);
}
if (conn != null)
{
await CloseConnection(conn, CloseStatusCode.GoingAway);
}
}

_server.Stop();
}

/// <summary>
/// Dispose of all unmanaged resources
/// </summary>
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}

private void Dispose(bool disposing)
{
if (disposing)
{
_cancellationTokenSource?.Dispose();
_cancellationTokenSource = null;
}
}

private async Task HandleHttpUpgrade(TcpClient client)
{
Stream stream;
Expand Down Expand Up @@ -243,7 +235,7 @@ private async Task HandleHttpUpgrade(TcpClient client)
{
Console.WriteLine($"Invalid Http Request: {ex}");
stream.Close();
stream.Dispose();
await stream.DisposeAsync();
}
}

Expand All @@ -270,15 +262,19 @@ private async Task ProcessHandshake(Stream stream, HttpRequest httpRequest)
// Begin sending the data to the remote device.
await connection.Send(response);
connection.State = ConnectionState.Open;
_connections[connection.Id] = connection;
Interlocked.Increment(ref _connectionCount);
OnConnection?.Invoke(this, new OnConnectionEventArgs(connection));
lock (_connectionLock)
{
_connections[connection.Id] = connection;
OnConnection?.Invoke(this, new OnConnectionEventArgs(connection));
}
await StartClientHandler(connection);
}

// The body of this task should live in the connection.
private async Task StartClientHandler(Connection connection)
{
await Task.Factory.StartNew(async (state) =>
// Should be Task.Run, Task.Factory.StartNew doesn't handle async properly
await Task.Run(async () =>
{
while (connection.State == ConnectionState.Open)
{
Expand All @@ -297,9 +293,10 @@ await Task.Factory.StartNew(async (state) =>
}
}
}
}, TaskCreationOptions.AttachedToParent, cancellationToken: _cancellationTokenSource.Token);
}, cancellationToken: _cancellationTokenSource.Token);
}

// I would move this to the connection class
private async Task HandleFrame(Connection connection, Frame frame)
{
// Close connection if not masked
Expand All @@ -312,9 +309,7 @@ private async Task HandleFrame(Connection connection, Frame frame)

switch (frame.OpCode)
{
case OpCode.Binary:
case OpCode.Text:
case OpCode.Continuation:
case OpCode.Binary or OpCode.Text or OpCode.Continuation when connection.State == ConnectionState.Open:
await HandleDataFrame(frame, connection);
break;
case OpCode.Close:
Expand All @@ -332,6 +327,7 @@ private async Task HandleFrame(Connection connection, Frame frame)
}
}

// I would move this to the connection class
private async Task HandleDataFrame(Frame frame, Connection connection)
{
if (await connection.TryAddFrame(frame))
Expand Down Expand Up @@ -377,9 +373,11 @@ private static void HandlePongFrame(Connection connection)
private async Task CloseConnection(Connection connection, CloseStatusCode reason)
{
await connection.Close(reason);
Interlocked.Decrement(ref _connectionCount);
OnDisconnection?.Invoke(this, new OnDisconnectionEventArgs(connection));
RemoveConnection(connection.Id);
lock (_connectionLock)
{
RemoveConnection(connection.Id);
OnDisconnection?.Invoke(this, new OnDisconnectionEventArgs(connection));
}
}

private void RemoveConnection(string id)
Expand All @@ -389,5 +387,26 @@ private void RemoveConnection(string id)
removed.Dispose();
}
}

/// <summary>
/// Dispose of all unmanaged resources
/// </summary>
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}

private void Dispose(bool disposing)
{
if (_disposed) return;
if (disposing)
{
_cancellationTokenSource?.Dispose();
_cancellationTokenSource = null;
}

_disposed = true;
}
}
}
Loading