aboutsummaryrefslogtreecommitdiff
path: root/lib/Net.Transport.SimpleTCP
diff options
context:
space:
mode:
authorLibravatar vnugent <public@vaughnnugent.com>2023-12-06 14:11:46 -0500
committerLibravatar vnugent <public@vaughnnugent.com>2023-12-06 14:11:46 -0500
commitae18431a78f9e47f816b3a7db80552c9246cc587 (patch)
treee08e8262fd077fedeb4950353da95cdd2e66d772 /lib/Net.Transport.SimpleTCP
parenta6a88aae3e6cb39ebd8fe0b63a865168e680ef45 (diff)
fix tcp buffering with optimization, package updates, and C mem lib readmes
Diffstat (limited to 'lib/Net.Transport.SimpleTCP')
-rw-r--r--lib/Net.Transport.SimpleTCP/src/SocketPipeLineWorker.cs55
-rw-r--r--lib/Net.Transport.SimpleTCP/src/VNLib.Net.Transport.SimpleTCP.csproj2
2 files changed, 51 insertions, 6 deletions
diff --git a/lib/Net.Transport.SimpleTCP/src/SocketPipeLineWorker.cs b/lib/Net.Transport.SimpleTCP/src/SocketPipeLineWorker.cs
index db87357..209ab91 100644
--- a/lib/Net.Transport.SimpleTCP/src/SocketPipeLineWorker.cs
+++ b/lib/Net.Transport.SimpleTCP/src/SocketPipeLineWorker.cs
@@ -29,6 +29,7 @@ using System.Threading;
using System.Net.Sockets;
using System.IO.Pipelines;
using System.Threading.Tasks;
+using System.Runtime.InteropServices;
using VNLib.Utils.Memory;
using VNLib.Utils.Memory.Caching;
@@ -234,7 +235,7 @@ namespace VNLib.Net.Transport.Tcp
//Write segment to socket, and upate written data
int written = await sock.SendAsync(reader.Window, SocketFlags.None);
- if(written >= reader.WindowSize)
+ if(written == reader.WindowSize)
{
//All data was written
break;
@@ -274,6 +275,7 @@ namespace VNLib.Net.Transport.Tcp
private FlushResult _recvFlushRes;
+ private int _sysSocketBufferSize;
private async Task RecvDoWorkAsync(Socket sock, bool initialData)
{
@@ -284,7 +286,7 @@ namespace VNLib.Net.Transport.Tcp
try
{
//Avoid syscall?
- int bufferSize = sock.ReceiveBufferSize;
+ _sysSocketBufferSize = sock.ReceiveBufferSize;
//If initial data was buffered, it needs to be published to the reader
if (initialData)
@@ -303,7 +305,7 @@ namespace VNLib.Net.Transport.Tcp
while (true)
{
//Get buffer from pipe writer
- Memory<byte> buffer = RecvPipe.Writer.GetMemory(bufferSize);
+ Memory<byte> buffer = RecvPipe.Writer.GetMemory(_sysSocketBufferSize);
//Wait for data or error from socket
int count = await sock.ReceiveAsync(buffer, SocketFlags.None, _cts.Token);
@@ -397,8 +399,10 @@ namespace VNLib.Net.Transport.Tcp
SendTimer.Restart(SendTimeoutMs);
try
{
+ CopyAndPublishDataOnSendPipe(data);
+
//Send the segment
- ValueTask<FlushResult> result = SendPipe.Writer.WriteAsync(data, cancellation);
+ ValueTask<FlushResult> result = SendPipe.Writer.FlushAsync(cancellation);
//Task completed successfully, so
if (result.IsCompleted)
@@ -430,8 +434,10 @@ namespace VNLib.Net.Transport.Tcp
private ValueTask SendWithoutTimerInternalAsync(ReadOnlyMemory<byte> data, CancellationToken cancellation)
{
+ CopyAndPublishDataOnSendPipe(data);
+
//Send the segment
- ValueTask<FlushResult> result = SendPipe.Writer.WriteAsync(data, cancellation);
+ ValueTask<FlushResult> result = SendPipe.Writer.FlushAsync(cancellation);
//Task completed successfully, so
if (result.IsCompleted)
@@ -454,6 +460,45 @@ namespace VNLib.Net.Transport.Tcp
}
}
+ private void CopyAndPublishDataOnSendPipe(ReadOnlyMemory<byte> src)
+ {
+ /*
+ * Clamp the buffer size to the system socket buffer size. If the
+ * buffer is larger then, we will need to publish multiple segments
+ */
+ if(src.Length > _sysSocketBufferSize)
+ {
+ //Store local src buffer reference to copy to
+ ref byte srcRef = ref MemoryMarshal.GetReference(src.Span);
+
+ uint written = 0;
+ while (written < src.Length)
+ {
+ int dataToCopy = (int)Math.Min(_sysSocketBufferSize, src.Length - written);
+
+ //Get a new buffer span, and ref
+ Span<byte> dest = SendPipe.Writer.GetSpan(dataToCopy);
+ ref byte destRef = ref MemoryMarshal.GetReference(dest);
+
+ //Copy data to the buffer at the new position
+ MemoryUtil.Memmove(ref srcRef, written, ref destRef, 0, (uint)dataToCopy);
+
+ //Advance the writer by the number of bytes written
+ SendPipe.Writer.Advance(dataToCopy);
+
+ //Increment the written count
+ written += (uint)dataToCopy;
+ }
+ }
+ else
+ {
+ //Single segment, just copy to the writer
+ Span<byte> dest = SendPipe.Writer.GetSpan(src.Length);
+ src.Span.CopyTo(dest);
+ SendPipe.Writer.Advance(src.Length);
+ }
+ }
+
ValueTask ITransportInterface.SendAsync(ReadOnlyMemory<byte> data, CancellationToken cancellation)
{
//Use timer if timeout is set, dont otherwise
diff --git a/lib/Net.Transport.SimpleTCP/src/VNLib.Net.Transport.SimpleTCP.csproj b/lib/Net.Transport.SimpleTCP/src/VNLib.Net.Transport.SimpleTCP.csproj
index bfd9fda..ac3e0a1 100644
--- a/lib/Net.Transport.SimpleTCP/src/VNLib.Net.Transport.SimpleTCP.csproj
+++ b/lib/Net.Transport.SimpleTCP/src/VNLib.Net.Transport.SimpleTCP.csproj
@@ -32,7 +32,7 @@
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
- <PackageReference Include="System.IO.Pipelines" Version="7.0.0" />
+ <PackageReference Include="System.IO.Pipelines" Version="8.0.0" />
</ItemGroup>
<ItemGroup>