From ae18431a78f9e47f816b3a7db80552c9246cc587 Mon Sep 17 00:00:00 2001 From: vnugent Date: Wed, 6 Dec 2023 14:11:46 -0500 Subject: fix tcp buffering with optimization, package updates, and C mem lib readmes --- .../src/SocketPipeLineWorker.cs | 55 ++++++++++++++++++++-- .../src/VNLib.Net.Transport.SimpleTCP.csproj | 2 +- 2 files changed, 51 insertions(+), 6 deletions(-) (limited to 'lib/Net.Transport.SimpleTCP') diff --git a/lib/Net.Transport.SimpleTCP/src/SocketPipeLineWorker.cs b/lib/Net.Transport.SimpleTCP/src/SocketPipeLineWorker.cs index db87357..209ab91 100644 --- a/lib/Net.Transport.SimpleTCP/src/SocketPipeLineWorker.cs +++ b/lib/Net.Transport.SimpleTCP/src/SocketPipeLineWorker.cs @@ -29,6 +29,7 @@ using System.Threading; using System.Net.Sockets; using System.IO.Pipelines; using System.Threading.Tasks; +using System.Runtime.InteropServices; using VNLib.Utils.Memory; using VNLib.Utils.Memory.Caching; @@ -234,7 +235,7 @@ namespace VNLib.Net.Transport.Tcp //Write segment to socket, and upate written data int written = await sock.SendAsync(reader.Window, SocketFlags.None); - if(written >= reader.WindowSize) + if(written == reader.WindowSize) { //All data was written break; @@ -274,6 +275,7 @@ namespace VNLib.Net.Transport.Tcp private FlushResult _recvFlushRes; + private int _sysSocketBufferSize; private async Task RecvDoWorkAsync(Socket sock, bool initialData) { @@ -284,7 +286,7 @@ namespace VNLib.Net.Transport.Tcp try { //Avoid syscall? - int bufferSize = sock.ReceiveBufferSize; + _sysSocketBufferSize = sock.ReceiveBufferSize; //If initial data was buffered, it needs to be published to the reader if (initialData) @@ -303,7 +305,7 @@ namespace VNLib.Net.Transport.Tcp while (true) { //Get buffer from pipe writer - Memory buffer = RecvPipe.Writer.GetMemory(bufferSize); + Memory buffer = RecvPipe.Writer.GetMemory(_sysSocketBufferSize); //Wait for data or error from socket int count = await sock.ReceiveAsync(buffer, SocketFlags.None, _cts.Token); @@ -397,8 +399,10 @@ namespace VNLib.Net.Transport.Tcp SendTimer.Restart(SendTimeoutMs); try { + CopyAndPublishDataOnSendPipe(data); + //Send the segment - ValueTask result = SendPipe.Writer.WriteAsync(data, cancellation); + ValueTask result = SendPipe.Writer.FlushAsync(cancellation); //Task completed successfully, so if (result.IsCompleted) @@ -430,8 +434,10 @@ namespace VNLib.Net.Transport.Tcp private ValueTask SendWithoutTimerInternalAsync(ReadOnlyMemory data, CancellationToken cancellation) { + CopyAndPublishDataOnSendPipe(data); + //Send the segment - ValueTask result = SendPipe.Writer.WriteAsync(data, cancellation); + ValueTask result = SendPipe.Writer.FlushAsync(cancellation); //Task completed successfully, so if (result.IsCompleted) @@ -454,6 +460,45 @@ namespace VNLib.Net.Transport.Tcp } } + private void CopyAndPublishDataOnSendPipe(ReadOnlyMemory src) + { + /* + * Clamp the buffer size to the system socket buffer size. If the + * buffer is larger then, we will need to publish multiple segments + */ + if(src.Length > _sysSocketBufferSize) + { + //Store local src buffer reference to copy to + ref byte srcRef = ref MemoryMarshal.GetReference(src.Span); + + uint written = 0; + while (written < src.Length) + { + int dataToCopy = (int)Math.Min(_sysSocketBufferSize, src.Length - written); + + //Get a new buffer span, and ref + Span dest = SendPipe.Writer.GetSpan(dataToCopy); + ref byte destRef = ref MemoryMarshal.GetReference(dest); + + //Copy data to the buffer at the new position + MemoryUtil.Memmove(ref srcRef, written, ref destRef, 0, (uint)dataToCopy); + + //Advance the writer by the number of bytes written + SendPipe.Writer.Advance(dataToCopy); + + //Increment the written count + written += (uint)dataToCopy; + } + } + else + { + //Single segment, just copy to the writer + Span dest = SendPipe.Writer.GetSpan(src.Length); + src.Span.CopyTo(dest); + SendPipe.Writer.Advance(src.Length); + } + } + ValueTask ITransportInterface.SendAsync(ReadOnlyMemory data, CancellationToken cancellation) { //Use timer if timeout is set, dont otherwise diff --git a/lib/Net.Transport.SimpleTCP/src/VNLib.Net.Transport.SimpleTCP.csproj b/lib/Net.Transport.SimpleTCP/src/VNLib.Net.Transport.SimpleTCP.csproj index bfd9fda..ac3e0a1 100644 --- a/lib/Net.Transport.SimpleTCP/src/VNLib.Net.Transport.SimpleTCP.csproj +++ b/lib/Net.Transport.SimpleTCP/src/VNLib.Net.Transport.SimpleTCP.csproj @@ -32,7 +32,7 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive - + -- cgit