aboutsummaryrefslogtreecommitdiff
path: root/lib/Net.Messaging.FBM/src
diff options
context:
space:
mode:
authorLibravatar vnugent <public@vaughnnugent.com>2023-07-27 01:39:44 -0400
committerLibravatar vnugent <public@vaughnnugent.com>2023-07-27 01:39:44 -0400
commit1074db64cc7bae103240ff1220d50d1958b7a900 (patch)
treea9cff64a3ce836027820b1c536b1a88db4e13a0d /lib/Net.Messaging.FBM/src
parentab07d9d36e3e61f48584920d882d95dead6e7600 (diff)
Native compression lib first build, managed, and tests
Diffstat (limited to 'lib/Net.Messaging.FBM/src')
-rw-r--r--lib/Net.Messaging.FBM/src/Helpers.cs33
-rw-r--r--lib/Net.Messaging.FBM/src/Server/FBMListener.cs212
2 files changed, 121 insertions, 124 deletions
diff --git a/lib/Net.Messaging.FBM/src/Helpers.cs b/lib/Net.Messaging.FBM/src/Helpers.cs
index cce1d27..4fa00fa 100644
--- a/lib/Net.Messaging.FBM/src/Helpers.cs
+++ b/lib/Net.Messaging.FBM/src/Helpers.cs
@@ -58,6 +58,11 @@ namespace VNLib.Net.Messaging.FBM
public static ReadOnlyMemory<byte> Termination { get; } = new byte[] { 0xFF, 0xF1 };
/// <summary>
+ /// Allocates a random integer to use as a message id
+ /// </summary>
+ public static int RandomMessageId => RandomNumberGenerator.GetInt32(1, int.MaxValue);
+
+ /// <summary>
/// Parses the header line for a message-id
/// </summary>
/// <param name="line">A sequence of bytes that make up a header line</param>
@@ -106,13 +111,7 @@ namespace VNLib.Net.Messaging.FBM
accumulator.Append(buffer);
WriteTermination(accumulator);
- }
-
-
- /// <summary>
- /// Alloctes a random integer to use as a message id
- /// </summary>
- public static int RandomMessageId => RandomNumberGenerator.GetInt32(1, int.MaxValue);
+ }
/// <summary>
/// Gets the remaining data after the current position of the stream.
@@ -120,10 +119,7 @@ namespace VNLib.Net.Messaging.FBM
/// <param name="response">The stream to segment</param>
/// <returns>The remaining data segment</returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
- public static ReadOnlySpan<byte> GetRemainingData(VnMemoryStream response)
- {
- return response.AsSpan()[(int)response.Position..];
- }
+ public static ReadOnlySpan<byte> GetRemainingData(VnMemoryStream response) => response.AsSpan()[(int)response.Position..];
/// <summary>
/// Reads the next available line from the response message
@@ -212,10 +208,7 @@ namespace VNLib.Net.Messaging.FBM
/// <param name="line"></param>
/// <returns>The <see cref="HeaderCommand"/> enum value from hte first byte of the message</returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
- public static HeaderCommand GetHeaderCommand(ReadOnlySpan<byte> line)
- {
- return (HeaderCommand)line[0];
- }
+ public static HeaderCommand GetHeaderCommand(ReadOnlySpan<byte> line) => (HeaderCommand)line[0];
/// <summary>
/// Gets the value of the header following the colon bytes in the specifed
@@ -239,8 +232,7 @@ namespace VNLib.Net.Messaging.FBM
//Decode the characters and return the char count
_ = encoding.GetChars(value, output);
return charCount;
- }
-
+ }
/// <summary>
/// Ends the header section of the request and appends the message body to
@@ -257,7 +249,6 @@ namespace VNLib.Net.Messaging.FBM
buffer.Append(body);
}
-
/// <summary>
/// Rounds the requested byte size up to the 1kb
/// number of bytes
@@ -274,7 +265,6 @@ namespace VNLib.Net.Messaging.FBM
return kbs * 1024;
}
-
/// <summary>
/// Writes a line termination to the message buffer
/// </summary>
@@ -292,6 +282,11 @@ namespace VNLib.Net.Messaging.FBM
/// <exception cref="ArgumentException"></exception>
public static void WriteHeader(IDataAccumulator<byte> buffer, byte header, ReadOnlySpan<char> value, Encoding encoding)
{
+ if(header == 0)
+ {
+ throw new ArgumentException("A header command of 0 is illegal", nameof(header));
+ }
+
//Write header command enum value
buffer.Append(header);
//Convert the characters to binary and write to the buffer
diff --git a/lib/Net.Messaging.FBM/src/Server/FBMListener.cs b/lib/Net.Messaging.FBM/src/Server/FBMListener.cs
index 3417abc..fd8b025 100644
--- a/lib/Net.Messaging.FBM/src/Server/FBMListener.cs
+++ b/lib/Net.Messaging.FBM/src/Server/FBMListener.cs
@@ -54,110 +54,7 @@ namespace VNLib.Net.Messaging.FBM.Server
/// and raises events on requests.
/// </summary>
public class FBMListener
- {
- private sealed class ListeningSession
- {
- private readonly ReusableStore<FBMContext> CtxStore;
- private readonly CancellationTokenSource Cancellation;
- private readonly CancellationTokenRegistration Registration;
- private readonly FBMListenerSessionParams Params;
-
-
- public readonly object? UserState;
-
- public readonly SemaphoreSlim ResponseLock;
-
- public readonly WebSocketSession Socket;
-
- public readonly RequestHandler OnRecieved;
-
- public CancellationToken CancellationToken => Cancellation.Token;
-
-
- public ListeningSession(WebSocketSession session, RequestHandler onRecieved, in FBMListenerSessionParams args, object? userState)
- {
- Params = args;
- Socket = session;
- UserState = userState;
- OnRecieved = onRecieved;
-
- //Create cancellation and register for session close
- Cancellation = new();
- Registration = session.Token.Register(Cancellation.Cancel);
-
- ResponseLock = new(1);
- CtxStore = ObjectRental.CreateReusable(ContextCtor);
- }
-
- private FBMContext ContextCtor() => new(Params.MaxHeaderBufferSize, Params.ResponseBufferSize, Params.HeaderEncoding);
-
- /// <summary>
- /// Cancels any pending opreations relating to the current session
- /// </summary>
- public void CancelSession()
- {
- Cancellation.Cancel();
-
- //If dispose happens without any outstanding requests, we can dispose the session
- if (_counter == 0)
- {
- CleanupInternal();
- }
- }
-
- private void CleanupInternal()
- {
- Registration.Dispose();
- CtxStore.Dispose();
- Cancellation.Dispose();
- ResponseLock.Dispose();
- }
-
-
- private uint _counter;
-
- /// <summary>
- /// Rents a new <see cref="FBMContext"/> instance from the pool
- /// and increments the counter
- /// </summary>
- /// <returns>The rented instance</returns>
- /// <exception cref="ObjectDisposedException"></exception>
- public FBMContext RentContext()
- {
-
- if (Cancellation.IsCancellationRequested)
- {
- throw new ObjectDisposedException("The instance has been disposed");
- }
-
- //Rent context
- FBMContext ctx = CtxStore.Rent();
- //Increment counter
- Interlocked.Increment(ref _counter);
-
- return ctx;
- }
-
- /// <summary>
- /// Returns a previously rented context to the pool
- /// and decrements the counter. If the session has been
- /// cancelled, when the counter reaches 0, cleanup occurs
- /// </summary>
- /// <param name="ctx">The context to return</param>
- public void ReturnContext(FBMContext ctx)
- {
- //Return the context
- CtxStore.Return(ctx);
-
- uint current = Interlocked.Decrement(ref _counter);
-
- //No more contexts in use, dispose internals
- if (Cancellation.IsCancellationRequested && current == 0)
- {
- CleanupInternal();
- }
- }
- }
+ {
public const int SEND_SEMAPHORE_TIMEOUT_MS = 10 * 1000;
@@ -189,7 +86,8 @@ namespace VNLib.Net.Messaging.FBM.Server
/// <returns>A <see cref="Task"/> that completes when the connection closes</returns>
public async Task ListenAsync(WebSocketSession wss, RequestHandler handler, FBMListenerSessionParams args, object? userState)
{
- ListeningSession session = new(wss, handler, args, userState);
+ ListeningSession session = new(wss, handler, in args, userState);
+
//Alloc a recieve buffer
using IMemoryOwner<byte> recvBuffer = Heap.DirectAlloc<byte>(args.RecvBufferSize);
@@ -385,5 +283,109 @@ namespace VNLib.Net.Messaging.FBM.Server
{
return Task.CompletedTask;
}
+
+ private sealed class ListeningSession
+ {
+ private readonly ReusableStore<FBMContext> CtxStore;
+ private readonly CancellationTokenSource Cancellation;
+ private readonly CancellationTokenRegistration Registration;
+ private readonly FBMListenerSessionParams Params;
+
+
+ public readonly object? UserState;
+
+ public readonly SemaphoreSlim ResponseLock;
+
+ public readonly WebSocketSession Socket;
+
+ public readonly RequestHandler OnRecieved;
+
+ public CancellationToken CancellationToken => Cancellation.Token;
+
+
+ public ListeningSession(WebSocketSession session, RequestHandler onRecieved, in FBMListenerSessionParams args, object? userState)
+ {
+ Params = args;
+ Socket = session;
+ UserState = userState;
+ OnRecieved = onRecieved;
+
+ //Create cancellation and register for session close
+ Cancellation = new();
+ Registration = session.Token.Register(Cancellation.Cancel);
+
+ ResponseLock = new(1);
+ CtxStore = ObjectRental.CreateReusable(ContextCtor);
+ }
+
+ private FBMContext ContextCtor() => new(Params.MaxHeaderBufferSize, Params.ResponseBufferSize, Params.HeaderEncoding);
+
+ /// <summary>
+ /// Cancels any pending opreations relating to the current session
+ /// </summary>
+ public void CancelSession()
+ {
+ Cancellation.Cancel();
+
+ //If dispose happens without any outstanding requests, we can dispose the session
+ if (_counter == 0)
+ {
+ CleanupInternal();
+ }
+ }
+
+ private void CleanupInternal()
+ {
+ Registration.Dispose();
+ CtxStore.Dispose();
+ Cancellation.Dispose();
+ ResponseLock.Dispose();
+ }
+
+
+ private uint _counter;
+
+ /// <summary>
+ /// Rents a new <see cref="FBMContext"/> instance from the pool
+ /// and increments the counter
+ /// </summary>
+ /// <returns>The rented instance</returns>
+ /// <exception cref="ObjectDisposedException"></exception>
+ public FBMContext RentContext()
+ {
+
+ if (Cancellation.IsCancellationRequested)
+ {
+ throw new ObjectDisposedException("The instance has been disposed");
+ }
+
+ //Rent context
+ FBMContext ctx = CtxStore.Rent();
+ //Increment counter
+ Interlocked.Increment(ref _counter);
+
+ return ctx;
+ }
+
+ /// <summary>
+ /// Returns a previously rented context to the pool
+ /// and decrements the counter. If the session has been
+ /// cancelled, when the counter reaches 0, cleanup occurs
+ /// </summary>
+ /// <param name="ctx">The context to return</param>
+ public void ReturnContext(FBMContext ctx)
+ {
+ //Return the context
+ CtxStore.Return(ctx);
+
+ uint current = Interlocked.Decrement(ref _counter);
+
+ //No more contexts in use, dispose internals
+ if (Cancellation.IsCancellationRequested && current == 0)
+ {
+ CleanupInternal();
+ }
+ }
+ }
}
}