aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--README.md1
-rw-r--r--lib/Net.Compression/VNLib.Net.Compression/CompressorManager.cs60
-rw-r--r--lib/Net.Compression/VNLib.Net.CompressionTests/CompressorManagerTests.cs36
-rw-r--r--lib/Net.Compression/VNLib.Net.CompressionTests/VNLib.Net.CompressionTests.csproj12
-rw-r--r--lib/Net.Http/src/Core/Compression/CompressionResult.cs42
-rw-r--r--lib/Net.Http/src/Core/Compression/IHttpCompressorManager.cs22
-rw-r--r--lib/Net.Http/src/Core/Compression/IResponseCompressor.cs22
-rw-r--r--lib/Net.Http/src/Core/Compression/ManagedHttpCompressor.cs55
-rw-r--r--lib/Net.Http/src/Core/IHttpResponseBody.cs23
-rw-r--r--lib/Net.Http/src/Core/Request/HttpInputStream.cs3
-rw-r--r--lib/Net.Http/src/Core/Request/HttpRequest.cs3
-rw-r--r--lib/Net.Http/src/Core/Response/ChunkDataAccumulator.cs88
-rw-r--r--lib/Net.Http/src/Core/Response/ChunkedStream.cs176
-rw-r--r--lib/Net.Http/src/Core/Response/DirectStream.cs23
-rw-r--r--lib/Net.Http/src/Core/Response/HttpContextResponseWriting.cs50
-rw-r--r--lib/Net.Http/src/Core/Response/HttpResponse.cs76
-rw-r--r--lib/Net.Http/src/Core/Response/IDirectResponsWriter.cs49
-rw-r--r--lib/Net.Http/src/Core/Response/IResponseDataWriter.cs56
-rw-r--r--lib/Net.Http/src/Core/Response/ResponseWriter.cs261
-rw-r--r--lib/Net.Http/src/Core/Response/ReusableResponseStream.cs55
-rw-r--r--lib/Plugins.PluginBase/src/VNLib.Plugins.PluginBase.csproj2
-rw-r--r--lib/Utils/src/Extensions/IoExtensions.cs5
-rw-r--r--lib/Utils/tests/VNLib.UtilsTests.csproj6
23 files changed, 535 insertions, 591 deletions
diff --git a/README.md b/README.md
index 4fbada6..bdbd3ac 100644
--- a/README.md
+++ b/README.md
@@ -37,6 +37,7 @@ Again, go to my website below, my email address is available, go ahead and send
- [Plugins.PluginBase](lib/Plugins.PluginBase/#) - Base library/api for plugin developers to build fully managed/supported runtime loaded plugins, without worrying about the plumbing, such as the IPlugin api, endpoint creation, and logging! This library is required if you wish to use most of the Plugin.Extensions libraries.
- [Net.Messaging.FBM](lib/Net.Messaging.FBM/#) - Fixed Buffer Messaging protocol, high performance, request/response architecture, client & server library, built atop http and web-sockets. As implied, relies on fixed sized internal buffers that are negotiated to transfer data with minimal overhead for known messaging architectures.
- [WinRpMalloc](lib/WinRpMalloc/#) - A Windows x64 dll project that exposes the rpmalloc memory allocator as a NativeHeap for .NET Utils library loading in the unmanned heap architecture.
+- [Net.Compression](lib/Net.Compression/#) - A cross platform IHttpCompressorManager configured for runtime dynamic loading for high performance native response data compression.
- [Net.Rest.Client](lib/Net.Rest.Client/#) - A minimal library that provides a RestSharp client resource pool for concurrent usage with async support, along with an OAuth2 client credentials IAuthenticator implementation for use with Oauth2 plugins.
## Builds & Source
diff --git a/lib/Net.Compression/VNLib.Net.Compression/CompressorManager.cs b/lib/Net.Compression/VNLib.Net.Compression/CompressorManager.cs
index 5c4f4fd..af77329 100644
--- a/lib/Net.Compression/VNLib.Net.Compression/CompressorManager.cs
+++ b/lib/Net.Compression/VNLib.Net.Compression/CompressorManager.cs
@@ -42,7 +42,6 @@ using System.Text.Json;
using System.Runtime.CompilerServices;
using VNLib.Net.Http;
-using VNLib.Utils.Memory;
using VNLib.Utils.Logging;
namespace VNLib.Net.Compression
@@ -50,21 +49,18 @@ namespace VNLib.Net.Compression
public sealed class CompressorManager : IHttpCompressorManager
{
const string NATIVE_LIB_NAME = "vnlib_compress.dll";
- const int MIN_BUF_SIZE_DEFAULT = 8192;
private LibraryWrapper? _nativeLib;
private CompressionLevel _compLevel;
- private int minOutBufferSize;
/// <summary>
/// Called by the VNLib.Webserver during startup to initiialize the compressor.
/// </summary>
/// <param name="log">The application log provider</param>
- /// <param name="configJsonString">The raw json configuration data</param>
+ /// <param name="config">The json configuration element</param>
public void OnLoad(ILogProvider? log, JsonElement? config)
{
_compLevel = CompressionLevel.Optimal;
- minOutBufferSize = MIN_BUF_SIZE_DEFAULT;
string libPath = NATIVE_LIB_NAME;
if(config.HasValue)
@@ -83,11 +79,6 @@ namespace VNLib.Net.Compression
{
libPath = libEl.GetString() ?? NATIVE_LIB_NAME;
}
-
- if(compEl.TryGetProperty("min_out_buf_size", out JsonElement minBufEl))
- {
- minOutBufferSize = minBufEl.GetInt32();
- }
}
}
@@ -128,7 +119,7 @@ namespace VNLib.Net.Compression
Compressor compressor = Unsafe.As<Compressor>(compressorState) ?? throw new ArgumentNullException(nameof(compressorState));
//Instance should be null during initialization calls
- Debug.Assert(compressor.Instance == IntPtr.Zero);
+ Debug.Assert(compressor.Instance == IntPtr.Zero, "Init was called but and old compressor instance was not properly freed");
//Alloc the compressor
compressor.Instance = _nativeLib!.AllocateCompressor(compMethod, _compLevel);
@@ -147,13 +138,6 @@ namespace VNLib.Net.Compression
throw new InvalidOperationException("This compressor instance has not been initialized, cannot free compressor");
}
- //Free the output buffer
- if(compressor.OutputBuffer != null)
- {
- ArrayPool<byte>.Shared.Return(compressor.OutputBuffer, true);
- compressor.OutputBuffer = null;
- }
-
//Free compressor instance
_nativeLib!.FreeCompressor(compressor.Instance);
@@ -162,7 +146,7 @@ namespace VNLib.Net.Compression
}
///<inheritdoc/>
- public ReadOnlyMemory<byte> Flush(object compressorState)
+ public int Flush(object compressorState, Memory<byte> output)
{
Compressor compressor = Unsafe.As<Compressor>(compressorState) ?? throw new ArgumentNullException(nameof(compressorState));
@@ -171,17 +155,13 @@ namespace VNLib.Net.Compression
throw new InvalidOperationException("This compressor instance has not been initialized, cannot free compressor");
}
- //rent a new buffer of the minimum size if not already allocated
- compressor.OutputBuffer ??= ArrayPool<byte>.Shared.Rent(minOutBufferSize);
-
//Force a flush until no more data is available
- int bytesWritten = CompressBlock(compressor.Instance, compressor.OutputBuffer, default, true);
-
- return compressor.OutputBuffer.AsMemory(0, bytesWritten);
+ CompressionResult result = CompressBlock(compressor.Instance, output, default, true);
+ return result.BytesWritten;
}
///<inheritdoc/>
- public ReadOnlyMemory<byte> CompressBlock(object compressorState, ReadOnlyMemory<byte> input, bool finalBlock)
+ public CompressionResult CompressBlock(object compressorState, ReadOnlyMemory<byte> input, Memory<byte> output)
{
Compressor compressor = Unsafe.As<Compressor>(compressorState) ?? throw new ArgumentNullException(nameof(compressorState));
@@ -196,30 +176,16 @@ namespace VNLib.Net.Compression
* as a reference for callers. If its too small it will just have to be flushed
*/
- //See if the compressor has a buffer allocated
- if (compressor.OutputBuffer == null)
- {
- //Determine the required buffer size
- int bufferSize = _nativeLib!.GetOutputSize(compressor.Instance, input.Length, finalBlock ? 1 : 0);
-
- //clamp the buffer size to the minimum output buffer size
- bufferSize = Math.Max(bufferSize, minOutBufferSize);
-
- //rent a new buffer
- compressor.OutputBuffer = ArrayPool<byte>.Shared.Rent(bufferSize);
- }
//Compress the block
- int bytesWritten = CompressBlock(compressor.Instance, compressor.OutputBuffer, input, finalBlock);
-
- return compressor.OutputBuffer.AsMemory(0, bytesWritten);
+ return CompressBlock(compressor.Instance, output, input, false);
}
- private unsafe int CompressBlock(IntPtr comp, byte[] output, ReadOnlyMemory<byte> input, bool finalBlock)
+ private unsafe CompressionResult CompressBlock(IntPtr comp, ReadOnlyMemory<byte> output, ReadOnlyMemory<byte> input, bool finalBlock)
{
//get pointers to the input and output buffers
using MemoryHandle inPtr = input.Pin();
- using MemoryHandle outPtr = MemoryUtil.PinArrayAndGetHandle(output, 0);
+ using MemoryHandle outPtr = output.Pin();
//Create the operation struct
CompressionOperation operation;
@@ -240,7 +206,11 @@ namespace VNLib.Net.Compression
_nativeLib!.CompressBlock(comp, &operation);
//Return the number of bytes written
- return op->bytesWritten;
+ return new()
+ {
+ BytesRead = op->bytesRead,
+ BytesWritten = op->bytesWritten
+ };
}
@@ -250,8 +220,6 @@ namespace VNLib.Net.Compression
private sealed class Compressor
{
public IntPtr Instance;
-
- public byte[]? OutputBuffer;
}
}
diff --git a/lib/Net.Compression/VNLib.Net.CompressionTests/CompressorManagerTests.cs b/lib/Net.Compression/VNLib.Net.CompressionTests/CompressorManagerTests.cs
index 2dea9d7..f77b3d2 100644
--- a/lib/Net.Compression/VNLib.Net.CompressionTests/CompressorManagerTests.cs
+++ b/lib/Net.Compression/VNLib.Net.CompressionTests/CompressorManagerTests.cs
@@ -8,6 +8,7 @@ using System.Security.Cryptography;
using VNLib.Utils.IO;
using VNLib.Net.Http;
+using VNLib.Utils.Extensions;
using Microsoft.VisualStudio.TestTools.UnitTesting;
@@ -16,11 +17,11 @@ namespace VNLib.Net.Compression.Tests
[TestClass()]
public class CompressorManagerTests
{
- const string LIB_PATH = @"F:\Programming\VNLib\VNLib.Net.Compression\native\vnlib_compress\build\Debug\vnlib_compress.dll";
+ const string LIB_PATH = @"../../../../vnlib_compress/build/Debug/vnlib_compress.dll";
[TestMethod()]
- public void OnLoadTest()
+ public void CompressDataStreamTest()
{
CompressorManager manager = InitCompressorUnderTest();
@@ -141,32 +142,37 @@ namespace VNLib.Net.Compression.Tests
//Create a buffer to compress
byte[] buffer = new byte[4096];
+ byte[] output = new byte[4096];
//fill with random data
RandomNumberGenerator.Fill(buffer);
+ int read = 0;
+
//try to compress the data in chunks
- for(int i = 0; i < 4; i++)
+ while(read < buffer.Length)
{
//Get 4th of a buffer
- ReadOnlyMemory<byte> chunk = buffer.AsMemory(i * 1024, 1024);
+ ReadOnlyMemory<byte> chunk = buffer.AsMemory(read, 1024);
//Compress data
- ReadOnlyMemory<byte> output = manager.CompressBlock(compressor, chunk, i == 3);
+ CompressionResult result = manager.CompressBlock(compressor, chunk, output);
//Write the compressed data to the output stream
- outputStream.Write(output.Span);
+ outputStream.Write(output.Slice(0, result.BytesWritten));
+
+ //Increment the read position
+ read += result.BytesRead;
}
- //flush the compressor
- while(true)
- {
- ReadOnlyMemory<byte> output = manager.Flush(compressor);
- if(output.IsEmpty)
- {
- break;
- }
- outputStream.Write(output.Span);
+ //Flush
+ int flushed = 100;
+ while(flushed > 0)
+ {
+ flushed = manager.Flush(compressor, output);
+
+ //Write the compressed data to the output stream
+ outputStream.Write(output.AsSpan()[0..flushed]);
}
//Verify the data
diff --git a/lib/Net.Compression/VNLib.Net.CompressionTests/VNLib.Net.CompressionTests.csproj b/lib/Net.Compression/VNLib.Net.CompressionTests/VNLib.Net.CompressionTests.csproj
index e235ac5..adf9496 100644
--- a/lib/Net.Compression/VNLib.Net.CompressionTests/VNLib.Net.CompressionTests.csproj
+++ b/lib/Net.Compression/VNLib.Net.CompressionTests/VNLib.Net.CompressionTests.csproj
@@ -3,16 +3,18 @@
<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<Nullable>enable</Nullable>
-
<IsPackable>false</IsPackable>
<IsTestProject>true</IsTestProject>
</PropertyGroup>
<ItemGroup>
- <PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.5.0" />
- <PackageReference Include="MSTest.TestAdapter" Version="2.2.10" />
- <PackageReference Include="MSTest.TestFramework" Version="2.2.10" />
- <PackageReference Include="coverlet.collector" Version="3.2.0" />
+ <PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.6.3" />
+ <PackageReference Include="MSTest.TestAdapter" Version="3.1.1" />
+ <PackageReference Include="MSTest.TestFramework" Version="3.1.1" />
+ <PackageReference Include="coverlet.collector" Version="6.0.0">
+ <PrivateAssets>all</PrivateAssets>
+ <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
+ </PackageReference>
</ItemGroup>
<ItemGroup>
diff --git a/lib/Net.Http/src/Core/Compression/CompressionResult.cs b/lib/Net.Http/src/Core/Compression/CompressionResult.cs
new file mode 100644
index 0000000..aaeebc5
--- /dev/null
+++ b/lib/Net.Http/src/Core/Compression/CompressionResult.cs
@@ -0,0 +1,42 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Http
+* File: CompressionResult.cs
+*
+* CompressionResult.cs is part of VNLib.Net.Http which is part
+* of the larger VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Http is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Http is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+namespace VNLib.Net.Http
+{
+ /// <summary>
+ /// Represents the result of a block compression operation
+ /// </summary>
+ public readonly ref struct CompressionResult
+ {
+ /// <summary>
+ /// The number of bytes read from the input buffer
+ /// </summary>
+ public readonly int BytesRead { get; init; }
+
+ /// <summary>
+ /// The number of bytes availabe in the output buffer
+ /// </summary>
+ public readonly int BytesWritten { get; init; }
+ }
+}
diff --git a/lib/Net.Http/src/Core/Compression/IHttpCompressorManager.cs b/lib/Net.Http/src/Core/Compression/IHttpCompressorManager.cs
index 80763f8..6aadd49 100644
--- a/lib/Net.Http/src/Core/Compression/IHttpCompressorManager.cs
+++ b/lib/Net.Http/src/Core/Compression/IHttpCompressorManager.cs
@@ -26,6 +26,7 @@ using System;
namespace VNLib.Net.Http
{
+
/// <summary>
/// Represents an http compressor manager that creates compressor state instances and processes
/// compression operations.
@@ -54,29 +55,22 @@ namespace VNLib.Net.Http
/// <summary>
/// Compresses a block of data using the compressor state. The input block size is
- /// guarunteed to the the block size returned by <see cref="InitCompressor(object, CompressionMethod)"/>
+ /// guarunteed to be smaller than the block size returned by <see cref="InitCompressor(object, CompressionMethod)"/>
/// or smaller.
- /// <para>
- /// This method may be called with an empty input block to flush the compressor state.
- /// </para>
/// </summary>
/// <param name="compressorState">The compressor state instance</param>
/// <param name="input">The input buffer to compress</param>
- /// <param name="finalBlock">A value that indicates if this block is the final block</param>
- /// <returns>The compressed block to send to the client</returns>
- /// <remarks>
- /// The returned memory block should belong to the individual compressor state, and be valid until the
- /// call to deinit. The result of the block should remain valid
- /// until the next call to compress or deinit.
- /// </remarks>
- ReadOnlyMemory<byte> CompressBlock(object compressorState, ReadOnlyMemory<byte> input, bool finalBlock);
+ /// <param name="output">The output buffer to write the compressed data to</param>
+ /// <returns>The result of the stream operation</returns>
+ CompressionResult CompressBlock(object compressorState, ReadOnlyMemory<byte> input, Memory<byte> output);
/// <summary>
/// Flushes any stored compressor data that still needs to be sent to the client.
/// </summary>
/// <param name="compressorState">The compressor state instance</param>
- /// <returns>The remaining data stored in the compressor state, may be empty if no data is pending</returns>
- ReadOnlyMemory<byte> Flush(object compressorState);
+ /// <param name="output">The output buffer</param>
+ /// <returns>The number of bytes flushed to the output buffer</returns>
+ int Flush(object compressorState, Memory<byte> output);
/// <summary>
/// Initializes the compressor state for a compression operation
diff --git a/lib/Net.Http/src/Core/Compression/IResponseCompressor.cs b/lib/Net.Http/src/Core/Compression/IResponseCompressor.cs
index a68a838..034b282 100644
--- a/lib/Net.Http/src/Core/Compression/IResponseCompressor.cs
+++ b/lib/Net.Http/src/Core/Compression/IResponseCompressor.cs
@@ -23,8 +23,6 @@
*/
using System;
-using System.IO;
-using System.Threading.Tasks;
namespace VNLib.Net.Http.Core.Compression
{
@@ -49,21 +47,21 @@ namespace VNLib.Net.Http.Core.Compression
/// Initializes the compressor for a compression operation
/// </summary>
/// <param name="compMethod">The compression mode to use</param>
- /// <param name="output">The stream to write compressed data to</param>
- void Init(Stream output, CompressionMethod compMethod);
+ void Init(CompressionMethod compMethod);
/// <summary>
- /// Gets a value that indicates if the compressor requires more flushing to occur
+ /// Compresses a block of input data and writes the result to the output buffer
/// </summary>
- bool IsFlushRequired();
+ /// <param name="input">The input data to compress</param>
+ /// <param name="output">The output buffer to write compressed data to</param>
+ /// <returns>The result of the compression operation</returns>
+ CompressionResult CompressBlock(ReadOnlyMemory<byte> input, Memory<byte> output);
/// <summary>
- /// Compresses a block of data and writes it to the output stream. If an empty flush is
- /// commited, then the input buffer will be empty.
+ /// Writes any remaining data to the output buffer, flushing the compressor
/// </summary>
- /// <param name="buffer">The block of memory to write to compress</param>
- /// <param name="finalBlock">A value that indicates if this block is the final block</param>
- /// <returns>A task that represents the compression operation</returns>
- ValueTask CompressBlockAsync(ReadOnlyMemory<byte> buffer, bool finalBlock);
+ /// <param name="output">The buffer to write output data to</param>
+ /// <returns>The number of bytes written to the output buffer</returns>
+ int Flush(Memory<byte> output);
}
} \ No newline at end of file
diff --git a/lib/Net.Http/src/Core/Compression/ManagedHttpCompressor.cs b/lib/Net.Http/src/Core/Compression/ManagedHttpCompressor.cs
index b3f971a..34e59ac 100644
--- a/lib/Net.Http/src/Core/Compression/ManagedHttpCompressor.cs
+++ b/lib/Net.Http/src/Core/Compression/ManagedHttpCompressor.cs
@@ -23,9 +23,6 @@
*/
using System;
-using System.IO;
-using System.Threading.Tasks;
-
namespace VNLib.Net.Http.Core.Compression
{
@@ -47,46 +44,39 @@ namespace VNLib.Net.Http.Core.Compression
*/
private object? _compressor;
- private Stream? _stream;
- private ReadOnlyMemory<byte> _lastFlush;
private bool initialized;
///<inheritdoc/>
public int BlockSize { get; private set; }
- public bool IsFlushRequired()
+ ///<inheritdoc/>
+ public CompressionResult CompressBlock(ReadOnlyMemory<byte> input, Memory<byte> output)
{
- //See if a flush is required
- _lastFlush = _provider.Flush(_compressor!);
- return _lastFlush.Length > 0;
+ //Compress the block
+ return _provider.CompressBlock(_compressor!, input, output);
}
///<inheritdoc/>
- public ValueTask CompressBlockAsync(ReadOnlyMemory<byte> buffer, bool finalBlock)
+ public int Flush(Memory<byte> output)
{
- /*
- * If input buffer is empty and flush data is available,
- * write the last flush data to the stream
- */
- if(buffer.Length == 0 && _lastFlush.Length > 0)
- {
- return _stream!.WriteAsync(_lastFlush);
- }
+ return _provider.Flush(_compressor!, output);
+ }
- //Compress the block
- ReadOnlyMemory<byte> result = _provider.CompressBlock(_compressor!, buffer, finalBlock);
+ ///<inheritdoc/>
+ public void Init(CompressionMethod compMethod)
+ {
+ //Defer alloc the compressor
+ _compressor ??= _provider.AllocCompressor();
+
+ //Init the compressor and get the block size
+ BlockSize = _provider.InitCompressor(_compressor, compMethod);
- //Write the compressed block to the stream
- return _stream!.WriteAsync(result);
+ initialized = true;
}
///<inheritdoc/>
public void Free()
{
- //Remove stream ref and de-init the compressor
- _stream = null;
- _lastFlush = default;
-
//Deinit compressor if initialized
if (initialized)
{
@@ -94,18 +84,5 @@ namespace VNLib.Net.Http.Core.Compression
initialized = false;
}
}
-
- ///<inheritdoc/>
- public void Init(Stream output, CompressionMethod compMethod)
- {
- //Defer alloc the compressor
- _compressor ??= _provider.AllocCompressor();
-
- //Init the compressor and get the block size
- BlockSize = _provider.InitCompressor(_compressor, compMethod);
-
- _stream = output;
- initialized = true;
- }
}
} \ No newline at end of file
diff --git a/lib/Net.Http/src/Core/IHttpResponseBody.cs b/lib/Net.Http/src/Core/IHttpResponseBody.cs
index f0f88d2..696e9da 100644
--- a/lib/Net.Http/src/Core/IHttpResponseBody.cs
+++ b/lib/Net.Http/src/Core/IHttpResponseBody.cs
@@ -23,9 +23,9 @@
*/
using System;
-using System.IO;
using System.Threading.Tasks;
+using VNLib.Net.Http.Core.Response;
using VNLib.Net.Http.Core.Compression;
namespace VNLib.Net.Http.Core
@@ -63,28 +63,15 @@ namespace VNLib.Net.Http.Core
/// <param name="buffer">An optional buffer used to buffer responses</param>
/// <param name="count">The maximum length of the response data to write</param>
/// <returns>A task that resolves when the response is completed</returns>
- Task WriteEntityAsync(Stream dest, long count, Memory<byte> buffer);
+ Task WriteEntityAsync(IDirectResponsWriter dest, long count, Memory<byte> buffer);
/// <summary>
/// Writes internal response entity data to the destination stream
/// </summary>
- /// <param name="dest">The response compressor</param>
+ /// <param name="comp">The response compressor</param>
+ /// <param name="writer">The response output writer</param>
/// <param name="buffer">An optional buffer used to buffer responses</param>
/// <returns>A task that resolves when the response is completed</returns>
- Task WriteEntityAsync(IResponseCompressor dest, Memory<byte> buffer);
-
- /*
- * Added to the response writing hot-paths optimize calls when compression
- * is disabled and an explicit length is not required.
- */
-
- /// <summary>
- /// Writes internal response entity data to the destination stream
- /// without compression
- /// </summary>
- /// <param name="dest">The response stream to write data to</param>
- /// <param name="buffer">Optional buffer if required, used to buffer response data</param>
- /// <returns>A task that resolves when the response is completed</returns>
- Task WriteEntityAsync(Stream dest, Memory<byte> buffer);
+ Task WriteEntityAsync(IResponseCompressor comp, IResponseDataWriter writer, Memory<byte> buffer);
}
} \ No newline at end of file
diff --git a/lib/Net.Http/src/Core/Request/HttpInputStream.cs b/lib/Net.Http/src/Core/Request/HttpInputStream.cs
index dc903d2..fbb1d41 100644
--- a/lib/Net.Http/src/Core/Request/HttpInputStream.cs
+++ b/lib/Net.Http/src/Core/Request/HttpInputStream.cs
@@ -26,6 +26,7 @@ using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
+using System.Runtime.CompilerServices;
using VNLib.Utils;
using VNLib.Utils.Memory;
@@ -49,6 +50,8 @@ namespace VNLib.Net.Http.Core
public HttpInputStream(IHttpContextInformation contextInfo) => ContextInfo = contextInfo;
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
internal void OnComplete()
{
//Dispose the initial data buffer if set
diff --git a/lib/Net.Http/src/Core/Request/HttpRequest.cs b/lib/Net.Http/src/Core/Request/HttpRequest.cs
index 0668197..2cc99bd 100644
--- a/lib/Net.Http/src/Core/Request/HttpRequest.cs
+++ b/lib/Net.Http/src/Core/Request/HttpRequest.cs
@@ -26,6 +26,7 @@ using System;
using System.Net;
using System.Collections.Generic;
using System.Security.Authentication;
+using System.Runtime.CompilerServices;
using VNLib.Utils;
using VNLib.Utils.Memory;
@@ -97,6 +98,7 @@ namespace VNLib.Net.Http.Core
void IHttpLifeCycle.OnNewConnection()
{ }
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
public void OnNewRequest()
{
//Set to defaults
@@ -106,6 +108,7 @@ namespace VNLib.Net.Http.Core
HttpVersion = HttpVersion.None;
}
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
public void OnComplete()
{
//release the input stream
diff --git a/lib/Net.Http/src/Core/Response/ChunkDataAccumulator.cs b/lib/Net.Http/src/Core/Response/ChunkDataAccumulator.cs
index 4a88361..f87ec98 100644
--- a/lib/Net.Http/src/Core/Response/ChunkDataAccumulator.cs
+++ b/lib/Net.Http/src/Core/Response/ChunkDataAccumulator.cs
@@ -24,7 +24,6 @@
using System;
-using VNLib.Utils;
using VNLib.Utils.IO;
using VNLib.Net.Http.Core.Buffering;
@@ -57,66 +56,45 @@ namespace VNLib.Net.Http.Core.Response
*/
private int _reservedOffset;
-
///<inheritdoc/>
public int RemainingSize => Buffer.Size - AccumulatedSize;
///<inheritdoc/>
- public Span<byte> Remaining => Buffer.GetBinSpan()[AccumulatedSize..];
+ Span<byte> IDataAccumulator<byte>.Remaining => Buffer.GetBinSpan()[AccumulatedSize..];
///<inheritdoc/>
- public Span<byte> Accumulated => Buffer.GetBinSpan()[_reservedOffset.. AccumulatedSize];
+ Span<byte> IDataAccumulator<byte>.Accumulated => Buffer.GetBinSpan()[_reservedOffset.. AccumulatedSize];
///<inheritdoc/>
public int AccumulatedSize { get; set; }
- /*
- * Completed chunk is the segment of the buffer that contains the size segment
- * followed by the accumulated chunk data, and the trailing crlf.
- *
- * AccumulatedSize points to the end of the accumulated chunk data. The reserved
- * offset points to the start of the size segment.
- */
- private Memory<byte> GetCompleteChunk() => Buffer.GetMemory()[_reservedOffset..AccumulatedSize];
+ ///<inheritdoc/>
+ public void Advance(int count) => AccumulatedSize += count;
/// <summary>
- /// Attempts to buffer as much data as possible from the specified data
+ /// Gets the remaining segment of the buffer to write chunk data to.
/// </summary>
- /// <param name="data">The data to copy</param>
- /// <returns>The number of bytes that were buffered</returns>
- public ERRNO TryBufferChunk(ReadOnlySpan<byte> data)
+ /// <returns>The chunk buffer to write data to</returns>
+ public Memory<byte> GetRemainingSegment()
{
- //Calc data size and reserve space for final crlf
- int dataToCopy = Math.Min(data.Length, RemainingSize - Context.EncodedSegments.CrlfBytes.Length);
-
- //Write as much data as possible
- data[..dataToCopy].CopyTo(Remaining);
-
- //Advance buffer
- Advance(dataToCopy);
+ /*
+ * We need to return the remaining segment of the buffer, the segment after the
+ * accumulated chunk data, but before the trailing crlf.
+ */
- //Return number of bytes not written
- return dataToCopy;
+ //Get the remaining buffer segment
+ return Buffer.GetMemory().Slice(AccumulatedSize, RemainingSize - Context.EncodedSegments.CrlfBytes.Length);
}
- ///<inheritdoc/>
- public void Advance(int count) => AccumulatedSize += count;
-
- private void InitReserved()
+ /// <summary>
+ /// Calculates the usable remaining size of the chunk buffer.
+ /// </summary>
+ /// <returns>The number of bytes remaining in the buffer</returns>
+ public int GetRemainingSegmentSize()
{
- //First reserve the chunk window by advancing the accumulator to the reserved size
- Advance(ReservedSize);
- }
-
- ///<inheritdoc/>
- public void Reset()
- {
- //zero offsets
- _reservedOffset = 0;
- AccumulatedSize = 0;
- //Init reserved segment
- InitReserved();
+ //Remaining size accounting for the trailing crlf
+ return RemainingSize - Context.EncodedSegments.CrlfBytes.Length;
}
/// <summary>
@@ -156,6 +134,32 @@ namespace VNLib.Net.Http.Core.Response
/*
+ * Completed chunk is the segment of the buffer that contains the size segment
+ * followed by the accumulated chunk data, and the trailing crlf.
+ *
+ * AccumulatedSize points to the end of the accumulated chunk data. The reserved
+ * offset points to the start of the size segment.
+ */
+ private Memory<byte> GetCompleteChunk() => Buffer.GetMemory()[_reservedOffset..AccumulatedSize];
+
+
+ private void InitReserved()
+ {
+ //First reserve the chunk window by advancing the accumulator to the reserved size
+ Advance(ReservedSize);
+ }
+
+ ///<inheritdoc/>
+ public void Reset()
+ {
+ //zero offsets
+ _reservedOffset = 0;
+ AccumulatedSize = 0;
+ //Init reserved segment
+ InitReserved();
+ }
+
+ /*
* UpdateChunkSize method updates the running total of the chunk size
* in the reserved segment of the buffer. This is because http chunking
* requires hex encoded chunk sizes to be written as the first bytes of
diff --git a/lib/Net.Http/src/Core/Response/ChunkedStream.cs b/lib/Net.Http/src/Core/Response/ChunkedStream.cs
index 639e18f..267d8ed 100644
--- a/lib/Net.Http/src/Core/Response/ChunkedStream.cs
+++ b/lib/Net.Http/src/Core/Response/ChunkedStream.cs
@@ -26,192 +26,70 @@
* Provides a Chunked data-encoding stream for writing data-chunks to
* the transport using the basic chunked encoding format from MDN
* https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Transfer-Encoding#directives
-*
-* This stream will buffer entire chunks to avoid multiple writes to the
-* transport which can block or at minium cause overhead in context switching
-* which should be mostly avoided but cause overhead in copying. Time profiling
-* showed nearly equivalent performance for small chunks for synchronous writes.
-*
*/
using System;
using System.Threading;
using System.Threading.Tasks;
-using VNLib.Utils;
-using VNLib.Utils.Memory;
using VNLib.Net.Http.Core.Buffering;
namespace VNLib.Net.Http.Core.Response
{
-#pragma warning disable CA2215 // Dispose methods should call base class dispose
-#pragma warning disable CA2007 // Consider calling ConfigureAwait on the awaited task
-
/// <summary>
/// Writes chunked HTTP message bodies to an underlying streamwriter
/// </summary>
- internal sealed class ChunkedStream : ReusableResponseStream
+ internal sealed class ChunkedStream : ReusableResponseStream, IResponseDataWriter
{
private readonly ChunkDataAccumulator ChunckAccumulator;
-
- private bool HadError;
internal ChunkedStream(IChunkAccumulatorBuffer buffer, IHttpContextInformation context)
{
//Init accumulator
ChunckAccumulator = new(buffer, context);
}
-
- public override void Write(ReadOnlySpan<byte> chunk)
- {
- //Only write non-zero chunks
- if (chunk.Length <= 0)
- {
- return;
- }
-
- //Init reader
- ForwardOnlyReader<byte> reader = new(chunk);
- try
- {
- do
- {
- //try to accumulate the chunk data
- ERRNO written = ChunckAccumulator.TryBufferChunk(reader.Window);
-
- //Not all data was buffered
- if (written < reader.WindowSize)
- {
- //Advance reader
- reader.Advance(written);
- //Flush accumulator
- Memory<byte> accChunk = ChunckAccumulator.GetChunkData();
-
- //Reset the chunk accumulator
- ChunckAccumulator.Reset();
-
- //Write chunk data
- transport!.Write(accChunk.Span);
-
- //Continue to buffer / flush as needed
- continue;
- }
- break;
- }
- while (true);
- }
- catch
- {
- HadError = true;
- throw;
- }
- }
+ #region Hooks
- public override async ValueTask WriteAsync(ReadOnlyMemory<byte> chunk, CancellationToken cancellationToken = default)
+ ///<inheritdoc/>
+ public void OnNewRequest()
{
- //Only write non-zero chunks
- if (chunk.Length <= 0)
- {
- return;
- }
-
- try
- {
- //Init reader
- ForwardOnlyMemoryReader<byte> reader = new(chunk);
-
- do
- {
- //try to accumulate the chunk data
- ERRNO written = ChunckAccumulator.TryBufferChunk(reader.Window.Span);
-
- //Not all data was buffered
- if (written < reader.WindowSize)
- {
- //Advance reader
- reader.Advance(written);
-
- //Flush accumulator
- Memory<byte> accChunk = ChunckAccumulator.GetChunkData();
-
- //Reset the chunk accumulator
- ChunckAccumulator.Reset();
-
- //Flush accumulator async
- await transport!.WriteAsync(accChunk, cancellationToken);
-
- //Continue to buffer / flush as needed
- continue;
- }
-
- break;
- }
- while (true);
- }
- catch
- {
- HadError = true;
- throw;
- }
+ ChunckAccumulator.OnNewRequest();
}
- public override async ValueTask DisposeAsync()
+ ///<inheritdoc/>
+ public void OnComplete()
{
- //If write error occured, then do not write the last chunk
- if (HadError)
- {
- return;
- }
-
- //Complete the last chunk
- Memory<byte> chunkData = ChunckAccumulator.GetFinalChunkData();
-
- //Reset the accumulator
- ChunckAccumulator.Reset();
+ ChunckAccumulator.OnComplete();
+ }
- //Write remaining data to stream
- await transport!.WriteAsync(chunkData, CancellationToken.None);
+ ///<inheritdoc/>
+ public Memory<byte> GetMemory() => ChunckAccumulator.GetRemainingSegment();
- //Flush base stream
- await transport!.FlushAsync(CancellationToken.None);
+ ///<inheritdoc/>
+ public int Advance(int written)
+ {
+ //Advance the accumulator
+ ChunckAccumulator.Advance(written);
+ return ChunckAccumulator.GetRemainingSegmentSize();
}
- public override void Close()
+ ///<inheritdoc/>
+ public ValueTask FlushAsync(bool isFinal)
{
- //If write error occured, then do not write the last chunk
- if (HadError)
- {
- return;
- }
-
- //Complete the last chunk
- Memory<byte> chunkData = ChunckAccumulator.GetFinalChunkData();
+ /*
+ * We need to know when the final chunk is being flushed so we can
+ * write the final termination sequence to the transport.
+ */
+
+ Memory<byte> chunkData = isFinal ? ChunckAccumulator.GetFinalChunkData() : ChunckAccumulator.GetChunkData();
//Reset the accumulator
ChunckAccumulator.Reset();
- //Write chunk data
- transport!.Write(chunkData.Span);
-
- //Flush base stream
- transport!.Flush();
- }
-
- #region Hooks
-
- public void OnNewRequest()
- {
- ChunckAccumulator.OnNewRequest();
- }
-
- public void OnComplete()
- {
- ChunckAccumulator.OnComplete();
-
- //Clear error flag
- HadError = false;
+ //Write remaining data to stream
+ return transport!.WriteAsync(chunkData, CancellationToken.None);
}
#endregion
diff --git a/lib/Net.Http/src/Core/Response/DirectStream.cs b/lib/Net.Http/src/Core/Response/DirectStream.cs
index 7d0e568..2406f0f 100644
--- a/lib/Net.Http/src/Core/Response/DirectStream.cs
+++ b/lib/Net.Http/src/Core/Response/DirectStream.cs
@@ -23,28 +23,17 @@
*/
using System;
-using System.Threading;
using System.Threading.Tasks;
namespace VNLib.Net.Http.Core.Response
{
- internal sealed class DirectStream : ReusableResponseStream
- {
-
- public override void Write(ReadOnlySpan<byte> buffer)
- {
- transport!.Write(buffer);
- }
- public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
- {
- return transport!.WriteAsync(buffer, cancellationToken);
- }
-
- //Pass through flush methods
- public override void Flush() => transport!.Flush();
-
- public override Task FlushAsync(CancellationToken cancellationToken) => transport!.FlushAsync(cancellationToken);
+ internal sealed class DirectStream : ReusableResponseStream, IDirectResponsWriter
+ {
+ ///<inheritdoc/>
+ public Task FlushAsync() => transport!.FlushAsync();
+ ///<inheritdoc/>
+ public ValueTask WriteAsync(ReadOnlyMemory<byte> buffer) => transport!.WriteAsync(buffer);
}
}
diff --git a/lib/Net.Http/src/Core/Response/HttpContextResponseWriting.cs b/lib/Net.Http/src/Core/Response/HttpContextResponseWriting.cs
index 0b29e43..ca5f040 100644
--- a/lib/Net.Http/src/Core/Response/HttpContextResponseWriting.cs
+++ b/lib/Net.Http/src/Core/Response/HttpContextResponseWriting.cs
@@ -23,11 +23,12 @@
*/
using System;
-using System.IO;
using System.Net;
using System.Diagnostics;
using System.Threading.Tasks;
+using VNLib.Net.Http.Core.Response;
+
namespace VNLib.Net.Http.Core
{
#pragma warning disable CA2007 // Consider calling ConfigureAwait on the awaited task
@@ -161,49 +162,36 @@ namespace VNLib.Net.Http.Core
private async Task WriteEntityDataAsync(long length, CompressionMethod compMethod, bool hasExplicitLength)
{
- //Get output stream, and always dispose it
- await using Stream outputStream = await GetOutputStreamAsync(length, compMethod);
-
//Determine if buffer is required
Memory<byte> buffer = ResponseBody.BufferRequired ? Buffers.GetResponseDataBuffer() : Memory<byte>.Empty;
- /*
- * Using compression, we must initialize a compressor, and write the response
- * with the locked compressor
- */
- if (compMethod != CompressionMethod.None)
+ //We need to flush header before we can write to the transport
+ await Response.CompleteHeadersAsync(compMethod == CompressionMethod.None ? length : -1);
+
+ if (compMethod == CompressionMethod.None)
+ {
+ //Setup a direct stream to write to
+ IDirectResponsWriter output = Response.GetDirectStream();
+
+ //Write response with optional forced length
+ await ResponseBody.WriteEntityAsync(output, hasExplicitLength ? length : -1, buffer);
+ }
+ else
{
//Compressor must never be null at this point
Debug.Assert(_compressor != null, "Compression was allowed but the compressor was not initialized");
+ //Get the chunked response writer
+ IResponseDataWriter output = Response.GetChunkWriter();
+
//Init compressor (Deinint is deferred to the end of the request)
- _compressor.Init(outputStream, compMethod);
+ _compressor.Init(compMethod);
//Write response
- await ResponseBody.WriteEntityAsync(_compressor, buffer);
-
- }
- /*
- * Explicit length may be set when the response may have more data than requested
- * by the client. IE: when a range is set, we need to make sure we sent exactly the
- * correct data, otherwise the client will drop the connection.
- */
- else if(hasExplicitLength)
- {
- //Write response with explicit length
- await ResponseBody.WriteEntityAsync(outputStream, length, buffer);
-
- }
- else
- {
- await ResponseBody.WriteEntityAsync(outputStream, buffer);
+ await ResponseBody.WriteEntityAsync(_compressor, output, buffer);
}
}
- private ValueTask<Stream> GetOutputStreamAsync(long length, CompressionMethod compMethod)
- {
- return compMethod == CompressionMethod.None ? Response.GetStreamAsync(length) : Response.GetStreamAsync();
- }
#pragma warning restore CA2007 // Consider calling ConfigureAwait on the awaited task
diff --git a/lib/Net.Http/src/Core/Response/HttpResponse.cs b/lib/Net.Http/src/Core/Response/HttpResponse.cs
index c09c3f7..90f6f24 100644
--- a/lib/Net.Http/src/Core/Response/HttpResponse.cs
+++ b/lib/Net.Http/src/Core/Response/HttpResponse.cs
@@ -190,59 +190,57 @@ namespace VNLib.Net.Http.Core
}
/// <summary>
- /// Gets a stream for writing data of a specified length directly to the client
+ /// Flushes all available headers to the transport asynchronously
/// </summary>
- /// <param name="ContentLength"></param>
- /// <returns>A <see cref="Stream"/> configured for writing data to client</returns>
- /// <exception cref="OutOfMemoryException"></exception>
- /// <exception cref="InvalidOperationException"></exception>
- public ValueTask<Stream> GetStreamAsync(long ContentLength)
+ /// <param name="contentLength">The optional content length if set, <![CDATA[ < 0]]> for chunked responses</param>
+ /// <returns>A value task that completes when header data has been made available to the transport</returns>
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public ValueTask CompleteHeadersAsync(long contentLength)
{
Check();
- //Add content length header
- Headers[HttpResponseHeader.ContentLength] = ContentLength.ToString();
+ if (contentLength < 0)
+ {
+ //Add chunked header
+ Headers[HttpResponseHeader.TransferEncoding] = "chunked";
+
+ }
+ else
+ {
+ //Add content length header
+ Headers[HttpResponseHeader.ContentLength] = contentLength.ToString();
+ }
//Flush headers
- ValueTask flush = EndFlushHeadersAsync();
+ return EndFlushHeadersAsync();
+ }
- //Return the reusable stream
- return flush.IsCompletedSuccessfully ?
- ValueTask.FromResult<Stream>(ReusableDirectStream)
- : GetStreamAsyncCore(flush, ReusableDirectStream);
+ /// <summary>
+ /// Gets a response writer for writing directly to the transport stream
+ /// </summary>
+ /// <returns>The <see cref="IDirectResponsWriter"/> instance for writing stream data to</returns>
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public IDirectResponsWriter GetDirectStream()
+ {
+ //Headers must be sent before getting a direct stream
+ Debug.Assert(HeadersSent);
+ return ReusableDirectStream;
}
/// <summary>
- /// Sets up the client for chuncked encoding and gets a stream that allows for chuncks to be sent. User must call dispose on stream when done writing data
+ /// Gets a response writer for writing chunked data to the transport stream
/// </summary>
- /// <returns><see cref="Stream"/> supporting chunked encoding</returns>
- /// <exception cref="OutOfMemoryException"></exception>
- /// <exception cref="InvalidOperationException"></exception>
- public ValueTask<Stream> GetStreamAsync()
+ /// <returns>The <see cref="IResponseDataWriter"/> for buffering response chunks</returns>
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public IResponseDataWriter GetChunkWriter()
{
//Chunking is only an http 1.1 feature (should never get called otherwise)
- Debug.Assert(ContextInfo.CurrentVersion == HttpVersion.Http11);
-
- Check();
-
- //Set encoding type to chunked with user-defined compression
- Headers[HttpResponseHeader.TransferEncoding] = "chunked";
-
- //Flush headers
- ValueTask flush = EndFlushHeadersAsync();
+ Debug.Assert(ContextInfo.CurrentVersion == HttpVersion.Http11, "Chunked response handler was requested, but is not an HTTP/1.1 response");
+ Debug.Assert(HeadersSent, "Chunk write was requested but header data has not been sent");
- //Return the reusable stream
- return flush.IsCompletedSuccessfully ?
- ValueTask.FromResult<Stream>(ReusableChunkedStream)
- : GetStreamAsyncCore(flush, ReusableChunkedStream);
- }
-
- private static async ValueTask<Stream> GetStreamAsyncCore(ValueTask flush, Stream stream)
- {
- //Await the flush and get the stream
- await flush.ConfigureAwait(false);
- return stream;
+ return ReusableChunkedStream;
}
+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
void Check()
@@ -329,6 +327,7 @@ namespace VNLib.Net.Http.Core
}
///<inheritdoc/>
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
public void OnNewRequest()
{
//Default to okay status code
@@ -338,6 +337,7 @@ namespace VNLib.Net.Http.Core
}
///<inheritdoc/>
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
public void OnComplete()
{
//Clear headers and cookies
diff --git a/lib/Net.Http/src/Core/Response/IDirectResponsWriter.cs b/lib/Net.Http/src/Core/Response/IDirectResponsWriter.cs
new file mode 100644
index 0000000..7c9ca41
--- /dev/null
+++ b/lib/Net.Http/src/Core/Response/IDirectResponsWriter.cs
@@ -0,0 +1,49 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Http
+* File: IDirectResponsWriter.cs
+*
+* IDirectResponsWriter.cs is part of VNLib.Net.Http which is part of
+* the larger VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Http is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Http is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Threading.Tasks;
+
+namespace VNLib.Net.Http.Core.Response
+{
+ /// <summary>
+ /// Represents a stream that can be written to directly (does not
+ /// buffer response data)
+ /// </summary>
+ internal interface IDirectResponsWriter
+ {
+ /// <summary>
+ /// Writes the given data buffer to the client
+ /// </summary>
+ /// <param name="buffer">The response data to write</param>
+ /// <returns>A value task that resolves when the write operation is complete</returns>
+ ValueTask WriteAsync(ReadOnlyMemory<byte> buffer);
+
+ /// <summary>
+ /// Flushes any remaining data to the client
+ /// </summary>
+ /// <returns>A task that resolves when the flush operationis complete</returns>
+ Task FlushAsync();
+ }
+}
diff --git a/lib/Net.Http/src/Core/Response/IResponseDataWriter.cs b/lib/Net.Http/src/Core/Response/IResponseDataWriter.cs
new file mode 100644
index 0000000..55ef49c
--- /dev/null
+++ b/lib/Net.Http/src/Core/Response/IResponseDataWriter.cs
@@ -0,0 +1,56 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Http
+* File: IResponseDataWriter.cs
+*
+* IResponseDataWriter.cs is part of VNLib.Net.Http which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Http is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Http is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Threading.Tasks;
+
+namespace VNLib.Net.Http.Core.Response
+{
+ /// <summary>
+ /// A buffered http response data writer
+ /// </summary>
+ internal interface IResponseDataWriter
+ {
+ /// <summary>
+ /// Gets the next memory segment available to buffer data to
+ /// </summary>
+ /// <returns>An available buffer to write response data to </returns>
+ Memory<byte> GetMemory();
+
+ /// <summary>
+ /// Advances the writer by the number of bytes written and returns the
+ /// number of bytes available for writing on the next call to <see cref="GetMemory"/>
+ /// </summary>
+ /// <param name="written">The number of bytes written to the output buffer</param>
+ /// <returns>The number of bytes remaining in the internal buffer</returns>
+ int Advance(int written);
+
+ /// <summary>
+ /// Flushes the internal buffer to the underlying stream
+ /// </summary>
+ /// <param name="isFinal">A value that indicates that this is final call to flush</param>
+ /// <returns>A valuetask that completes </returns>
+ ValueTask FlushAsync(bool isFinal);
+ }
+} \ No newline at end of file
diff --git a/lib/Net.Http/src/Core/Response/ResponseWriter.cs b/lib/Net.Http/src/Core/Response/ResponseWriter.cs
index 77dc619..d67fc01 100644
--- a/lib/Net.Http/src/Core/Response/ResponseWriter.cs
+++ b/lib/Net.Http/src/Core/Response/ResponseWriter.cs
@@ -22,12 +22,20 @@
* along with this program. If not, see https://www.gnu.org/licenses/.
*/
+/*
+ * This file handles response entity processing. It handles in-memory response
+ * processing, as well as stream response processing. It handles contraints
+ * such as content-range limits. I tried to eliminate or reduce the amount of
+ * memory copying required to process the response entity.
+ */
+
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
+using System.Runtime.CompilerServices;
-using VNLib.Utils.Extensions;
+using VNLib.Net.Http.Core.Response;
using VNLib.Net.Http.Core.Compression;
namespace VNLib.Net.Http.Core
@@ -94,68 +102,95 @@ namespace VNLib.Net.Http.Core
ReadOnlyMemory<byte> _readSegment;
///<inheritdoc/>
- async Task IHttpResponseBody.WriteEntityAsync(Stream dest, long count, Memory<byte> buffer)
+ async Task IHttpResponseBody.WriteEntityAsync(IDirectResponsWriter dest, long count, Memory<byte> buffer)
{
int remaining;
//Write a sliding window response
if (_memoryResponse != null)
{
- //Get min value from count/range length
- remaining = (int)Math.Min(count, _memoryResponse.Remaining);
-
- //Write response body from memory
- while (remaining > 0)
+ if(count > 0)
{
- //Get remaining segment
- _readSegment = _memoryResponse.GetRemainingConstrained(remaining);
-
- //Write segment to output stream
- await dest.WriteAsync(_readSegment);
-
- //Advance by the written ammount
- _memoryResponse.Advance(_readSegment.Length);
-
- //Update remaining
- remaining -= _readSegment.Length;
- }
- }
- else
- {
- //Buffer is required, and count must be supplied
- await _streamResponse!.CopyToAsync(dest, buffer, count);
+ //Get min value from count/range length
+ remaining = (int)Math.Min(count, _memoryResponse.Remaining);
- //Try to dispose the response stream asyncrhonously since we are done with it
- await _streamResponse!.DisposeAsync();
+ //Write response body from memory
+ while (remaining > 0)
+ {
+ //Get remaining segment
+ _readSegment = _memoryResponse.GetRemainingConstrained(remaining);
- //remove ref so its not disposed again
- _streamResponse = null;
- }
- }
+ //Write segment to output stream
+ await dest.WriteAsync(_readSegment);
- ///<inheritdoc/>
- async Task IHttpResponseBody.WriteEntityAsync(Stream dest, Memory<byte> buffer)
- {
- //Write a sliding window response
- if (_memoryResponse != null)
- {
- //Write response body from memory
- while (_memoryResponse.Remaining > 0)
+ //Advance by the written ammount
+ _memoryResponse.Advance(_readSegment.Length);
+
+ //Update remaining
+ remaining -= _readSegment.Length;
+ }
+ }
+ else
{
- //Get remaining segment
- _readSegment = _memoryResponse.GetMemory();
+ //Write response body from memory
+ while (_memoryResponse.Remaining > 0)
+ {
+ //Get remaining segment
+ _readSegment = _memoryResponse.GetMemory();
- //Write segment to output stream
- await dest.WriteAsync(_readSegment);
+ //Write segment to output stream
+ await dest.WriteAsync(_readSegment);
- //Advance by the written ammount
- _memoryResponse.Advance(_readSegment.Length);
+ //Advance by the written amount
+ _memoryResponse.Advance(_readSegment.Length);
+ }
}
+
+ //Disposing of memory response can be deferred until the end of the request since its always syncrhonous
}
else
{
- //Buffer is required, and count must be supplied
- await _streamResponse!.CopyToAsync(dest, buffer);
+ if (count > 0)
+ {
+ //Buffer is required, and count must be supplied
+
+ long total = 0;
+ int read;
+ while (true)
+ {
+ //get offset wrapper of the total buffer or remaining count
+ Memory<byte> offset = buffer[..(int)Math.Min(buffer.Length, count - total)];
+ //read
+ read = await _streamResponse!.ReadAsync(offset);
+ //Guard
+ if (read == 0)
+ {
+ break;
+ }
+ //write only the data that was read (slice)
+ await dest.WriteAsync(offset[..read]);
+ //Update total
+ total += read;
+ }
+ }
+ else
+ {
+ //Read in loop
+ do
+ {
+ //read
+ int read = await _streamResponse!.ReadAsync(buffer);
+ //Guard
+ if (read == 0)
+ {
+ break;
+ }
+
+ //write only the data that was read (slice)
+ await dest.WriteAsync(buffer[..read]);
+
+ } while (true);
+ }
//Try to dispose the response stream asyncrhonously since we are done with it
await _streamResponse!.DisposeAsync();
@@ -166,64 +201,22 @@ namespace VNLib.Net.Http.Core
}
///<inheritdoc/>
- async Task IHttpResponseBody.WriteEntityAsync(IResponseCompressor dest, Memory<byte> buffer)
+ async Task IHttpResponseBody.WriteEntityAsync(IResponseCompressor comp, IResponseDataWriter writer, Memory<byte> buffer)
{
//Locals
- bool remaining;
int read;
//Write a sliding window response
if (_memoryResponse != null)
- {
- /*
- * It is safe to assume if a response body was set, that it contains data.
- * So the cost or running a loop without data is not a concern.
- *
- * Since any failed writes to the output will raise exceptions, it is safe
- * to advance the reader before writing the data, so we can determine if the
- * block is final.
- *
- * Since we are using a byte-stream reader for memory responses, we can optimize the
- * compression loop, if we know its operating block size, so we only compress blocks
- * of the block size, then continue the loop without branching or causing nested
- * loops
- */
-
- //Optimize for block size
- if (dest.BlockSize > 0)
- {
- //Write response body from memory
- do
- {
- _readSegment = _memoryResponse.GetRemainingConstrained(dest.BlockSize);
-
- //Advance by the trimmed segment length
- _memoryResponse.Advance(_readSegment.Length);
-
- //Check if data is remaining after an advance
- remaining = _memoryResponse.Remaining > 0;
-
- //Compress the trimmed block
- await dest.CompressBlockAsync(_readSegment, !remaining);
-
- } while (remaining);
- }
- else
+ {
+ while (_memoryResponse.Remaining > 0)
{
- do
+ //Commit output bytes
+ if (CompressNextSegment(_memoryResponse, comp, writer))
{
- _readSegment = _memoryResponse.GetMemory();
-
- //Advance by the segment length, this should be safe even if its zero
- _memoryResponse.Advance(_readSegment.Length);
-
- //Check if data is remaining after an advance
- remaining = _memoryResponse.Remaining > 0;
-
- //Write to output
- await dest.CompressBlockAsync(_readSegment, !remaining);
-
- } while (remaining);
+ //Time to flush
+ await writer.FlushAsync(false);
+ }
}
//Disposing of memory response can be deferred until the end of the request since its always syncrhonous
@@ -231,9 +224,9 @@ namespace VNLib.Net.Http.Core
else
{
//Trim buffer to block size if it is set by the compressor
- if (dest.BlockSize > 0)
+ if (comp.BlockSize > 0)
{
- buffer = buffer[..dest.BlockSize];
+ buffer = buffer[..comp.BlockSize];
}
//Read in loop
@@ -248,8 +241,12 @@ namespace VNLib.Net.Http.Core
break;
}
- //write only the data that was read, as a segment instead of a block
- await dest.CompressBlockAsync(buffer[..read], read < buffer.Length);
+ //Compress the buffered data and flush if required
+ if(CompressNextSegment(buffer, comp, writer))
+ {
+ //Time to flush
+ await writer.FlushAsync(false);
+ }
} while (true);
@@ -263,17 +260,69 @@ namespace VNLib.Net.Http.Core
_streamResponse = null;
}
- //Continue flusing flushing the compressor if required
- while(dest.IsFlushRequired())
+
+ /*
+ * Once there is no more response data avialable to compress
+ * we need to flush the compressor, then flush the writer
+ * to publish all accumulated data to the client
+ */
+
+ do
{
- //Flush the compressor
- await dest.CompressBlockAsync(ReadOnlyMemory<byte>.Empty, true);
- }
+ //Get output buffer
+ Memory<byte> output = writer.GetMemory();
+
+ //Flush the compressor output
+ int written = comp.Flush(output);
+
+ //No more data to buffer
+ if (written == 0)
+ {
+ //final flush and exit
+ await writer.FlushAsync(true);
+ break;
+ }
+
+ if (writer.Advance(written) == 0)
+ {
+ //Flush because accumulator is full
+ await writer.FlushAsync(false);
+ }
+
+ } while (true);
+ }
+
+ private static bool CompressNextSegment(IMemoryResponseReader reader, IResponseCompressor comp, IResponseDataWriter writer)
+ {
+ //Read the next segment
+ ReadOnlyMemory<byte> readSegment = comp.BlockSize > 0 ? reader.GetRemainingConstrained(comp.BlockSize) : reader.GetMemory();
+
+ //Get output buffer
+ Memory<byte> output = writer.GetMemory();
+
+ //Compress the trimmed block
+ CompressionResult res = comp.CompressBlock(readSegment, output);
+
+ //Commit input bytes
+ reader.Advance(res.BytesRead);
+
+ return writer.Advance(res.BytesWritten) == 0;
+ }
+
+ private static bool CompressNextSegment(Memory<byte> readSegment, IResponseCompressor comp, IResponseDataWriter writer)
+ {
+ //Get output buffer
+ Memory<byte> output = writer.GetMemory();
+
+ //Compress the trimmed block
+ CompressionResult res = comp.CompressBlock(readSegment, output);
+
+ return writer.Advance(res.BytesWritten) == 0;
}
#pragma warning restore CA2007 // Consider calling ConfigureAwait on the awaited task
-
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
public void OnComplete()
{
//Clear has data flag
diff --git a/lib/Net.Http/src/Core/Response/ReusableResponseStream.cs b/lib/Net.Http/src/Core/Response/ReusableResponseStream.cs
index 0be4821..60465f9 100644
--- a/lib/Net.Http/src/Core/Response/ReusableResponseStream.cs
+++ b/lib/Net.Http/src/Core/Response/ReusableResponseStream.cs
@@ -22,18 +22,12 @@
* along with this program. If not, see https://www.gnu.org/licenses/.
*/
-using System;
using System.IO;
-using System.Threading;
-using System.Threading.Tasks;
namespace VNLib.Net.Http.Core.Response
{
-#pragma warning disable CA2215 // Dispose methods should call base class dispose
-#pragma warning disable CA1844 // Provide memory-based overrides of async methods when subclassing 'Stream'
-
- internal abstract class ReusableResponseStream : Stream
+ internal abstract class ReusableResponseStream
{
protected Stream? transport;
@@ -46,50 +40,7 @@ namespace VNLib.Net.Http.Core.Response
/// <summary>
/// Called when the connection is released
/// </summary>
- public virtual void OnRelease() => this.transport = null;
-
-
- //Block base dispose
- protected override void Dispose(bool disposing)
- { }
-
- //Block base close
- public override void Close()
- { }
-
- //block base dispose async
- public override ValueTask DisposeAsync()
- {
- return ValueTask.CompletedTask;
- }
-
- //Block flush
- public override void Flush()
- { }
-
- //Block flush async
- public override Task FlushAsync(CancellationToken cancellationToken)
- {
- return Task.CompletedTask;
- }
-
- //Block stream basics
- public override bool CanRead => false;
- public override bool CanSeek => false;
- public override bool CanWrite => true;
- public override long Length => throw new NotSupportedException();
- public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
-
- //Reading is not enabled
- public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException("This stream cannot be read from");
- public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException("This stream does not support seeking");
- public override void SetLength(long value) => throw new NotSupportedException("This stream does not support seeking");
-
- public override void Write(byte[] buffer, int offset, int count) => Write(buffer.AsSpan(offset, count));
-
- public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
- {
- return WriteAsync(buffer.AsMemory(offset, count), cancellationToken).AsTask();
- }
+ public virtual void OnRelease() => transport = null;
+
}
} \ No newline at end of file
diff --git a/lib/Plugins.PluginBase/src/VNLib.Plugins.PluginBase.csproj b/lib/Plugins.PluginBase/src/VNLib.Plugins.PluginBase.csproj
index 616f907..d53cc7d 100644
--- a/lib/Plugins.PluginBase/src/VNLib.Plugins.PluginBase.csproj
+++ b/lib/Plugins.PluginBase/src/VNLib.Plugins.PluginBase.csproj
@@ -41,7 +41,7 @@
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
- <PackageReference Include="Serilog" Version="2.12.0" />
+ <PackageReference Include="Serilog" Version="3.0.1" />
<PackageReference Include="Serilog.Sinks.Console" Version="4.1.0" />
<PackageReference Include="Serilog.Sinks.File" Version="5.0.0" />
</ItemGroup>
diff --git a/lib/Utils/src/Extensions/IoExtensions.cs b/lib/Utils/src/Extensions/IoExtensions.cs
index 637cfab..bcbe810 100644
--- a/lib/Utils/src/Extensions/IoExtensions.cs
+++ b/lib/Utils/src/Extensions/IoExtensions.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2022 Vaughn Nugent
+* Copyright (c) 2023 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Utils
@@ -328,12 +328,11 @@ namespace VNLib.Utils.Extensions
* bytes from the source
*/
long total = 0;
- int bufferSize = buffer.Length;
int read;
while (true)
{
//get offset wrapper of the total buffer or remaining count
- Memory<byte> offset = buffer[..(int)Math.Min(bufferSize, count - total)];
+ Memory<byte> offset = buffer[..(int)Math.Min(buffer.Length, count - total)];
//read
read = await source.ReadAsync(offset, token);
//Guard
diff --git a/lib/Utils/tests/VNLib.UtilsTests.csproj b/lib/Utils/tests/VNLib.UtilsTests.csproj
index 5651bb3..c29915d 100644
--- a/lib/Utils/tests/VNLib.UtilsTests.csproj
+++ b/lib/Utils/tests/VNLib.UtilsTests.csproj
@@ -16,9 +16,9 @@
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
- <PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.6.0" />
- <PackageReference Include="MSTest.TestAdapter" Version="3.0.2" />
- <PackageReference Include="MSTest.TestFramework" Version="3.0.2" />
+ <PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.6.3" />
+ <PackageReference Include="MSTest.TestAdapter" Version="3.1.1" />
+ <PackageReference Include="MSTest.TestFramework" Version="3.1.1" />
<PackageReference Include="coverlet.collector" Version="6.0.0">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>