diff options
author | vnugent <public@vaughnnugent.com> | 2023-12-06 14:11:46 -0500 |
---|---|---|
committer | vnugent <public@vaughnnugent.com> | 2023-12-06 14:11:46 -0500 |
commit | ae18431a78f9e47f816b3a7db80552c9246cc587 (patch) | |
tree | e08e8262fd077fedeb4950353da95cdd2e66d772 /lib/Net.Transport.SimpleTCP | |
parent | a6a88aae3e6cb39ebd8fe0b63a865168e680ef45 (diff) |
fix tcp buffering with optimization, package updates, and C mem lib readmes
Diffstat (limited to 'lib/Net.Transport.SimpleTCP')
-rw-r--r-- | lib/Net.Transport.SimpleTCP/src/SocketPipeLineWorker.cs | 55 | ||||
-rw-r--r-- | lib/Net.Transport.SimpleTCP/src/VNLib.Net.Transport.SimpleTCP.csproj | 2 |
2 files changed, 51 insertions, 6 deletions
diff --git a/lib/Net.Transport.SimpleTCP/src/SocketPipeLineWorker.cs b/lib/Net.Transport.SimpleTCP/src/SocketPipeLineWorker.cs index db87357..209ab91 100644 --- a/lib/Net.Transport.SimpleTCP/src/SocketPipeLineWorker.cs +++ b/lib/Net.Transport.SimpleTCP/src/SocketPipeLineWorker.cs @@ -29,6 +29,7 @@ using System.Threading; using System.Net.Sockets; using System.IO.Pipelines; using System.Threading.Tasks; +using System.Runtime.InteropServices; using VNLib.Utils.Memory; using VNLib.Utils.Memory.Caching; @@ -234,7 +235,7 @@ namespace VNLib.Net.Transport.Tcp //Write segment to socket, and upate written data int written = await sock.SendAsync(reader.Window, SocketFlags.None); - if(written >= reader.WindowSize) + if(written == reader.WindowSize) { //All data was written break; @@ -274,6 +275,7 @@ namespace VNLib.Net.Transport.Tcp private FlushResult _recvFlushRes; + private int _sysSocketBufferSize; private async Task RecvDoWorkAsync(Socket sock, bool initialData) { @@ -284,7 +286,7 @@ namespace VNLib.Net.Transport.Tcp try { //Avoid syscall? - int bufferSize = sock.ReceiveBufferSize; + _sysSocketBufferSize = sock.ReceiveBufferSize; //If initial data was buffered, it needs to be published to the reader if (initialData) @@ -303,7 +305,7 @@ namespace VNLib.Net.Transport.Tcp while (true) { //Get buffer from pipe writer - Memory<byte> buffer = RecvPipe.Writer.GetMemory(bufferSize); + Memory<byte> buffer = RecvPipe.Writer.GetMemory(_sysSocketBufferSize); //Wait for data or error from socket int count = await sock.ReceiveAsync(buffer, SocketFlags.None, _cts.Token); @@ -397,8 +399,10 @@ namespace VNLib.Net.Transport.Tcp SendTimer.Restart(SendTimeoutMs); try { + CopyAndPublishDataOnSendPipe(data); + //Send the segment - ValueTask<FlushResult> result = SendPipe.Writer.WriteAsync(data, cancellation); + ValueTask<FlushResult> result = SendPipe.Writer.FlushAsync(cancellation); //Task completed successfully, so if (result.IsCompleted) @@ -430,8 +434,10 @@ namespace VNLib.Net.Transport.Tcp private ValueTask SendWithoutTimerInternalAsync(ReadOnlyMemory<byte> data, CancellationToken cancellation) { + CopyAndPublishDataOnSendPipe(data); + //Send the segment - ValueTask<FlushResult> result = SendPipe.Writer.WriteAsync(data, cancellation); + ValueTask<FlushResult> result = SendPipe.Writer.FlushAsync(cancellation); //Task completed successfully, so if (result.IsCompleted) @@ -454,6 +460,45 @@ namespace VNLib.Net.Transport.Tcp } } + private void CopyAndPublishDataOnSendPipe(ReadOnlyMemory<byte> src) + { + /* + * Clamp the buffer size to the system socket buffer size. If the + * buffer is larger then, we will need to publish multiple segments + */ + if(src.Length > _sysSocketBufferSize) + { + //Store local src buffer reference to copy to + ref byte srcRef = ref MemoryMarshal.GetReference(src.Span); + + uint written = 0; + while (written < src.Length) + { + int dataToCopy = (int)Math.Min(_sysSocketBufferSize, src.Length - written); + + //Get a new buffer span, and ref + Span<byte> dest = SendPipe.Writer.GetSpan(dataToCopy); + ref byte destRef = ref MemoryMarshal.GetReference(dest); + + //Copy data to the buffer at the new position + MemoryUtil.Memmove(ref srcRef, written, ref destRef, 0, (uint)dataToCopy); + + //Advance the writer by the number of bytes written + SendPipe.Writer.Advance(dataToCopy); + + //Increment the written count + written += (uint)dataToCopy; + } + } + else + { + //Single segment, just copy to the writer + Span<byte> dest = SendPipe.Writer.GetSpan(src.Length); + src.Span.CopyTo(dest); + SendPipe.Writer.Advance(src.Length); + } + } + ValueTask ITransportInterface.SendAsync(ReadOnlyMemory<byte> data, CancellationToken cancellation) { //Use timer if timeout is set, dont otherwise diff --git a/lib/Net.Transport.SimpleTCP/src/VNLib.Net.Transport.SimpleTCP.csproj b/lib/Net.Transport.SimpleTCP/src/VNLib.Net.Transport.SimpleTCP.csproj index bfd9fda..ac3e0a1 100644 --- a/lib/Net.Transport.SimpleTCP/src/VNLib.Net.Transport.SimpleTCP.csproj +++ b/lib/Net.Transport.SimpleTCP/src/VNLib.Net.Transport.SimpleTCP.csproj @@ -32,7 +32,7 @@ <PrivateAssets>all</PrivateAssets> <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> </PackageReference> - <PackageReference Include="System.IO.Pipelines" Version="7.0.0" /> + <PackageReference Include="System.IO.Pipelines" Version="8.0.0" /> </ItemGroup> <ItemGroup> |