aboutsummaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorLibravatar vnugent <public@vaughnnugent.com>2023-07-28 11:59:28 -0400
committerLibravatar vnugent <public@vaughnnugent.com>2023-07-28 11:59:28 -0400
commitf0c2337358fc43ad9c79294c539c4ddec4280011 (patch)
tree3125d7fece381c0ab86012bc0b1c848fee0170eb /lib
parentcd7d9f3ec65737b96c713b1b84ef972e901c4ea3 (diff)
Compressor control refactor, reduce copy
Diffstat (limited to 'lib')
-rw-r--r--lib/Net.Compression/VNLib.Net.Compression/CompressorManager.cs60
-rw-r--r--lib/Net.Compression/VNLib.Net.CompressionTests/CompressorManagerTests.cs36
-rw-r--r--lib/Net.Compression/VNLib.Net.CompressionTests/VNLib.Net.CompressionTests.csproj12
-rw-r--r--lib/Net.Http/src/Core/Compression/CompressionResult.cs42
-rw-r--r--lib/Net.Http/src/Core/Compression/IHttpCompressorManager.cs22
-rw-r--r--lib/Net.Http/src/Core/Compression/IResponseCompressor.cs22
-rw-r--r--lib/Net.Http/src/Core/Compression/ManagedHttpCompressor.cs55
-rw-r--r--lib/Net.Http/src/Core/IHttpResponseBody.cs23
-rw-r--r--lib/Net.Http/src/Core/Request/HttpInputStream.cs3
-rw-r--r--lib/Net.Http/src/Core/Request/HttpRequest.cs3
-rw-r--r--lib/Net.Http/src/Core/Response/ChunkDataAccumulator.cs88
-rw-r--r--lib/Net.Http/src/Core/Response/ChunkedStream.cs176
-rw-r--r--lib/Net.Http/src/Core/Response/DirectStream.cs23
-rw-r--r--lib/Net.Http/src/Core/Response/HttpContextResponseWriting.cs50
-rw-r--r--lib/Net.Http/src/Core/Response/HttpResponse.cs76
-rw-r--r--lib/Net.Http/src/Core/Response/IDirectResponsWriter.cs49
-rw-r--r--lib/Net.Http/src/Core/Response/IResponseDataWriter.cs56
-rw-r--r--lib/Net.Http/src/Core/Response/ResponseWriter.cs261
-rw-r--r--lib/Net.Http/src/Core/Response/ReusableResponseStream.cs55
-rw-r--r--lib/Plugins.PluginBase/src/VNLib.Plugins.PluginBase.csproj2
-rw-r--r--lib/Utils/src/Extensions/IoExtensions.cs5
-rw-r--r--lib/Utils/tests/VNLib.UtilsTests.csproj6
22 files changed, 534 insertions, 591 deletions
diff --git a/lib/Net.Compression/VNLib.Net.Compression/CompressorManager.cs b/lib/Net.Compression/VNLib.Net.Compression/CompressorManager.cs
index 5c4f4fd..af77329 100644
--- a/lib/Net.Compression/VNLib.Net.Compression/CompressorManager.cs
+++ b/lib/Net.Compression/VNLib.Net.Compression/CompressorManager.cs
@@ -42,7 +42,6 @@ using System.Text.Json;
using System.Runtime.CompilerServices;
using VNLib.Net.Http;
-using VNLib.Utils.Memory;
using VNLib.Utils.Logging;
namespace VNLib.Net.Compression
@@ -50,21 +49,18 @@ namespace VNLib.Net.Compression
public sealed class CompressorManager : IHttpCompressorManager
{
const string NATIVE_LIB_NAME = "vnlib_compress.dll";
- const int MIN_BUF_SIZE_DEFAULT = 8192;
private LibraryWrapper? _nativeLib;
private CompressionLevel _compLevel;
- private int minOutBufferSize;
/// <summary>
/// Called by the VNLib.Webserver during startup to initiialize the compressor.
/// </summary>
/// <param name="log">The application log provider</param>
- /// <param name="configJsonString">The raw json configuration data</param>
+ /// <param name="config">The json configuration element</param>
public void OnLoad(ILogProvider? log, JsonElement? config)
{
_compLevel = CompressionLevel.Optimal;
- minOutBufferSize = MIN_BUF_SIZE_DEFAULT;
string libPath = NATIVE_LIB_NAME;
if(config.HasValue)
@@ -83,11 +79,6 @@ namespace VNLib.Net.Compression
{
libPath = libEl.GetString() ?? NATIVE_LIB_NAME;
}
-
- if(compEl.TryGetProperty("min_out_buf_size", out JsonElement minBufEl))
- {
- minOutBufferSize = minBufEl.GetInt32();
- }
}
}
@@ -128,7 +119,7 @@ namespace VNLib.Net.Compression
Compressor compressor = Unsafe.As<Compressor>(compressorState) ?? throw new ArgumentNullException(nameof(compressorState));
//Instance should be null during initialization calls
- Debug.Assert(compressor.Instance == IntPtr.Zero);
+ Debug.Assert(compressor.Instance == IntPtr.Zero, "Init was called but and old compressor instance was not properly freed");
//Alloc the compressor
compressor.Instance = _nativeLib!.AllocateCompressor(compMethod, _compLevel);
@@ -147,13 +138,6 @@ namespace VNLib.Net.Compression
throw new InvalidOperationException("This compressor instance has not been initialized, cannot free compressor");
}
- //Free the output buffer
- if(compressor.OutputBuffer != null)
- {
- ArrayPool<byte>.Shared.Return(compressor.OutputBuffer, true);
- compressor.OutputBuffer = null;
- }
-
//Free compressor instance
_nativeLib!.FreeCompressor(compressor.Instance);
@@ -162,7 +146,7 @@ namespace VNLib.Net.Compression
}
///<inheritdoc/>
- public ReadOnlyMemory<byte> Flush(object compressorState)
+ public int Flush(object compressorState, Memory<byte> output)
{
Compressor compressor = Unsafe.As<Compressor>(compressorState) ?? throw new ArgumentNullException(nameof(compressorState));
@@ -171,17 +155,13 @@ namespace VNLib.Net.Compression
throw new InvalidOperationException("This compressor instance has not been initialized, cannot free compressor");
}
- //rent a new buffer of the minimum size if not already allocated
- compressor.OutputBuffer ??= ArrayPool<byte>.Shared.Rent(minOutBufferSize);
-
//Force a flush until no more data is available
- int bytesWritten = CompressBlock(compressor.Instance, compressor.OutputBuffer, default, true);
-
- return compressor.OutputBuffer.AsMemory(0, bytesWritten);
+ CompressionResult result = CompressBlock(compressor.Instance, output, default, true);
+ return result.BytesWritten;
}
///<inheritdoc/>
- public ReadOnlyMemory<byte> CompressBlock(object compressorState, ReadOnlyMemory<byte> input, bool finalBlock)
+ public CompressionResult CompressBlock(object compressorState, ReadOnlyMemory<byte> input, Memory<byte> output)
{
Compressor compressor = Unsafe.As<Compressor>(compressorState) ?? throw new ArgumentNullException(nameof(compressorState));
@@ -196,30 +176,16 @@ namespace VNLib.Net.Compression
* as a reference for callers. If its too small it will just have to be flushed
*/
- //See if the compressor has a buffer allocated
- if (compressor.OutputBuffer == null)
- {
- //Determine the required buffer size
- int bufferSize = _nativeLib!.GetOutputSize(compressor.Instance, input.Length, finalBlock ? 1 : 0);
-
- //clamp the buffer size to the minimum output buffer size
- bufferSize = Math.Max(bufferSize, minOutBufferSize);
-
- //rent a new buffer
- compressor.OutputBuffer = ArrayPool<byte>.Shared.Rent(bufferSize);
- }
//Compress the block
- int bytesWritten = CompressBlock(compressor.Instance, compressor.OutputBuffer, input, finalBlock);
-
- return compressor.OutputBuffer.AsMemory(0, bytesWritten);
+ return CompressBlock(compressor.Instance, output, input, false);
}
- private unsafe int CompressBlock(IntPtr comp, byte[] output, ReadOnlyMemory<byte> input, bool finalBlock)
+ private unsafe CompressionResult CompressBlock(IntPtr comp, ReadOnlyMemory<byte> output, ReadOnlyMemory<byte> input, bool finalBlock)
{
//get pointers to the input and output buffers
using MemoryHandle inPtr = input.Pin();
- using MemoryHandle outPtr = MemoryUtil.PinArrayAndGetHandle(output, 0);
+ using MemoryHandle outPtr = output.Pin();
//Create the operation struct
CompressionOperation operation;
@@ -240,7 +206,11 @@ namespace VNLib.Net.Compression
_nativeLib!.CompressBlock(comp, &operation);
//Return the number of bytes written
- return op->bytesWritten;
+ return new()
+ {
+ BytesRead = op->bytesRead,
+ BytesWritten = op->bytesWritten
+ };
}
@@ -250,8 +220,6 @@ namespace VNLib.Net.Compression
private sealed class Compressor
{
public IntPtr Instance;
-
- public byte[]? OutputBuffer;
}
}
diff --git a/lib/Net.Compression/VNLib.Net.CompressionTests/CompressorManagerTests.cs b/lib/Net.Compression/VNLib.Net.CompressionTests/CompressorManagerTests.cs
index 2dea9d7..f77b3d2 100644
--- a/lib/Net.Compression/VNLib.Net.CompressionTests/CompressorManagerTests.cs
+++ b/lib/Net.Compression/VNLib.Net.CompressionTests/CompressorManagerTests.cs
@@ -8,6 +8,7 @@ using System.Security.Cryptography;
using VNLib.Utils.IO;
using VNLib.Net.Http;
+using VNLib.Utils.Extensions;
using Microsoft.VisualStudio.TestTools.UnitTesting;
@@ -16,11 +17,11 @@ namespace VNLib.Net.Compression.Tests
[TestClass()]
public class CompressorManagerTests
{
- const string LIB_PATH = @"F:\Programming\VNLib\VNLib.Net.Compression\native\vnlib_compress\build\Debug\vnlib_compress.dll";
+ const string LIB_PATH = @"../../../../vnlib_compress/build/Debug/vnlib_compress.dll";
[TestMethod()]
- public void OnLoadTest()
+ public void CompressDataStreamTest()
{
CompressorManager manager = InitCompressorUnderTest();
@@ -141,32 +142,37 @@ namespace VNLib.Net.Compression.Tests
//Create a buffer to compress
byte[] buffer = new byte[4096];
+ byte[] output = new byte[4096];
//fill with random data
RandomNumberGenerator.Fill(buffer);
+ int read = 0;
+
//try to compress the data in chunks
- for(int i = 0; i < 4; i++)
+ while(read < buffer.Length)
{
//Get 4th of a buffer
- ReadOnlyMemory<byte> chunk = buffer.AsMemory(i * 1024, 1024);
+ ReadOnlyMemory<byte> chunk = buffer.AsMemory(read, 1024);
//Compress data
- ReadOnlyMemory<byte> output = manager.CompressBlock(compressor, chunk, i == 3);
+ CompressionResult result = manager.CompressBlock(compressor, chunk, output);
//Write the compressed data to the output stream
- outputStream.Write(output.Span);
+ outputStream.Write(output.Slice(0, result.BytesWritten));
+
+ //Increment the read position
+ read += result.BytesRead;
}
- //flush the compressor
- while(true)
- {
- ReadOnlyMemory<byte> output = manager.Flush(compressor);
- if(output.IsEmpty)
- {
- break;
- }
- outputStream.Write(output.Span);
+ //Flush
+ int flushed = 100;
+ while(flushed > 0)
+ {
+ flushed = manager.Flush(compressor, output);
+
+ //Write the compressed data to the output stream
+ outputStream.Write(output.AsSpan()[0..flushed]);
}
//Verify the data
diff --git a/lib/Net.Compression/VNLib.Net.CompressionTests/VNLib.Net.CompressionTests.csproj b/lib/Net.Compression/VNLib.Net.CompressionTests/VNLib.Net.CompressionTests.csproj
index e235ac5..adf9496 100644
--- a/lib/Net.Compression/VNLib.Net.CompressionTests/VNLib.Net.CompressionTests.csproj
+++ b/lib/Net.Compression/VNLib.Net.CompressionTests/VNLib.Net.CompressionTests.csproj
@@ -3,16 +3,18 @@
<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<Nullable>enable</Nullable>
-
<IsPackable>false</IsPackable>
<IsTestProject>true</IsTestProject>
</PropertyGroup>
<ItemGroup>
- <PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.5.0" />
- <PackageReference Include="MSTest.TestAdapter" Version="2.2.10" />
- <PackageReference Include="MSTest.TestFramework" Version="2.2.10" />
- <PackageReference Include="coverlet.collector" Version="3.2.0" />
+ <PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.6.3" />
+ <PackageReference Include="MSTest.TestAdapter" Version="3.1.1" />
+ <PackageReference Include="MSTest.TestFramework" Version="3.1.1" />
+ <PackageReference Include="coverlet.collector" Version="6.0.0">
+ <PrivateAssets>all</PrivateAssets>
+ <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
+ </PackageReference>
</ItemGroup>
<ItemGroup>
diff --git a/lib/Net.Http/src/Core/Compression/CompressionResult.cs b/lib/Net.Http/src/Core/Compression/CompressionResult.cs
new file mode 100644
index 0000000..aaeebc5
--- /dev/null
+++ b/lib/Net.Http/src/Core/Compression/CompressionResult.cs
@@ -0,0 +1,42 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Http
+* File: CompressionResult.cs
+*
+* CompressionResult.cs is part of VNLib.Net.Http which is part
+* of the larger VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Http is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Http is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+namespace VNLib.Net.Http
+{
+ /// <summary>
+ /// Represents the result of a block compression operation
+ /// </summary>
+ public readonly ref struct CompressionResult
+ {
+ /// <summary>
+ /// The number of bytes read from the input buffer
+ /// </summary>
+ public readonly int BytesRead { get; init; }
+
+ /// <summary>
+ /// The number of bytes availabe in the output buffer
+ /// </summary>
+ public readonly int BytesWritten { get; init; }
+ }
+}
diff --git a/lib/Net.Http/src/Core/Compression/IHttpCompressorManager.cs b/lib/Net.Http/src/Core/Compression/IHttpCompressorManager.cs
index 80763f8..6aadd49 100644
--- a/lib/Net.Http/src/Core/Compression/IHttpCompressorManager.cs
+++ b/lib/Net.Http/src/Core/Compression/IHttpCompressorManager.cs
@@ -26,6 +26,7 @@ using System;
namespace VNLib.Net.Http
{
+
/// <summary>
/// Represents an http compressor manager that creates compressor state instances and processes
/// compression operations.
@@ -54,29 +55,22 @@ namespace VNLib.Net.Http
/// <summary>
/// Compresses a block of data using the compressor state. The input block size is
- /// guarunteed to the the block size returned by <see cref="InitCompressor(object, CompressionMethod)"/>
+ /// guarunteed to be smaller than the block size returned by <see cref="InitCompressor(object, CompressionMethod)"/>
/// or smaller.
- /// <para>
- /// This method may be called with an empty input block to flush the compressor state.
- /// </para>
/// </summary>
/// <param name="compressorState">The compressor state instance</param>
/// <param name="input">The input buffer to compress</param>
- /// <param name="finalBlock">A value that indicates if this block is the final block</param>
- /// <returns>The compressed block to send to the client</returns>
- /// <remarks>
- /// The returned memory block should belong to the individual compressor state, and be valid until the
- /// call to deinit. The result of the block should remain valid
- /// until the next call to compress or deinit.
- /// </remarks>
- ReadOnlyMemory<byte> CompressBlock(object compressorState, ReadOnlyMemory<byte> input, bool finalBlock);
+ /// <param name="output">The output buffer to write the compressed data to</param>
+ /// <returns>The result of the stream operation</returns>
+ CompressionResult CompressBlock(object compressorState, ReadOnlyMemory<byte> input, Memory<byte> output);
/// <summary>
/// Flushes any stored compressor data that still needs to be sent to the client.
/// </summary>
/// <param name="compressorState">The compressor state instance</param>
- /// <returns>The remaining data stored in the compressor state, may be empty if no data is pending</returns>
- ReadOnlyMemory<byte> Flush(object compressorState);
+ /// <param name="output">The output buffer</param>
+ /// <returns>The number of bytes flushed to the output buffer</returns>
+ int Flush(object compressorState, Memory<byte> output);
/// <summary>
/// Initializes the compressor state for a compression operation
diff --git a/lib/Net.Http/src/Core/Compression/IResponseCompressor.cs b/lib/Net.Http/src/Core/Compression/IResponseCompressor.cs
index a68a838..034b282 100644
--- a/lib/Net.Http/src/Core/Compression/IResponseCompressor.cs
+++ b/lib/Net.Http/src/Core/Compression/IResponseCompressor.cs
@@ -23,8 +23,6 @@
*/
using System;
-using System.IO;
-using System.Threading.Tasks;
namespace VNLib.Net.Http.Core.Compression
{
@@ -49,21 +47,21 @@ namespace VNLib.Net.Http.Core.Compression
/// Initializes the compressor for a compression operation
/// </summary>
/// <param name="compMethod">The compression mode to use</param>
- /// <param name="output">The stream to write compressed data to</param>
- void Init(Stream output, CompressionMethod compMethod);
+ void Init(CompressionMethod compMethod);
/// <summary>
- /// Gets a value that indicates if the compressor requires more flushing to occur
+ /// Compresses a block of input data and writes the result to the output buffer
/// </summary>
- bool IsFlushRequired();
+ /// <param name="input">The input data to compress</param>
+ /// <param name="output">The output buffer to write compressed data to</param>
+ /// <returns>The result of the compression operation</returns>
+ CompressionResult CompressBlock(ReadOnlyMemory<byte> input, Memory<byte> output);
/// <summary>
- /// Compresses a block of data and writes it to the output stream. If an empty flush is
- /// commited, then the input buffer will be empty.
+ /// Writes any remaining data to the output buffer, flushing the compressor
/// </summary>
- /// <param name="buffer">The block of memory to write to compress</param>
- /// <param name="finalBlock">A value that indicates if this block is the final block</param>
- /// <returns>A task that represents the compression operation</returns>
- ValueTask CompressBlockAsync(ReadOnlyMemory<byte> buffer, bool finalBlock);
+ /// <param name="output">The buffer to write output data to</param>
+ /// <returns>The number of bytes written to the output buffer</returns>
+ int Flush(Memory<byte> output);
}
} \ No newline at end of file
diff --git a/lib/Net.Http/src/Core/Compression/ManagedHttpCompressor.cs b/lib/Net.Http/src/Core/Compression/ManagedHttpCompressor.cs
index b3f971a..34e59ac 100644
--- a/lib/Net.Http/src/Core/Compression/ManagedHttpCompressor.cs
+++ b/lib/Net.Http/src/Core/Compression/ManagedHttpCompressor.cs
@@ -23,9 +23,6 @@
*/
using System;
-using System.IO;
-using System.Threading.Tasks;
-
namespace VNLib.Net.Http.Core.Compression
{
@@ -47,46 +44,39 @@ namespace VNLib.Net.Http.Core.Compression
*/
private object? _compressor;
- private Stream? _stream;
- private ReadOnlyMemory<byte> _lastFlush;
private bool initialized;
///<inheritdoc/>
public int BlockSize { get; private set; }
- public bool IsFlushRequired()
+ ///<inheritdoc/>
+ public CompressionResult CompressBlock(ReadOnlyMemory<byte> input, Memory<byte> output)
{
- //See if a flush is required
- _lastFlush = _provider.Flush(_compressor!);
- return _lastFlush.Length > 0;
+ //Compress the block
+ return _provider.CompressBlock(_compressor!, input, output);
}
///<inheritdoc/>
- public ValueTask CompressBlockAsync(ReadOnlyMemory<byte> buffer, bool finalBlock)
+ public int Flush(Memory<byte> output)
{
- /*
- * If input buffer is empty and flush data is available,
- * write the last flush data to the stream
- */
- if(buffer.Length == 0 && _lastFlush.Length > 0)
- {
- return _stream!.WriteAsync(_lastFlush);
- }
+ return _provider.Flush(_compressor!, output);
+ }
- //Compress the block
- ReadOnlyMemory<byte> result = _provider.CompressBlock(_compressor!, buffer, finalBlock);
+ ///<inheritdoc/>
+ public void Init(CompressionMethod compMethod)
+ {
+ //Defer alloc the compressor
+ _compressor ??= _provider.AllocCompressor();
+
+ //Init the compressor and get the block size
+ BlockSize = _provider.InitCompressor(_compressor, compMethod);
- //Write the compressed block to the stream
- return _stream!.WriteAsync(result);
+ initialized = true;
}
///<inheritdoc/>
public void Free()
{
- //Remove stream ref and de-init the compressor
- _stream = null;
- _lastFlush = default;
-
//Deinit compressor if initialized
if (initialized)
{
@@ -94,18 +84,5 @@ namespace VNLib.Net.Http.Core.Compression
initialized = false;
}
}
-
- ///<inheritdoc/>
- public void Init(Stream output, CompressionMethod compMethod)
- {
- //Defer alloc the compressor
- _compressor ??= _provider.AllocCompressor();
-
- //Init the compressor and get the block size
- BlockSize = _provider.InitCompressor(_compressor, compMethod);
-
- _stream = output;
- initialized = true;
- }
}
} \ No newline at end of file
diff --git a/lib/Net.Http/src/Core/IHttpResponseBody.cs b/lib/Net.Http/src/Core/IHttpResponseBody.cs
index f0f88d2..696e9da 100644
--- a/lib/Net.Http/src/Core/IHttpResponseBody.cs
+++ b/lib/Net.Http/src/Core/IHttpResponseBody.cs
@@ -23,9 +23,9 @@
*/
using System;
-using System.IO;
using System.Threading.Tasks;
+using VNLib.Net.Http.Core.Response;
using VNLib.Net.Http.Core.Compression;
namespace VNLib.Net.Http.Core
@@ -63,28 +63,15 @@ namespace VNLib.Net.Http.Core
/// <param name="buffer">An optional buffer used to buffer responses</param>
/// <param name="count">The maximum length of the response data to write</param>
/// <returns>A task that resolves when the response is completed</returns>
- Task WriteEntityAsync(Stream dest, long count, Memory<byte> buffer);
+ Task WriteEntityAsync(IDirectResponsWriter dest, long count, Memory<byte> buffer);
/// <summary>
/// Writes internal response entity data to the destination stream
/// </summary>
- /// <param name="dest">The response compressor</param>
+ /// <param name="comp">The response compressor</param>
+ /// <param name="writer">The response output writer</param>
/// <param name="buffer">An optional buffer used to buffer responses</param>
/// <returns>A task that resolves when the response is completed</returns>
- Task WriteEntityAsync(IResponseCompressor dest, Memory<byte> buffer);
-
- /*
- * Added to the response writing hot-paths optimize calls when compression
- * is disabled and an explicit length is not required.
- */
-
- /// <summary>
- /// Writes internal response entity data to the destination stream
- /// without compression
- /// </summary>
- /// <param name="dest">The response stream to write data to</param>
- /// <param name="buffer">Optional buffer if required, used to buffer response data</param>
- /// <returns>A task that resolves when the response is completed</returns>
- Task WriteEntityAsync(Stream dest, Memory<byte> buffer);
+ Task WriteEntityAsync(IResponseCompressor comp, IResponseDataWriter writer, Memory<byte> buffer);
}
} \ No newline at end of file
diff --git a/lib/Net.Http/src/Core/Request/HttpInputStream.cs b/lib/Net.Http/src/Core/Request/HttpInputStream.cs
index dc903d2..fbb1d41 100644
--- a/lib/Net.Http/src/Core/Request/HttpInputStream.cs
+++ b/lib/Net.Http/src/Core/Request/HttpInputStream.cs
@@ -26,6 +26,7 @@ using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
+using System.Runtime.CompilerServices;
using VNLib.Utils;
using VNLib.Utils.Memory;
@@ -49,6 +50,8 @@ namespace VNLib.Net.Http.Core
public HttpInputStream(IHttpContextInformation contextInfo) => ContextInfo = contextInfo;
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
internal void OnComplete()
{
//Dispose the initial data buffer if set
diff --git a/lib/Net.Http/src/Core/Request/HttpRequest.cs b/lib/Net.Http/src/Core/Request/HttpRequest.cs
index 0668197..2cc99bd 100644
--- a/lib/Net.Http/src/Core/Request/HttpRequest.cs
+++ b/lib/Net.Http/src/Core/Request/HttpRequest.cs
@@ -26,6 +26,7 @@ using System;
using System.Net;
using System.Collections.Generic;
using System.Security.Authentication;
+using System.Runtime.CompilerServices;
using VNLib.Utils;
using VNLib.Utils.Memory;
@@ -97,6 +98,7 @@ namespace VNLib.Net.Http.Core
void IHttpLifeCycle.OnNewConnection()
{ }
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
public void OnNewRequest()
{
//Set to defaults
@@ -106,6 +108,7 @@ namespace VNLib.Net.Http.Core
HttpVersion = HttpVersion.None;
}
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
public void OnComplete()
{
//release the input stream
diff --git a/lib/Net.Http/src/Core/Response/ChunkDataAccumulator.cs b/lib/Net.Http/src/Core/Response/ChunkDataAccumulator.cs
index 4a88361..f87ec98 100644
--- a/lib/Net.Http/src/Core/Response/ChunkDataAccumulator.cs
+++ b/lib/Net.Http/src/Core/Response/ChunkDataAccumulator.cs
@@ -24,7 +24,6 @@
using System;
-using VNLib.Utils;
using VNLib.Utils.IO;
using VNLib.Net.Http.Core.Buffering;
@@ -57,66 +56,45 @@ namespace VNLib.Net.Http.Core.Response
*/
private int _reservedOffset;
-
///<inheritdoc/>
public int RemainingSize => Buffer.Size - AccumulatedSize;
///<inheritdoc/>
- public Span<byte> Remaining => Buffer.GetBinSpan()[AccumulatedSize..];
+ Span<byte> IDataAccumulator<byte>.Remaining => Buffer.GetBinSpan()[AccumulatedSize..];
///<inheritdoc/>
- public Span<byte> Accumulated => Buffer.GetBinSpan()[_reservedOffset.. AccumulatedSize];
+ Span<byte> IDataAccumulator<byte>.Accumulated => Buffer.GetBinSpan()[_reservedOffset.. AccumulatedSize];
///<inheritdoc/>
public int AccumulatedSize { get; set; }
- /*
- * Completed chunk is the segment of the buffer that contains the size segment
- * followed by the accumulated chunk data, and the trailing crlf.
- *
- * AccumulatedSize points to the end of the accumulated chunk data. The reserved
- * offset points to the start of the size segment.
- */
- private Memory<byte> GetCompleteChunk() => Buffer.GetMemory()[_reservedOffset..AccumulatedSize];
+ ///<inheritdoc/>
+ public void Advance(int count) => AccumulatedSize += count;
/// <summary>
- /// Attempts to buffer as much data as possible from the specified data
+ /// Gets the remaining segment of the buffer to write chunk data to.
/// </summary>
- /// <param name="data">The data to copy</param>
- /// <returns>The number of bytes that were buffered</returns>
- public ERRNO TryBufferChunk(ReadOnlySpan<byte> data)
+ /// <returns>The chunk buffer to write data to</returns>
+ public Memory<byte> GetRemainingSegment()
{
- //Calc data size and reserve space for final crlf
- int dataToCopy = Math.Min(data.Length, RemainingSize - Context.EncodedSegments.CrlfBytes.Length);
-
- //Write as much data as possible
- data[..dataToCopy].CopyTo(Remaining);
-
- //Advance buffer
- Advance(dataToCopy);
+ /*
+ * We need to return the remaining segment of the buffer, the segment after the
+ * accumulated chunk data, but before the trailing crlf.
+ */
- //Return number of bytes not written
- return dataToCopy;
+ //Get the remaining buffer segment
+ return Buffer.GetMemory().Slice(AccumulatedSize, RemainingSize - Context.EncodedSegments.CrlfBytes.Length);
}
- ///<inheritdoc/>
- public void Advance(int count) => AccumulatedSize += count;
-
- private void InitReserved()
+ /// <summary>
+ /// Calculates the usable remaining size of the chunk buffer.
+ /// </summary>
+ /// <returns>The number of bytes remaining in the buffer</returns>
+ public int GetRemainingSegmentSize()
{
- //First reserve the chunk window by advancing the accumulator to the reserved size
- Advance(ReservedSize);
- }
-
- ///<inheritdoc/>
- public void Reset()
- {
- //zero offsets
- _reservedOffset = 0;
- AccumulatedSize = 0;
- //Init reserved segment
- InitReserved();
+ //Remaining size accounting for the trailing crlf
+ return RemainingSize - Context.EncodedSegments.CrlfBytes.Length;
}
/// <summary>
@@ -156,6 +134,32 @@ namespace VNLib.Net.Http.Core.Response
/*
+ * Completed chunk is the segment of the buffer that contains the size segment
+ * followed by the accumulated chunk data, and the trailing crlf.
+ *
+ * AccumulatedSize points to the end of the accumulated chunk data. The reserved
+ * offset points to the start of the size segment.
+ */
+ private Memory<byte> GetCompleteChunk() => Buffer.GetMemory()[_reservedOffset..AccumulatedSize];
+
+
+ private void InitReserved()
+ {
+ //First reserve the chunk window by advancing the accumulator to the reserved size
+ Advance(ReservedSize);
+ }
+
+ ///<inheritdoc/>
+ public void Reset()
+ {
+ //zero offsets
+ _reservedOffset = 0;
+ AccumulatedSize = 0;
+ //Init reserved segment
+ InitReserved();
+ }
+
+ /*
* UpdateChunkSize method updates the running total of the chunk size
* in the reserved segment of the buffer. This is because http chunking
* requires hex encoded chunk sizes to be written as the first bytes of
diff --git a/lib/Net.Http/src/Core/Response/ChunkedStream.cs b/lib/Net.Http/src/Core/Response/ChunkedStream.cs
index 639e18f..267d8ed 100644
--- a/lib/Net.Http/src/Core/Response/ChunkedStream.cs
+++ b/lib/Net.Http/src/Core/Response/ChunkedStream.cs
@@ -26,192 +26,70 @@
* Provides a Chunked data-encoding stream for writing data-chunks to
* the transport using the basic chunked encoding format from MDN
* https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Transfer-Encoding#directives
-*
-* This stream will buffer entire chunks to avoid multiple writes to the
-* transport which can block or at minium cause overhead in context switching
-* which should be mostly avoided but cause overhead in copying. Time profiling
-* showed nearly equivalent performance for small chunks for synchronous writes.
-*
*/
using System;
using System.Threading;
using System.Threading.Tasks;
-using VNLib.Utils;
-using VNLib.Utils.Memory;
using VNLib.Net.Http.Core.Buffering;
namespace VNLib.Net.Http.Core.Response
{
-#pragma warning disable CA2215 // Dispose methods should call base class dispose
-#pragma warning disable CA2007 // Consider calling ConfigureAwait on the awaited task
-
/// <summary>
/// Writes chunked HTTP message bodies to an underlying streamwriter
/// </summary>
- internal sealed class ChunkedStream : ReusableResponseStream
+ internal sealed class ChunkedStream : ReusableResponseStream, IResponseDataWriter
{
private readonly ChunkDataAccumulator ChunckAccumulator;
-
- private bool HadError;
internal ChunkedStream(IChunkAccumulatorBuffer buffer, IHttpContextInformation context)
{
//Init accumulator
ChunckAccumulator = new(buffer, context);
}
-
- public override void Write(ReadOnlySpan<byte> chunk)
- {
- //Only write non-zero chunks
- if (chunk.Length <= 0)
- {
- return;
- }
-
- //Init reader
- ForwardOnlyReader<byte> reader = new(chunk);
- try
- {
- do
- {
- //try to accumulate the chunk data
- ERRNO written = ChunckAccumulator.TryBufferChunk(reader.Window);
-
- //Not all data was buffered
- if (written < reader.WindowSize)
- {
- //Advance reader
- reader.Advance(written);
- //Flush accumulator
- Memory<byte> accChunk = ChunckAccumulator.GetChunkData();
-
- //Reset the chunk accumulator
- ChunckAccumulator.Reset();
-
- //Write chunk data
- transport!.Write(accChunk.Span);
-
- //Continue to buffer / flush as needed
- continue;
- }
- break;
- }
- while (true);
- }
- catch
- {
- HadError = true;
- throw;
- }
- }
+ #region Hooks
- public override async ValueTask WriteAsync(ReadOnlyMemory<byte> chunk, CancellationToken cancellationToken = default)
+ ///<inheritdoc/>
+ public void OnNewRequest()
{
- //Only write non-zero chunks
- if (chunk.Length <= 0)
- {
- return;
- }
-
- try
- {
- //Init reader
- ForwardOnlyMemoryReader<byte> reader = new(chunk);
-
- do
- {
- //try to accumulate the chunk data
- ERRNO written = ChunckAccumulator.TryBufferChunk(reader.Window.Span);
-
- //Not all data was buffered
- if (written < reader.WindowSize)
- {
- //Advance reader
- reader.Advance(written);
-
- //Flush accumulator
- Memory<byte> accChunk = ChunckAccumulator.GetChunkData();
-
- //Reset the chunk accumulator
- ChunckAccumulator.Reset();
-
- //Flush accumulator async
- await transport!.WriteAsync(accChunk, cancellationToken);
-
- //Continue to buffer / flush as needed
- continue;
- }
-
- break;
- }
- while (true);
- }
- catch
- {
- HadError = true;
- throw;
- }
+ ChunckAccumulator.OnNewRequest();
}
- public override async ValueTask DisposeAsync()
+ ///<inheritdoc/>
+ public void OnComplete()
{
- //If write error occured, then do not write the last chunk
- if (HadError)
- {
- return;
- }
-
- //Complete the last chunk
- Memory<byte> chunkData = ChunckAccumulator.GetFinalChunkData();
-
- //Reset the accumulator
- ChunckAccumulator.Reset();
+ ChunckAccumulator.OnComplete();
+ }
- //Write remaining data to stream
- await transport!.WriteAsync(chunkData, CancellationToken.None);
+ ///<inheritdoc/>
+ public Memory<byte> GetMemory() => ChunckAccumulator.GetRemainingSegment();
- //Flush base stream
- await transport!.FlushAsync(CancellationToken.None);
+ ///<inheritdoc/>
+ public int Advance(int written)
+ {
+ //Advance the accumulator
+ ChunckAccumulator.Advance(written);
+ return ChunckAccumulator.GetRemainingSegmentSize();
}
- public override void Close()
+ ///<inheritdoc/>
+ public ValueTask FlushAsync(bool isFinal)
{
- //If write error occured, then do not write the last chunk
- if (HadError)
- {
- return;
- }
-
- //Complete the last chunk
- Memory<byte> chunkData = ChunckAccumulator.GetFinalChunkData();
+ /*
+ * We need to know when the final chunk is being flushed so we can
+ * write the final termination sequence to the transport.
+ */
+
+ Memory<byte> chunkData = isFinal ? ChunckAccumulator.GetFinalChunkData() : ChunckAccumulator.GetChunkData();
//Reset the accumulator
ChunckAccumulator.Reset();
- //Write chunk data
- transport!.Write(chunkData.Span);
-
- //Flush base stream
- transport!.Flush();
- }
-
- #region Hooks
-
- public void OnNewRequest()
- {
- ChunckAccumulator.OnNewRequest();
- }
-
- public void OnComplete()
- {
- ChunckAccumulator.OnComplete();
-
- //Clear error flag
- HadError = false;
+ //Write remaining data to stream
+ return transport!.WriteAsync(chunkData, CancellationToken.None);
}
#endregion
diff --git a/lib/Net.Http/src/Core/Response/DirectStream.cs b/lib/Net.Http/src/Core/Response/DirectStream.cs
index 7d0e568..2406f0f 100644
--- a/lib/Net.Http/src/Core/Response/DirectStream.cs
+++ b/lib/Net.Http/src/Core/Response/DirectStream.cs
@@ -23,28 +23,17 @@
*/
using System;
-using System.Threading;
using System.Threading.Tasks;
namespace VNLib.Net.Http.Core.Response
{
- internal sealed class DirectStream : ReusableResponseStream
- {
-
- public override void Write(ReadOnlySpan<byte> buffer)
- {
- transport!.Write(buffer);
- }
- public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
- {
- return transport!.WriteAsync(buffer, cancellationToken);
- }
-
- //Pass through flush methods
- public override void Flush() => transport!.Flush();
-
- public override Task FlushAsync(CancellationToken cancellationToken) => transport!.FlushAsync(cancellationToken);
+ internal sealed class DirectStream : ReusableResponseStream, IDirectResponsWriter
+ {
+ ///<inheritdoc/>
+ public Task FlushAsync() => transport!.FlushAsync();
+ ///<inheritdoc/>
+ public ValueTask WriteAsync(ReadOnlyMemory<byte> buffer) => transport!.WriteAsync(buffer);
}
}
diff --git a/lib/Net.Http/src/Core/Response/HttpContextResponseWriting.cs b/lib/Net.Http/src/Core/Response/HttpContextResponseWriting.cs
index 0b29e43..ca5f040 100644
--- a/lib/Net.Http/src/Core/Response/HttpContextResponseWriting.cs
+++ b/lib/Net.Http/src/Core/Response/HttpContextResponseWriting.cs
@@ -23,11 +23,12 @@
*/
using System;
-using System.IO;
using System.Net;
using System.Diagnostics;
using System.Threading.Tasks;
+using VNLib.Net.Http.Core.Response;
+
namespace VNLib.Net.Http.Core
{
#pragma warning disable CA2007 // Consider calling ConfigureAwait on the awaited task
@@ -161,49 +162,36 @@ namespace VNLib.Net.Http.Core
private async Task WriteEntityDataAsync(long length, CompressionMethod compMethod, bool hasExplicitLength)
{
- //Get output stream, and always dispose it
- await using Stream outputStream = await GetOutputStreamAsync(length, compMethod);
-
//Determine if buffer is required
Memory<byte> buffer = ResponseBody.BufferRequired ? Buffers.GetResponseDataBuffer() : Memory<byte>.Empty;
- /*
- * Using compression, we must initialize a compressor, and write the response
- * with the locked compressor
- */
- if (compMethod != CompressionMethod.None)
+ //We need to flush header before we can write to the transport
+ await Response.CompleteHeadersAsync(compMethod == CompressionMethod.None ? length : -1);
+
+ if (compMethod == CompressionMethod.None)
+ {
+ //Setup a direct stream to write to
+ IDirectResponsWriter output = Response.GetDirectStream();
+
+ //Write response with optional forced length
+ await ResponseBody.WriteEntityAsync(output, hasExplicitLength ? length : -1, buffer);
+ }
+ else
{
//Compressor must never be null at this point
Debug.Assert(_compressor != null, "Compression was allowed but the compressor was not initialized");
+ //Get the chunked response writer
+ IResponseDataWriter output = Response.GetChunkWriter();
+
//Init compressor (Deinint is deferred to the end of the request)
- _compressor.Init(outputStream, compMethod);
+ _compressor.Init(compMethod);
//Write response
- await ResponseBody.WriteEntityAsync(_compressor, buffer);
-
- }
- /*
- * Explicit length may be set when the response may have more data than requested
- * by the client. IE: when a range is set, we need to make sure we sent exactly the
- * correct data, otherwise the client will drop the connection.
- */
- else if(hasExplicitLength)
- {
- //Write response with explicit length
- await ResponseBody.WriteEntityAsync(outputStream, length, buffer);
-
- }
- else
- {
- await ResponseBody.WriteEntityAsync(outputStream, buffer);
+ await ResponseBody.WriteEntityAsync(_compressor, output, buffer);
}
}
- private ValueTask<Stream> GetOutputStreamAsync(long length, CompressionMethod compMethod)
- {
- return compMethod == CompressionMethod.None ? Response.GetStreamAsync(length) : Response.GetStreamAsync();
- }
#pragma warning restore CA2007 // Consider calling ConfigureAwait on the awaited task
diff --git a/lib/Net.Http/src/Core/Response/HttpResponse.cs b/lib/Net.Http/src/Core/Response/HttpResponse.cs
index c09c3f7..90f6f24 100644
--- a/lib/Net.Http/src/Core/Response/HttpResponse.cs
+++ b/lib/Net.Http/src/Core/Response/HttpResponse.cs
@@ -190,59 +190,57 @@ namespace VNLib.Net.Http.Core
}
/// <summary>
- /// Gets a stream for writing data of a specified length directly to the client
+ /// Flushes all available headers to the transport asynchronously
/// </summary>
- /// <param name="ContentLength"></param>
- /// <returns>A <see cref="Stream"/> configured for writing data to client</returns>
- /// <exception cref="OutOfMemoryException"></exception>
- /// <exception cref="InvalidOperationException"></exception>
- public ValueTask<Stream> GetStreamAsync(long ContentLength)
+ /// <param name="contentLength">The optional content length if set, <![CDATA[ < 0]]> for chunked responses</param>
+ /// <returns>A value task that completes when header data has been made available to the transport</returns>
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public ValueTask CompleteHeadersAsync(long contentLength)
{
Check();
- //Add content length header
- Headers[HttpResponseHeader.ContentLength] = ContentLength.ToString();
+ if (contentLength < 0)
+ {
+ //Add chunked header
+ Headers[HttpResponseHeader.TransferEncoding] = "chunked";
+
+ }
+ else
+ {
+ //Add content length header
+ Headers[HttpResponseHeader.ContentLength] = contentLength.ToString();
+ }
//Flush headers
- ValueTask flush = EndFlushHeadersAsync();
+ return EndFlushHeadersAsync();
+ }
- //Return the reusable stream
- return flush.IsCompletedSuccessfully ?
- ValueTask.FromResult<Stream>(ReusableDirectStream)
- : GetStreamAsyncCore(flush, ReusableDirectStream);
+ /// <summary>
+ /// Gets a response writer for writing directly to the transport stream
+ /// </summary>
+ /// <returns>The <see cref="IDirectResponsWriter"/> instance for writing stream data to</returns>
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public IDirectResponsWriter GetDirectStream()
+ {
+ //Headers must be sent before getting a direct stream
+ Debug.Assert(HeadersSent);
+ return ReusableDirectStream;
}
/// <summary>
- /// Sets up the client for chuncked encoding and gets a stream that allows for chuncks to be sent. User must call dispose on stream when done writing data
+ /// Gets a response writer for writing chunked data to the transport stream
/// </summary>
- /// <returns><see cref="Stream"/> supporting chunked encoding</returns>
- /// <exception cref="OutOfMemoryException"></exception>
- /// <exception cref="InvalidOperationException"></exception>
- public ValueTask<Stream> GetStreamAsync()
+ /// <returns>The <see cref="IResponseDataWriter"/> for buffering response chunks</returns>
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public IResponseDataWriter GetChunkWriter()
{
//Chunking is only an http 1.1 feature (should never get called otherwise)
- Debug.Assert(ContextInfo.CurrentVersion == HttpVersion.Http11);
-
- Check();
-
- //Set encoding type to chunked with user-defined compression
- Headers[HttpResponseHeader.TransferEncoding] = "chunked";
-
- //Flush headers
- ValueTask flush = EndFlushHeadersAsync();
+ Debug.Assert(ContextInfo.CurrentVersion == HttpVersion.Http11, "Chunked response handler was requested, but is not an HTTP/1.1 response");
+ Debug.Assert(HeadersSent, "Chunk write was requested but header data has not been sent");
- //Return the reusable stream
- return flush.IsCompletedSuccessfully ?
- ValueTask.FromResult<Stream>(ReusableChunkedStream)
- : GetStreamAsyncCore(flush, ReusableChunkedStream);
- }
-
- private static async ValueTask<Stream> GetStreamAsyncCore(ValueTask flush, Stream stream)
- {
- //Await the flush and get the stream
- await flush.ConfigureAwait(false);
- return stream;
+ return ReusableChunkedStream;
}
+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
void Check()
@@ -329,6 +327,7 @@ namespace VNLib.Net.Http.Core
}
///<inheritdoc/>
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
public void OnNewRequest()
{
//Default to okay status code
@@ -338,6 +337,7 @@ namespace VNLib.Net.Http.Core
}
///<inheritdoc/>
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
public void OnComplete()
{
//Clear headers and cookies
diff --git a/lib/Net.Http/src/Core/Response/IDirectResponsWriter.cs b/lib/Net.Http/src/Core/Response/IDirectResponsWriter.cs
new file mode 100644
index 0000000..7c9ca41
--- /dev/null
+++ b/lib/Net.Http/src/Core/Response/IDirectResponsWriter.cs
@@ -0,0 +1,49 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Http
+* File: IDirectResponsWriter.cs
+*
+* IDirectResponsWriter.cs is part of VNLib.Net.Http which is part of
+* the larger VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Http is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Http is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Threading.Tasks;
+
+namespace VNLib.Net.Http.Core.Response
+{
+ /// <summary>
+ /// Represents a stream that can be written to directly (does not
+ /// buffer response data)
+ /// </summary>
+ internal interface IDirectResponsWriter
+ {
+ /// <summary>
+ /// Writes the given data buffer to the client
+ /// </summary>
+ /// <param name="buffer">The response data to write</param>
+ /// <returns>A value task that resolves when the write operation is complete</returns>
+ ValueTask WriteAsync(ReadOnlyMemory<byte> buffer);
+
+ /// <summary>
+ /// Flushes any remaining data to the client
+ /// </summary>
+ /// <returns>A task that resolves when the flush operationis complete</returns>
+ Task FlushAsync();
+ }
+}
diff --git a/lib/Net.Http/src/Core/Response/IResponseDataWriter.cs b/lib/Net.Http/src/Core/Response/IResponseDataWriter.cs
new file mode 100644
index 0000000..55ef49c
--- /dev/null
+++ b/lib/Net.Http/src/Core/Response/IResponseDataWriter.cs
@@ -0,0 +1,56 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Http
+* File: IResponseDataWriter.cs
+*
+* IResponseDataWriter.cs is part of VNLib.Net.Http which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Http is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Http is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Threading.Tasks;
+
+namespace VNLib.Net.Http.Core.Response
+{
+ /// <summary>
+ /// A buffered http response data writer
+ /// </summary>
+ internal interface IResponseDataWriter
+ {
+ /// <summary>
+ /// Gets the next memory segment available to buffer data to
+ /// </summary>
+ /// <returns>An available buffer to write response data to </returns>
+ Memory<byte> GetMemory();
+
+ /// <summary>
+ /// Advances the writer by the number of bytes written and returns the
+ /// number of bytes available for writing on the next call to <see cref="GetMemory"/>
+ /// </summary>
+ /// <param name="written">The number of bytes written to the output buffer</param>
+ /// <returns>The number of bytes remaining in the internal buffer</returns>
+ int Advance(int written);
+
+ /// <summary>
+ /// Flushes the internal buffer to the underlying stream
+ /// </summary>
+ /// <param name="isFinal">A value that indicates that this is final call to flush</param>
+ /// <returns>A valuetask that completes </returns>
+ ValueTask FlushAsync(bool isFinal);
+ }
+} \ No newline at end of file
diff --git a/lib/Net.Http/src/Core/Response/ResponseWriter.cs b/lib/Net.Http/src/Core/Response/ResponseWriter.cs
index 77dc619..d67fc01 100644
--- a/lib/Net.Http/src/Core/Response/ResponseWriter.cs
+++ b/lib/Net.Http/src/Core/Response/ResponseWriter.cs
@@ -22,12 +22,20 @@
* along with this program. If not, see https://www.gnu.org/licenses/.
*/
+/*
+ * This file handles response entity processing. It handles in-memory response
+ * processing, as well as stream response processing. It handles contraints
+ * such as content-range limits. I tried to eliminate or reduce the amount of
+ * memory copying required to process the response entity.
+ */
+
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
+using System.Runtime.CompilerServices;
-using VNLib.Utils.Extensions;
+using VNLib.Net.Http.Core.Response;
using VNLib.Net.Http.Core.Compression;
namespace VNLib.Net.Http.Core
@@ -94,68 +102,95 @@ namespace VNLib.Net.Http.Core
ReadOnlyMemory<byte> _readSegment;
///<inheritdoc/>
- async Task IHttpResponseBody.WriteEntityAsync(Stream dest, long count, Memory<byte> buffer)
+ async Task IHttpResponseBody.WriteEntityAsync(IDirectResponsWriter dest, long count, Memory<byte> buffer)
{
int remaining;
//Write a sliding window response
if (_memoryResponse != null)
{
- //Get min value from count/range length
- remaining = (int)Math.Min(count, _memoryResponse.Remaining);
-
- //Write response body from memory
- while (remaining > 0)
+ if(count > 0)
{
- //Get remaining segment
- _readSegment = _memoryResponse.GetRemainingConstrained(remaining);
-
- //Write segment to output stream
- await dest.WriteAsync(_readSegment);
-
- //Advance by the written ammount
- _memoryResponse.Advance(_readSegment.Length);
-
- //Update remaining
- remaining -= _readSegment.Length;
- }
- }
- else
- {
- //Buffer is required, and count must be supplied
- await _streamResponse!.CopyToAsync(dest, buffer, count);
+ //Get min value from count/range length
+ remaining = (int)Math.Min(count, _memoryResponse.Remaining);
- //Try to dispose the response stream asyncrhonously since we are done with it
- await _streamResponse!.DisposeAsync();
+ //Write response body from memory
+ while (remaining > 0)
+ {
+ //Get remaining segment
+ _readSegment = _memoryResponse.GetRemainingConstrained(remaining);
- //remove ref so its not disposed again
- _streamResponse = null;
- }
- }
+ //Write segment to output stream
+ await dest.WriteAsync(_readSegment);
- ///<inheritdoc/>
- async Task IHttpResponseBody.WriteEntityAsync(Stream dest, Memory<byte> buffer)
- {
- //Write a sliding window response
- if (_memoryResponse != null)
- {
- //Write response body from memory
- while (_memoryResponse.Remaining > 0)
+ //Advance by the written ammount
+ _memoryResponse.Advance(_readSegment.Length);
+
+ //Update remaining
+ remaining -= _readSegment.Length;
+ }
+ }
+ else
{
- //Get remaining segment
- _readSegment = _memoryResponse.GetMemory();
+ //Write response body from memory
+ while (_memoryResponse.Remaining > 0)
+ {
+ //Get remaining segment
+ _readSegment = _memoryResponse.GetMemory();
- //Write segment to output stream
- await dest.WriteAsync(_readSegment);
+ //Write segment to output stream
+ await dest.WriteAsync(_readSegment);
- //Advance by the written ammount
- _memoryResponse.Advance(_readSegment.Length);
+ //Advance by the written amount
+ _memoryResponse.Advance(_readSegment.Length);
+ }
}
+
+ //Disposing of memory response can be deferred until the end of the request since its always syncrhonous
}
else
{
- //Buffer is required, and count must be supplied
- await _streamResponse!.CopyToAsync(dest, buffer);
+ if (count > 0)
+ {
+ //Buffer is required, and count must be supplied
+
+ long total = 0;
+ int read;
+ while (true)
+ {
+ //get offset wrapper of the total buffer or remaining count
+ Memory<byte> offset = buffer[..(int)Math.Min(buffer.Length, count - total)];
+ //read
+ read = await _streamResponse!.ReadAsync(offset);
+ //Guard
+ if (read == 0)
+ {
+ break;
+ }
+ //write only the data that was read (slice)
+ await dest.WriteAsync(offset[..read]);
+ //Update total
+ total += read;
+ }
+ }
+ else
+ {
+ //Read in loop
+ do
+ {
+ //read
+ int read = await _streamResponse!.ReadAsync(buffer);
+ //Guard
+ if (read == 0)
+ {
+ break;
+ }
+
+ //write only the data that was read (slice)
+ await dest.WriteAsync(buffer[..read]);
+
+ } while (true);
+ }
//Try to dispose the response stream asyncrhonously since we are done with it
await _streamResponse!.DisposeAsync();
@@ -166,64 +201,22 @@ namespace VNLib.Net.Http.Core
}
///<inheritdoc/>
- async Task IHttpResponseBody.WriteEntityAsync(IResponseCompressor dest, Memory<byte> buffer)
+ async Task IHttpResponseBody.WriteEntityAsync(IResponseCompressor comp, IResponseDataWriter writer, Memory<byte> buffer)
{
//Locals
- bool remaining;
int read;
//Write a sliding window response
if (_memoryResponse != null)
- {
- /*
- * It is safe to assume if a response body was set, that it contains data.
- * So the cost or running a loop without data is not a concern.
- *
- * Since any failed writes to the output will raise exceptions, it is safe
- * to advance the reader before writing the data, so we can determine if the
- * block is final.
- *
- * Since we are using a byte-stream reader for memory responses, we can optimize the
- * compression loop, if we know its operating block size, so we only compress blocks
- * of the block size, then continue the loop without branching or causing nested
- * loops
- */
-
- //Optimize for block size
- if (dest.BlockSize > 0)
- {
- //Write response body from memory
- do
- {
- _readSegment = _memoryResponse.GetRemainingConstrained(dest.BlockSize);
-
- //Advance by the trimmed segment length
- _memoryResponse.Advance(_readSegment.Length);
-
- //Check if data is remaining after an advance
- remaining = _memoryResponse.Remaining > 0;
-
- //Compress the trimmed block
- await dest.CompressBlockAsync(_readSegment, !remaining);
-
- } while (remaining);
- }
- else
+ {
+ while (_memoryResponse.Remaining > 0)
{
- do
+ //Commit output bytes
+ if (CompressNextSegment(_memoryResponse, comp, writer))
{
- _readSegment = _memoryResponse.GetMemory();
-
- //Advance by the segment length, this should be safe even if its zero
- _memoryResponse.Advance(_readSegment.Length);
-
- //Check if data is remaining after an advance
- remaining = _memoryResponse.Remaining > 0;
-
- //Write to output
- await dest.CompressBlockAsync(_readSegment, !remaining);
-
- } while (remaining);
+ //Time to flush
+ await writer.FlushAsync(false);
+ }
}
//Disposing of memory response can be deferred until the end of the request since its always syncrhonous
@@ -231,9 +224,9 @@ namespace VNLib.Net.Http.Core
else
{
//Trim buffer to block size if it is set by the compressor
- if (dest.BlockSize > 0)
+ if (comp.BlockSize > 0)
{
- buffer = buffer[..dest.BlockSize];
+ buffer = buffer[..comp.BlockSize];
}
//Read in loop
@@ -248,8 +241,12 @@ namespace VNLib.Net.Http.Core
break;
}
- //write only the data that was read, as a segment instead of a block
- await dest.CompressBlockAsync(buffer[..read], read < buffer.Length);
+ //Compress the buffered data and flush if required
+ if(CompressNextSegment(buffer, comp, writer))
+ {
+ //Time to flush
+ await writer.FlushAsync(false);
+ }
} while (true);
@@ -263,17 +260,69 @@ namespace VNLib.Net.Http.Core
_streamResponse = null;
}
- //Continue flusing flushing the compressor if required
- while(dest.IsFlushRequired())
+
+ /*
+ * Once there is no more response data avialable to compress
+ * we need to flush the compressor, then flush the writer
+ * to publish all accumulated data to the client
+ */
+
+ do
{
- //Flush the compressor
- await dest.CompressBlockAsync(ReadOnlyMemory<byte>.Empty, true);
- }
+ //Get output buffer
+ Memory<byte> output = writer.GetMemory();
+
+ //Flush the compressor output
+ int written = comp.Flush(output);
+
+ //No more data to buffer
+ if (written == 0)
+ {
+ //final flush and exit
+ await writer.FlushAsync(true);
+ break;
+ }
+
+ if (writer.Advance(written) == 0)
+ {
+ //Flush because accumulator is full
+ await writer.FlushAsync(false);
+ }
+
+ } while (true);
+ }
+
+ private static bool CompressNextSegment(IMemoryResponseReader reader, IResponseCompressor comp, IResponseDataWriter writer)
+ {
+ //Read the next segment
+ ReadOnlyMemory<byte> readSegment = comp.BlockSize > 0 ? reader.GetRemainingConstrained(comp.BlockSize) : reader.GetMemory();
+
+ //Get output buffer
+ Memory<byte> output = writer.GetMemory();
+
+ //Compress the trimmed block
+ CompressionResult res = comp.CompressBlock(readSegment, output);
+
+ //Commit input bytes
+ reader.Advance(res.BytesRead);
+
+ return writer.Advance(res.BytesWritten) == 0;
+ }
+
+ private static bool CompressNextSegment(Memory<byte> readSegment, IResponseCompressor comp, IResponseDataWriter writer)
+ {
+ //Get output buffer
+ Memory<byte> output = writer.GetMemory();
+
+ //Compress the trimmed block
+ CompressionResult res = comp.CompressBlock(readSegment, output);
+
+ return writer.Advance(res.BytesWritten) == 0;
}
#pragma warning restore CA2007 // Consider calling ConfigureAwait on the awaited task
-
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
public void OnComplete()
{
//Clear has data flag
diff --git a/lib/Net.Http/src/Core/Response/ReusableResponseStream.cs b/lib/Net.Http/src/Core/Response/ReusableResponseStream.cs
index 0be4821..60465f9 100644
--- a/lib/Net.Http/src/Core/Response/ReusableResponseStream.cs
+++ b/lib/Net.Http/src/Core/Response/ReusableResponseStream.cs
@@ -22,18 +22,12 @@
* along with this program. If not, see https://www.gnu.org/licenses/.
*/
-using System;
using System.IO;
-using System.Threading;
-using System.Threading.Tasks;
namespace VNLib.Net.Http.Core.Response
{
-#pragma warning disable CA2215 // Dispose methods should call base class dispose
-#pragma warning disable CA1844 // Provide memory-based overrides of async methods when subclassing 'Stream'
-
- internal abstract class ReusableResponseStream : Stream
+ internal abstract class ReusableResponseStream
{
protected Stream? transport;
@@ -46,50 +40,7 @@ namespace VNLib.Net.Http.Core.Response
/// <summary>
/// Called when the connection is released
/// </summary>
- public virtual void OnRelease() => this.transport = null;
-
-
- //Block base dispose
- protected override void Dispose(bool disposing)
- { }
-
- //Block base close
- public override void Close()
- { }
-
- //block base dispose async
- public override ValueTask DisposeAsync()
- {
- return ValueTask.CompletedTask;
- }
-
- //Block flush
- public override void Flush()
- { }
-
- //Block flush async
- public override Task FlushAsync(CancellationToken cancellationToken)
- {
- return Task.CompletedTask;
- }
-
- //Block stream basics
- public override bool CanRead => false;
- public override bool CanSeek => false;
- public override bool CanWrite => true;
- public override long Length => throw new NotSupportedException();
- public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
-
- //Reading is not enabled
- public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException("This stream cannot be read from");
- public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException("This stream does not support seeking");
- public override void SetLength(long value) => throw new NotSupportedException("This stream does not support seeking");
-
- public override void Write(byte[] buffer, int offset, int count) => Write(buffer.AsSpan(offset, count));
-
- public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
- {
- return WriteAsync(buffer.AsMemory(offset, count), cancellationToken).AsTask();
- }
+ public virtual void OnRelease() => transport = null;
+
}
} \ No newline at end of file
diff --git a/lib/Plugins.PluginBase/src/VNLib.Plugins.PluginBase.csproj b/lib/Plugins.PluginBase/src/VNLib.Plugins.PluginBase.csproj
index 616f907..d53cc7d 100644
--- a/lib/Plugins.PluginBase/src/VNLib.Plugins.PluginBase.csproj
+++ b/lib/Plugins.PluginBase/src/VNLib.Plugins.PluginBase.csproj
@@ -41,7 +41,7 @@
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
- <PackageReference Include="Serilog" Version="2.12.0" />
+ <PackageReference Include="Serilog" Version="3.0.1" />
<PackageReference Include="Serilog.Sinks.Console" Version="4.1.0" />
<PackageReference Include="Serilog.Sinks.File" Version="5.0.0" />
</ItemGroup>
diff --git a/lib/Utils/src/Extensions/IoExtensions.cs b/lib/Utils/src/Extensions/IoExtensions.cs
index 637cfab..bcbe810 100644
--- a/lib/Utils/src/Extensions/IoExtensions.cs
+++ b/lib/Utils/src/Extensions/IoExtensions.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2022 Vaughn Nugent
+* Copyright (c) 2023 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Utils
@@ -328,12 +328,11 @@ namespace VNLib.Utils.Extensions
* bytes from the source
*/
long total = 0;
- int bufferSize = buffer.Length;
int read;
while (true)
{
//get offset wrapper of the total buffer or remaining count
- Memory<byte> offset = buffer[..(int)Math.Min(bufferSize, count - total)];
+ Memory<byte> offset = buffer[..(int)Math.Min(buffer.Length, count - total)];
//read
read = await source.ReadAsync(offset, token);
//Guard
diff --git a/lib/Utils/tests/VNLib.UtilsTests.csproj b/lib/Utils/tests/VNLib.UtilsTests.csproj
index 5651bb3..c29915d 100644
--- a/lib/Utils/tests/VNLib.UtilsTests.csproj
+++ b/lib/Utils/tests/VNLib.UtilsTests.csproj
@@ -16,9 +16,9 @@
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
- <PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.6.0" />
- <PackageReference Include="MSTest.TestAdapter" Version="3.0.2" />
- <PackageReference Include="MSTest.TestFramework" Version="3.0.2" />
+ <PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.6.3" />
+ <PackageReference Include="MSTest.TestAdapter" Version="3.1.1" />
+ <PackageReference Include="MSTest.TestFramework" Version="3.1.1" />
<PackageReference Include="coverlet.collector" Version="6.0.0">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>