From f0c2337358fc43ad9c79294c539c4ddec4280011 Mon Sep 17 00:00:00 2001 From: vnugent Date: Fri, 28 Jul 2023 11:59:28 -0400 Subject: Compressor control refactor, reduce copy --- .../VNLib.Net.Compression/CompressorManager.cs | 60 ++--- .../CompressorManagerTests.cs | 36 +-- .../VNLib.Net.CompressionTests.csproj | 12 +- .../src/Core/Compression/CompressionResult.cs | 42 ++++ .../src/Core/Compression/IHttpCompressorManager.cs | 22 +- .../src/Core/Compression/IResponseCompressor.cs | 22 +- .../src/Core/Compression/ManagedHttpCompressor.cs | 55 ++--- lib/Net.Http/src/Core/IHttpResponseBody.cs | 23 +- lib/Net.Http/src/Core/Request/HttpInputStream.cs | 3 + lib/Net.Http/src/Core/Request/HttpRequest.cs | 3 + .../src/Core/Response/ChunkDataAccumulator.cs | 88 +++---- lib/Net.Http/src/Core/Response/ChunkedStream.cs | 176 +++----------- lib/Net.Http/src/Core/Response/DirectStream.cs | 23 +- .../Core/Response/HttpContextResponseWriting.cs | 50 ++-- lib/Net.Http/src/Core/Response/HttpResponse.cs | 76 +++--- .../src/Core/Response/IDirectResponsWriter.cs | 49 ++++ .../src/Core/Response/IResponseDataWriter.cs | 56 +++++ lib/Net.Http/src/Core/Response/ResponseWriter.cs | 261 ++++++++++++--------- .../src/Core/Response/ReusableResponseStream.cs | 55 +---- .../src/VNLib.Plugins.PluginBase.csproj | 2 +- lib/Utils/src/Extensions/IoExtensions.cs | 5 +- lib/Utils/tests/VNLib.UtilsTests.csproj | 6 +- 22 files changed, 534 insertions(+), 591 deletions(-) create mode 100644 lib/Net.Http/src/Core/Compression/CompressionResult.cs create mode 100644 lib/Net.Http/src/Core/Response/IDirectResponsWriter.cs create mode 100644 lib/Net.Http/src/Core/Response/IResponseDataWriter.cs (limited to 'lib') diff --git a/lib/Net.Compression/VNLib.Net.Compression/CompressorManager.cs b/lib/Net.Compression/VNLib.Net.Compression/CompressorManager.cs index 5c4f4fd..af77329 100644 --- a/lib/Net.Compression/VNLib.Net.Compression/CompressorManager.cs +++ b/lib/Net.Compression/VNLib.Net.Compression/CompressorManager.cs @@ -42,7 +42,6 @@ using System.Text.Json; using System.Runtime.CompilerServices; using VNLib.Net.Http; -using VNLib.Utils.Memory; using VNLib.Utils.Logging; namespace VNLib.Net.Compression @@ -50,21 +49,18 @@ namespace VNLib.Net.Compression public sealed class CompressorManager : IHttpCompressorManager { const string NATIVE_LIB_NAME = "vnlib_compress.dll"; - const int MIN_BUF_SIZE_DEFAULT = 8192; private LibraryWrapper? _nativeLib; private CompressionLevel _compLevel; - private int minOutBufferSize; /// /// Called by the VNLib.Webserver during startup to initiialize the compressor. /// /// The application log provider - /// The raw json configuration data + /// The json configuration element public void OnLoad(ILogProvider? log, JsonElement? config) { _compLevel = CompressionLevel.Optimal; - minOutBufferSize = MIN_BUF_SIZE_DEFAULT; string libPath = NATIVE_LIB_NAME; if(config.HasValue) @@ -83,11 +79,6 @@ namespace VNLib.Net.Compression { libPath = libEl.GetString() ?? NATIVE_LIB_NAME; } - - if(compEl.TryGetProperty("min_out_buf_size", out JsonElement minBufEl)) - { - minOutBufferSize = minBufEl.GetInt32(); - } } } @@ -128,7 +119,7 @@ namespace VNLib.Net.Compression Compressor compressor = Unsafe.As(compressorState) ?? throw new ArgumentNullException(nameof(compressorState)); //Instance should be null during initialization calls - Debug.Assert(compressor.Instance == IntPtr.Zero); + Debug.Assert(compressor.Instance == IntPtr.Zero, "Init was called but and old compressor instance was not properly freed"); //Alloc the compressor compressor.Instance = _nativeLib!.AllocateCompressor(compMethod, _compLevel); @@ -147,13 +138,6 @@ namespace VNLib.Net.Compression throw new InvalidOperationException("This compressor instance has not been initialized, cannot free compressor"); } - //Free the output buffer - if(compressor.OutputBuffer != null) - { - ArrayPool.Shared.Return(compressor.OutputBuffer, true); - compressor.OutputBuffer = null; - } - //Free compressor instance _nativeLib!.FreeCompressor(compressor.Instance); @@ -162,7 +146,7 @@ namespace VNLib.Net.Compression } /// - public ReadOnlyMemory Flush(object compressorState) + public int Flush(object compressorState, Memory output) { Compressor compressor = Unsafe.As(compressorState) ?? throw new ArgumentNullException(nameof(compressorState)); @@ -171,17 +155,13 @@ namespace VNLib.Net.Compression throw new InvalidOperationException("This compressor instance has not been initialized, cannot free compressor"); } - //rent a new buffer of the minimum size if not already allocated - compressor.OutputBuffer ??= ArrayPool.Shared.Rent(minOutBufferSize); - //Force a flush until no more data is available - int bytesWritten = CompressBlock(compressor.Instance, compressor.OutputBuffer, default, true); - - return compressor.OutputBuffer.AsMemory(0, bytesWritten); + CompressionResult result = CompressBlock(compressor.Instance, output, default, true); + return result.BytesWritten; } /// - public ReadOnlyMemory CompressBlock(object compressorState, ReadOnlyMemory input, bool finalBlock) + public CompressionResult CompressBlock(object compressorState, ReadOnlyMemory input, Memory output) { Compressor compressor = Unsafe.As(compressorState) ?? throw new ArgumentNullException(nameof(compressorState)); @@ -196,30 +176,16 @@ namespace VNLib.Net.Compression * as a reference for callers. If its too small it will just have to be flushed */ - //See if the compressor has a buffer allocated - if (compressor.OutputBuffer == null) - { - //Determine the required buffer size - int bufferSize = _nativeLib!.GetOutputSize(compressor.Instance, input.Length, finalBlock ? 1 : 0); - - //clamp the buffer size to the minimum output buffer size - bufferSize = Math.Max(bufferSize, minOutBufferSize); - - //rent a new buffer - compressor.OutputBuffer = ArrayPool.Shared.Rent(bufferSize); - } //Compress the block - int bytesWritten = CompressBlock(compressor.Instance, compressor.OutputBuffer, input, finalBlock); - - return compressor.OutputBuffer.AsMemory(0, bytesWritten); + return CompressBlock(compressor.Instance, output, input, false); } - private unsafe int CompressBlock(IntPtr comp, byte[] output, ReadOnlyMemory input, bool finalBlock) + private unsafe CompressionResult CompressBlock(IntPtr comp, ReadOnlyMemory output, ReadOnlyMemory input, bool finalBlock) { //get pointers to the input and output buffers using MemoryHandle inPtr = input.Pin(); - using MemoryHandle outPtr = MemoryUtil.PinArrayAndGetHandle(output, 0); + using MemoryHandle outPtr = output.Pin(); //Create the operation struct CompressionOperation operation; @@ -240,7 +206,11 @@ namespace VNLib.Net.Compression _nativeLib!.CompressBlock(comp, &operation); //Return the number of bytes written - return op->bytesWritten; + return new() + { + BytesRead = op->bytesRead, + BytesWritten = op->bytesWritten + }; } @@ -250,8 +220,6 @@ namespace VNLib.Net.Compression private sealed class Compressor { public IntPtr Instance; - - public byte[]? OutputBuffer; } } diff --git a/lib/Net.Compression/VNLib.Net.CompressionTests/CompressorManagerTests.cs b/lib/Net.Compression/VNLib.Net.CompressionTests/CompressorManagerTests.cs index 2dea9d7..f77b3d2 100644 --- a/lib/Net.Compression/VNLib.Net.CompressionTests/CompressorManagerTests.cs +++ b/lib/Net.Compression/VNLib.Net.CompressionTests/CompressorManagerTests.cs @@ -8,6 +8,7 @@ using System.Security.Cryptography; using VNLib.Utils.IO; using VNLib.Net.Http; +using VNLib.Utils.Extensions; using Microsoft.VisualStudio.TestTools.UnitTesting; @@ -16,11 +17,11 @@ namespace VNLib.Net.Compression.Tests [TestClass()] public class CompressorManagerTests { - const string LIB_PATH = @"F:\Programming\VNLib\VNLib.Net.Compression\native\vnlib_compress\build\Debug\vnlib_compress.dll"; + const string LIB_PATH = @"../../../../vnlib_compress/build/Debug/vnlib_compress.dll"; [TestMethod()] - public void OnLoadTest() + public void CompressDataStreamTest() { CompressorManager manager = InitCompressorUnderTest(); @@ -141,32 +142,37 @@ namespace VNLib.Net.Compression.Tests //Create a buffer to compress byte[] buffer = new byte[4096]; + byte[] output = new byte[4096]; //fill with random data RandomNumberGenerator.Fill(buffer); + int read = 0; + //try to compress the data in chunks - for(int i = 0; i < 4; i++) + while(read < buffer.Length) { //Get 4th of a buffer - ReadOnlyMemory chunk = buffer.AsMemory(i * 1024, 1024); + ReadOnlyMemory chunk = buffer.AsMemory(read, 1024); //Compress data - ReadOnlyMemory output = manager.CompressBlock(compressor, chunk, i == 3); + CompressionResult result = manager.CompressBlock(compressor, chunk, output); //Write the compressed data to the output stream - outputStream.Write(output.Span); + outputStream.Write(output.Slice(0, result.BytesWritten)); + + //Increment the read position + read += result.BytesRead; } - //flush the compressor - while(true) - { - ReadOnlyMemory output = manager.Flush(compressor); - if(output.IsEmpty) - { - break; - } - outputStream.Write(output.Span); + //Flush + int flushed = 100; + while(flushed > 0) + { + flushed = manager.Flush(compressor, output); + + //Write the compressed data to the output stream + outputStream.Write(output.AsSpan()[0..flushed]); } //Verify the data diff --git a/lib/Net.Compression/VNLib.Net.CompressionTests/VNLib.Net.CompressionTests.csproj b/lib/Net.Compression/VNLib.Net.CompressionTests/VNLib.Net.CompressionTests.csproj index e235ac5..adf9496 100644 --- a/lib/Net.Compression/VNLib.Net.CompressionTests/VNLib.Net.CompressionTests.csproj +++ b/lib/Net.Compression/VNLib.Net.CompressionTests/VNLib.Net.CompressionTests.csproj @@ -3,16 +3,18 @@ net6.0 enable - false true - - - - + + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + diff --git a/lib/Net.Http/src/Core/Compression/CompressionResult.cs b/lib/Net.Http/src/Core/Compression/CompressionResult.cs new file mode 100644 index 0000000..aaeebc5 --- /dev/null +++ b/lib/Net.Http/src/Core/Compression/CompressionResult.cs @@ -0,0 +1,42 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Http +* File: CompressionResult.cs +* +* CompressionResult.cs is part of VNLib.Net.Http which is part +* of the larger VNLib collection of libraries and utilities. +* +* VNLib.Net.Http is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Http is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +namespace VNLib.Net.Http +{ + /// + /// Represents the result of a block compression operation + /// + public readonly ref struct CompressionResult + { + /// + /// The number of bytes read from the input buffer + /// + public readonly int BytesRead { get; init; } + + /// + /// The number of bytes availabe in the output buffer + /// + public readonly int BytesWritten { get; init; } + } +} diff --git a/lib/Net.Http/src/Core/Compression/IHttpCompressorManager.cs b/lib/Net.Http/src/Core/Compression/IHttpCompressorManager.cs index 80763f8..6aadd49 100644 --- a/lib/Net.Http/src/Core/Compression/IHttpCompressorManager.cs +++ b/lib/Net.Http/src/Core/Compression/IHttpCompressorManager.cs @@ -26,6 +26,7 @@ using System; namespace VNLib.Net.Http { + /// /// Represents an http compressor manager that creates compressor state instances and processes /// compression operations. @@ -54,29 +55,22 @@ namespace VNLib.Net.Http /// /// Compresses a block of data using the compressor state. The input block size is - /// guarunteed to the the block size returned by + /// guarunteed to be smaller than the block size returned by /// or smaller. - /// - /// This method may be called with an empty input block to flush the compressor state. - /// /// /// The compressor state instance /// The input buffer to compress - /// A value that indicates if this block is the final block - /// The compressed block to send to the client - /// - /// The returned memory block should belong to the individual compressor state, and be valid until the - /// call to deinit. The result of the block should remain valid - /// until the next call to compress or deinit. - /// - ReadOnlyMemory CompressBlock(object compressorState, ReadOnlyMemory input, bool finalBlock); + /// The output buffer to write the compressed data to + /// The result of the stream operation + CompressionResult CompressBlock(object compressorState, ReadOnlyMemory input, Memory output); /// /// Flushes any stored compressor data that still needs to be sent to the client. /// /// The compressor state instance - /// The remaining data stored in the compressor state, may be empty if no data is pending - ReadOnlyMemory Flush(object compressorState); + /// The output buffer + /// The number of bytes flushed to the output buffer + int Flush(object compressorState, Memory output); /// /// Initializes the compressor state for a compression operation diff --git a/lib/Net.Http/src/Core/Compression/IResponseCompressor.cs b/lib/Net.Http/src/Core/Compression/IResponseCompressor.cs index a68a838..034b282 100644 --- a/lib/Net.Http/src/Core/Compression/IResponseCompressor.cs +++ b/lib/Net.Http/src/Core/Compression/IResponseCompressor.cs @@ -23,8 +23,6 @@ */ using System; -using System.IO; -using System.Threading.Tasks; namespace VNLib.Net.Http.Core.Compression { @@ -49,21 +47,21 @@ namespace VNLib.Net.Http.Core.Compression /// Initializes the compressor for a compression operation /// /// The compression mode to use - /// The stream to write compressed data to - void Init(Stream output, CompressionMethod compMethod); + void Init(CompressionMethod compMethod); /// - /// Gets a value that indicates if the compressor requires more flushing to occur + /// Compresses a block of input data and writes the result to the output buffer /// - bool IsFlushRequired(); + /// The input data to compress + /// The output buffer to write compressed data to + /// The result of the compression operation + CompressionResult CompressBlock(ReadOnlyMemory input, Memory output); /// - /// Compresses a block of data and writes it to the output stream. If an empty flush is - /// commited, then the input buffer will be empty. + /// Writes any remaining data to the output buffer, flushing the compressor /// - /// The block of memory to write to compress - /// A value that indicates if this block is the final block - /// A task that represents the compression operation - ValueTask CompressBlockAsync(ReadOnlyMemory buffer, bool finalBlock); + /// The buffer to write output data to + /// The number of bytes written to the output buffer + int Flush(Memory output); } } \ No newline at end of file diff --git a/lib/Net.Http/src/Core/Compression/ManagedHttpCompressor.cs b/lib/Net.Http/src/Core/Compression/ManagedHttpCompressor.cs index b3f971a..34e59ac 100644 --- a/lib/Net.Http/src/Core/Compression/ManagedHttpCompressor.cs +++ b/lib/Net.Http/src/Core/Compression/ManagedHttpCompressor.cs @@ -23,9 +23,6 @@ */ using System; -using System.IO; -using System.Threading.Tasks; - namespace VNLib.Net.Http.Core.Compression { @@ -47,46 +44,39 @@ namespace VNLib.Net.Http.Core.Compression */ private object? _compressor; - private Stream? _stream; - private ReadOnlyMemory _lastFlush; private bool initialized; /// public int BlockSize { get; private set; } - public bool IsFlushRequired() + /// + public CompressionResult CompressBlock(ReadOnlyMemory input, Memory output) { - //See if a flush is required - _lastFlush = _provider.Flush(_compressor!); - return _lastFlush.Length > 0; + //Compress the block + return _provider.CompressBlock(_compressor!, input, output); } /// - public ValueTask CompressBlockAsync(ReadOnlyMemory buffer, bool finalBlock) + public int Flush(Memory output) { - /* - * If input buffer is empty and flush data is available, - * write the last flush data to the stream - */ - if(buffer.Length == 0 && _lastFlush.Length > 0) - { - return _stream!.WriteAsync(_lastFlush); - } + return _provider.Flush(_compressor!, output); + } - //Compress the block - ReadOnlyMemory result = _provider.CompressBlock(_compressor!, buffer, finalBlock); + /// + public void Init(CompressionMethod compMethod) + { + //Defer alloc the compressor + _compressor ??= _provider.AllocCompressor(); + + //Init the compressor and get the block size + BlockSize = _provider.InitCompressor(_compressor, compMethod); - //Write the compressed block to the stream - return _stream!.WriteAsync(result); + initialized = true; } /// public void Free() { - //Remove stream ref and de-init the compressor - _stream = null; - _lastFlush = default; - //Deinit compressor if initialized if (initialized) { @@ -94,18 +84,5 @@ namespace VNLib.Net.Http.Core.Compression initialized = false; } } - - /// - public void Init(Stream output, CompressionMethod compMethod) - { - //Defer alloc the compressor - _compressor ??= _provider.AllocCompressor(); - - //Init the compressor and get the block size - BlockSize = _provider.InitCompressor(_compressor, compMethod); - - _stream = output; - initialized = true; - } } } \ No newline at end of file diff --git a/lib/Net.Http/src/Core/IHttpResponseBody.cs b/lib/Net.Http/src/Core/IHttpResponseBody.cs index f0f88d2..696e9da 100644 --- a/lib/Net.Http/src/Core/IHttpResponseBody.cs +++ b/lib/Net.Http/src/Core/IHttpResponseBody.cs @@ -23,9 +23,9 @@ */ using System; -using System.IO; using System.Threading.Tasks; +using VNLib.Net.Http.Core.Response; using VNLib.Net.Http.Core.Compression; namespace VNLib.Net.Http.Core @@ -63,28 +63,15 @@ namespace VNLib.Net.Http.Core /// An optional buffer used to buffer responses /// The maximum length of the response data to write /// A task that resolves when the response is completed - Task WriteEntityAsync(Stream dest, long count, Memory buffer); + Task WriteEntityAsync(IDirectResponsWriter dest, long count, Memory buffer); /// /// Writes internal response entity data to the destination stream /// - /// The response compressor + /// The response compressor + /// The response output writer /// An optional buffer used to buffer responses /// A task that resolves when the response is completed - Task WriteEntityAsync(IResponseCompressor dest, Memory buffer); - - /* - * Added to the response writing hot-paths optimize calls when compression - * is disabled and an explicit length is not required. - */ - - /// - /// Writes internal response entity data to the destination stream - /// without compression - /// - /// The response stream to write data to - /// Optional buffer if required, used to buffer response data - /// A task that resolves when the response is completed - Task WriteEntityAsync(Stream dest, Memory buffer); + Task WriteEntityAsync(IResponseCompressor comp, IResponseDataWriter writer, Memory buffer); } } \ No newline at end of file diff --git a/lib/Net.Http/src/Core/Request/HttpInputStream.cs b/lib/Net.Http/src/Core/Request/HttpInputStream.cs index dc903d2..fbb1d41 100644 --- a/lib/Net.Http/src/Core/Request/HttpInputStream.cs +++ b/lib/Net.Http/src/Core/Request/HttpInputStream.cs @@ -26,6 +26,7 @@ using System; using System.IO; using System.Threading; using System.Threading.Tasks; +using System.Runtime.CompilerServices; using VNLib.Utils; using VNLib.Utils.Memory; @@ -49,6 +50,8 @@ namespace VNLib.Net.Http.Core public HttpInputStream(IHttpContextInformation contextInfo) => ContextInfo = contextInfo; + + [MethodImpl(MethodImplOptions.AggressiveInlining)] internal void OnComplete() { //Dispose the initial data buffer if set diff --git a/lib/Net.Http/src/Core/Request/HttpRequest.cs b/lib/Net.Http/src/Core/Request/HttpRequest.cs index 0668197..2cc99bd 100644 --- a/lib/Net.Http/src/Core/Request/HttpRequest.cs +++ b/lib/Net.Http/src/Core/Request/HttpRequest.cs @@ -26,6 +26,7 @@ using System; using System.Net; using System.Collections.Generic; using System.Security.Authentication; +using System.Runtime.CompilerServices; using VNLib.Utils; using VNLib.Utils.Memory; @@ -97,6 +98,7 @@ namespace VNLib.Net.Http.Core void IHttpLifeCycle.OnNewConnection() { } + [MethodImpl(MethodImplOptions.AggressiveInlining)] public void OnNewRequest() { //Set to defaults @@ -106,6 +108,7 @@ namespace VNLib.Net.Http.Core HttpVersion = HttpVersion.None; } + [MethodImpl(MethodImplOptions.AggressiveInlining)] public void OnComplete() { //release the input stream diff --git a/lib/Net.Http/src/Core/Response/ChunkDataAccumulator.cs b/lib/Net.Http/src/Core/Response/ChunkDataAccumulator.cs index 4a88361..f87ec98 100644 --- a/lib/Net.Http/src/Core/Response/ChunkDataAccumulator.cs +++ b/lib/Net.Http/src/Core/Response/ChunkDataAccumulator.cs @@ -24,7 +24,6 @@ using System; -using VNLib.Utils; using VNLib.Utils.IO; using VNLib.Net.Http.Core.Buffering; @@ -57,66 +56,45 @@ namespace VNLib.Net.Http.Core.Response */ private int _reservedOffset; - /// public int RemainingSize => Buffer.Size - AccumulatedSize; /// - public Span Remaining => Buffer.GetBinSpan()[AccumulatedSize..]; + Span IDataAccumulator.Remaining => Buffer.GetBinSpan()[AccumulatedSize..]; /// - public Span Accumulated => Buffer.GetBinSpan()[_reservedOffset.. AccumulatedSize]; + Span IDataAccumulator.Accumulated => Buffer.GetBinSpan()[_reservedOffset.. AccumulatedSize]; /// public int AccumulatedSize { get; set; } - /* - * Completed chunk is the segment of the buffer that contains the size segment - * followed by the accumulated chunk data, and the trailing crlf. - * - * AccumulatedSize points to the end of the accumulated chunk data. The reserved - * offset points to the start of the size segment. - */ - private Memory GetCompleteChunk() => Buffer.GetMemory()[_reservedOffset..AccumulatedSize]; + /// + public void Advance(int count) => AccumulatedSize += count; /// - /// Attempts to buffer as much data as possible from the specified data + /// Gets the remaining segment of the buffer to write chunk data to. /// - /// The data to copy - /// The number of bytes that were buffered - public ERRNO TryBufferChunk(ReadOnlySpan data) + /// The chunk buffer to write data to + public Memory GetRemainingSegment() { - //Calc data size and reserve space for final crlf - int dataToCopy = Math.Min(data.Length, RemainingSize - Context.EncodedSegments.CrlfBytes.Length); - - //Write as much data as possible - data[..dataToCopy].CopyTo(Remaining); - - //Advance buffer - Advance(dataToCopy); + /* + * We need to return the remaining segment of the buffer, the segment after the + * accumulated chunk data, but before the trailing crlf. + */ - //Return number of bytes not written - return dataToCopy; + //Get the remaining buffer segment + return Buffer.GetMemory().Slice(AccumulatedSize, RemainingSize - Context.EncodedSegments.CrlfBytes.Length); } - /// - public void Advance(int count) => AccumulatedSize += count; - - private void InitReserved() + /// + /// Calculates the usable remaining size of the chunk buffer. + /// + /// The number of bytes remaining in the buffer + public int GetRemainingSegmentSize() { - //First reserve the chunk window by advancing the accumulator to the reserved size - Advance(ReservedSize); - } - - /// - public void Reset() - { - //zero offsets - _reservedOffset = 0; - AccumulatedSize = 0; - //Init reserved segment - InitReserved(); + //Remaining size accounting for the trailing crlf + return RemainingSize - Context.EncodedSegments.CrlfBytes.Length; } /// @@ -155,6 +133,32 @@ namespace VNLib.Net.Http.Core.Response } + /* + * Completed chunk is the segment of the buffer that contains the size segment + * followed by the accumulated chunk data, and the trailing crlf. + * + * AccumulatedSize points to the end of the accumulated chunk data. The reserved + * offset points to the start of the size segment. + */ + private Memory GetCompleteChunk() => Buffer.GetMemory()[_reservedOffset..AccumulatedSize]; + + + private void InitReserved() + { + //First reserve the chunk window by advancing the accumulator to the reserved size + Advance(ReservedSize); + } + + /// + public void Reset() + { + //zero offsets + _reservedOffset = 0; + AccumulatedSize = 0; + //Init reserved segment + InitReserved(); + } + /* * UpdateChunkSize method updates the running total of the chunk size * in the reserved segment of the buffer. This is because http chunking diff --git a/lib/Net.Http/src/Core/Response/ChunkedStream.cs b/lib/Net.Http/src/Core/Response/ChunkedStream.cs index 639e18f..267d8ed 100644 --- a/lib/Net.Http/src/Core/Response/ChunkedStream.cs +++ b/lib/Net.Http/src/Core/Response/ChunkedStream.cs @@ -26,192 +26,70 @@ * Provides a Chunked data-encoding stream for writing data-chunks to * the transport using the basic chunked encoding format from MDN * https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Transfer-Encoding#directives -* -* This stream will buffer entire chunks to avoid multiple writes to the -* transport which can block or at minium cause overhead in context switching -* which should be mostly avoided but cause overhead in copying. Time profiling -* showed nearly equivalent performance for small chunks for synchronous writes. -* */ using System; using System.Threading; using System.Threading.Tasks; -using VNLib.Utils; -using VNLib.Utils.Memory; using VNLib.Net.Http.Core.Buffering; namespace VNLib.Net.Http.Core.Response { -#pragma warning disable CA2215 // Dispose methods should call base class dispose -#pragma warning disable CA2007 // Consider calling ConfigureAwait on the awaited task - /// /// Writes chunked HTTP message bodies to an underlying streamwriter /// - internal sealed class ChunkedStream : ReusableResponseStream + internal sealed class ChunkedStream : ReusableResponseStream, IResponseDataWriter { private readonly ChunkDataAccumulator ChunckAccumulator; - - private bool HadError; internal ChunkedStream(IChunkAccumulatorBuffer buffer, IHttpContextInformation context) { //Init accumulator ChunckAccumulator = new(buffer, context); } - - public override void Write(ReadOnlySpan chunk) - { - //Only write non-zero chunks - if (chunk.Length <= 0) - { - return; - } - - //Init reader - ForwardOnlyReader reader = new(chunk); - try - { - do - { - //try to accumulate the chunk data - ERRNO written = ChunckAccumulator.TryBufferChunk(reader.Window); - - //Not all data was buffered - if (written < reader.WindowSize) - { - //Advance reader - reader.Advance(written); - //Flush accumulator - Memory accChunk = ChunckAccumulator.GetChunkData(); - - //Reset the chunk accumulator - ChunckAccumulator.Reset(); - - //Write chunk data - transport!.Write(accChunk.Span); - - //Continue to buffer / flush as needed - continue; - } - break; - } - while (true); - } - catch - { - HadError = true; - throw; - } - } + #region Hooks - public override async ValueTask WriteAsync(ReadOnlyMemory chunk, CancellationToken cancellationToken = default) + /// + public void OnNewRequest() { - //Only write non-zero chunks - if (chunk.Length <= 0) - { - return; - } - - try - { - //Init reader - ForwardOnlyMemoryReader reader = new(chunk); - - do - { - //try to accumulate the chunk data - ERRNO written = ChunckAccumulator.TryBufferChunk(reader.Window.Span); - - //Not all data was buffered - if (written < reader.WindowSize) - { - //Advance reader - reader.Advance(written); - - //Flush accumulator - Memory accChunk = ChunckAccumulator.GetChunkData(); - - //Reset the chunk accumulator - ChunckAccumulator.Reset(); - - //Flush accumulator async - await transport!.WriteAsync(accChunk, cancellationToken); - - //Continue to buffer / flush as needed - continue; - } - - break; - } - while (true); - } - catch - { - HadError = true; - throw; - } + ChunckAccumulator.OnNewRequest(); } - public override async ValueTask DisposeAsync() + /// + public void OnComplete() { - //If write error occured, then do not write the last chunk - if (HadError) - { - return; - } - - //Complete the last chunk - Memory chunkData = ChunckAccumulator.GetFinalChunkData(); - - //Reset the accumulator - ChunckAccumulator.Reset(); + ChunckAccumulator.OnComplete(); + } - //Write remaining data to stream - await transport!.WriteAsync(chunkData, CancellationToken.None); + /// + public Memory GetMemory() => ChunckAccumulator.GetRemainingSegment(); - //Flush base stream - await transport!.FlushAsync(CancellationToken.None); + /// + public int Advance(int written) + { + //Advance the accumulator + ChunckAccumulator.Advance(written); + return ChunckAccumulator.GetRemainingSegmentSize(); } - public override void Close() + /// + public ValueTask FlushAsync(bool isFinal) { - //If write error occured, then do not write the last chunk - if (HadError) - { - return; - } - - //Complete the last chunk - Memory chunkData = ChunckAccumulator.GetFinalChunkData(); + /* + * We need to know when the final chunk is being flushed so we can + * write the final termination sequence to the transport. + */ + + Memory chunkData = isFinal ? ChunckAccumulator.GetFinalChunkData() : ChunckAccumulator.GetChunkData(); //Reset the accumulator ChunckAccumulator.Reset(); - //Write chunk data - transport!.Write(chunkData.Span); - - //Flush base stream - transport!.Flush(); - } - - #region Hooks - - public void OnNewRequest() - { - ChunckAccumulator.OnNewRequest(); - } - - public void OnComplete() - { - ChunckAccumulator.OnComplete(); - - //Clear error flag - HadError = false; + //Write remaining data to stream + return transport!.WriteAsync(chunkData, CancellationToken.None); } #endregion diff --git a/lib/Net.Http/src/Core/Response/DirectStream.cs b/lib/Net.Http/src/Core/Response/DirectStream.cs index 7d0e568..2406f0f 100644 --- a/lib/Net.Http/src/Core/Response/DirectStream.cs +++ b/lib/Net.Http/src/Core/Response/DirectStream.cs @@ -23,28 +23,17 @@ */ using System; -using System.Threading; using System.Threading.Tasks; namespace VNLib.Net.Http.Core.Response { - internal sealed class DirectStream : ReusableResponseStream - { - - public override void Write(ReadOnlySpan buffer) - { - transport!.Write(buffer); - } - public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) - { - return transport!.WriteAsync(buffer, cancellationToken); - } - - //Pass through flush methods - public override void Flush() => transport!.Flush(); - - public override Task FlushAsync(CancellationToken cancellationToken) => transport!.FlushAsync(cancellationToken); + internal sealed class DirectStream : ReusableResponseStream, IDirectResponsWriter + { + /// + public Task FlushAsync() => transport!.FlushAsync(); + /// + public ValueTask WriteAsync(ReadOnlyMemory buffer) => transport!.WriteAsync(buffer); } } diff --git a/lib/Net.Http/src/Core/Response/HttpContextResponseWriting.cs b/lib/Net.Http/src/Core/Response/HttpContextResponseWriting.cs index 0b29e43..ca5f040 100644 --- a/lib/Net.Http/src/Core/Response/HttpContextResponseWriting.cs +++ b/lib/Net.Http/src/Core/Response/HttpContextResponseWriting.cs @@ -23,11 +23,12 @@ */ using System; -using System.IO; using System.Net; using System.Diagnostics; using System.Threading.Tasks; +using VNLib.Net.Http.Core.Response; + namespace VNLib.Net.Http.Core { #pragma warning disable CA2007 // Consider calling ConfigureAwait on the awaited task @@ -161,49 +162,36 @@ namespace VNLib.Net.Http.Core private async Task WriteEntityDataAsync(long length, CompressionMethod compMethod, bool hasExplicitLength) { - //Get output stream, and always dispose it - await using Stream outputStream = await GetOutputStreamAsync(length, compMethod); - //Determine if buffer is required Memory buffer = ResponseBody.BufferRequired ? Buffers.GetResponseDataBuffer() : Memory.Empty; - /* - * Using compression, we must initialize a compressor, and write the response - * with the locked compressor - */ - if (compMethod != CompressionMethod.None) + //We need to flush header before we can write to the transport + await Response.CompleteHeadersAsync(compMethod == CompressionMethod.None ? length : -1); + + if (compMethod == CompressionMethod.None) + { + //Setup a direct stream to write to + IDirectResponsWriter output = Response.GetDirectStream(); + + //Write response with optional forced length + await ResponseBody.WriteEntityAsync(output, hasExplicitLength ? length : -1, buffer); + } + else { //Compressor must never be null at this point Debug.Assert(_compressor != null, "Compression was allowed but the compressor was not initialized"); + //Get the chunked response writer + IResponseDataWriter output = Response.GetChunkWriter(); + //Init compressor (Deinint is deferred to the end of the request) - _compressor.Init(outputStream, compMethod); + _compressor.Init(compMethod); //Write response - await ResponseBody.WriteEntityAsync(_compressor, buffer); - - } - /* - * Explicit length may be set when the response may have more data than requested - * by the client. IE: when a range is set, we need to make sure we sent exactly the - * correct data, otherwise the client will drop the connection. - */ - else if(hasExplicitLength) - { - //Write response with explicit length - await ResponseBody.WriteEntityAsync(outputStream, length, buffer); - - } - else - { - await ResponseBody.WriteEntityAsync(outputStream, buffer); + await ResponseBody.WriteEntityAsync(_compressor, output, buffer); } } - private ValueTask GetOutputStreamAsync(long length, CompressionMethod compMethod) - { - return compMethod == CompressionMethod.None ? Response.GetStreamAsync(length) : Response.GetStreamAsync(); - } #pragma warning restore CA2007 // Consider calling ConfigureAwait on the awaited task diff --git a/lib/Net.Http/src/Core/Response/HttpResponse.cs b/lib/Net.Http/src/Core/Response/HttpResponse.cs index c09c3f7..90f6f24 100644 --- a/lib/Net.Http/src/Core/Response/HttpResponse.cs +++ b/lib/Net.Http/src/Core/Response/HttpResponse.cs @@ -190,59 +190,57 @@ namespace VNLib.Net.Http.Core } /// - /// Gets a stream for writing data of a specified length directly to the client + /// Flushes all available headers to the transport asynchronously /// - /// - /// A configured for writing data to client - /// - /// - public ValueTask GetStreamAsync(long ContentLength) + /// The optional content length if set, for chunked responses + /// A value task that completes when header data has been made available to the transport + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public ValueTask CompleteHeadersAsync(long contentLength) { Check(); - //Add content length header - Headers[HttpResponseHeader.ContentLength] = ContentLength.ToString(); + if (contentLength < 0) + { + //Add chunked header + Headers[HttpResponseHeader.TransferEncoding] = "chunked"; + + } + else + { + //Add content length header + Headers[HttpResponseHeader.ContentLength] = contentLength.ToString(); + } //Flush headers - ValueTask flush = EndFlushHeadersAsync(); + return EndFlushHeadersAsync(); + } - //Return the reusable stream - return flush.IsCompletedSuccessfully ? - ValueTask.FromResult(ReusableDirectStream) - : GetStreamAsyncCore(flush, ReusableDirectStream); + /// + /// Gets a response writer for writing directly to the transport stream + /// + /// The instance for writing stream data to + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public IDirectResponsWriter GetDirectStream() + { + //Headers must be sent before getting a direct stream + Debug.Assert(HeadersSent); + return ReusableDirectStream; } /// - /// Sets up the client for chuncked encoding and gets a stream that allows for chuncks to be sent. User must call dispose on stream when done writing data + /// Gets a response writer for writing chunked data to the transport stream /// - /// supporting chunked encoding - /// - /// - public ValueTask GetStreamAsync() + /// The for buffering response chunks + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public IResponseDataWriter GetChunkWriter() { //Chunking is only an http 1.1 feature (should never get called otherwise) - Debug.Assert(ContextInfo.CurrentVersion == HttpVersion.Http11); - - Check(); - - //Set encoding type to chunked with user-defined compression - Headers[HttpResponseHeader.TransferEncoding] = "chunked"; - - //Flush headers - ValueTask flush = EndFlushHeadersAsync(); + Debug.Assert(ContextInfo.CurrentVersion == HttpVersion.Http11, "Chunked response handler was requested, but is not an HTTP/1.1 response"); + Debug.Assert(HeadersSent, "Chunk write was requested but header data has not been sent"); - //Return the reusable stream - return flush.IsCompletedSuccessfully ? - ValueTask.FromResult(ReusableChunkedStream) - : GetStreamAsyncCore(flush, ReusableChunkedStream); - } - - private static async ValueTask GetStreamAsyncCore(ValueTask flush, Stream stream) - { - //Await the flush and get the stream - await flush.ConfigureAwait(false); - return stream; + return ReusableChunkedStream; } + [MethodImpl(MethodImplOptions.AggressiveInlining)] void Check() @@ -329,6 +327,7 @@ namespace VNLib.Net.Http.Core } /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] public void OnNewRequest() { //Default to okay status code @@ -338,6 +337,7 @@ namespace VNLib.Net.Http.Core } /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] public void OnComplete() { //Clear headers and cookies diff --git a/lib/Net.Http/src/Core/Response/IDirectResponsWriter.cs b/lib/Net.Http/src/Core/Response/IDirectResponsWriter.cs new file mode 100644 index 0000000..7c9ca41 --- /dev/null +++ b/lib/Net.Http/src/Core/Response/IDirectResponsWriter.cs @@ -0,0 +1,49 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Http +* File: IDirectResponsWriter.cs +* +* IDirectResponsWriter.cs is part of VNLib.Net.Http which is part of +* the larger VNLib collection of libraries and utilities. +* +* VNLib.Net.Http is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Http is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Threading.Tasks; + +namespace VNLib.Net.Http.Core.Response +{ + /// + /// Represents a stream that can be written to directly (does not + /// buffer response data) + /// + internal interface IDirectResponsWriter + { + /// + /// Writes the given data buffer to the client + /// + /// The response data to write + /// A value task that resolves when the write operation is complete + ValueTask WriteAsync(ReadOnlyMemory buffer); + + /// + /// Flushes any remaining data to the client + /// + /// A task that resolves when the flush operationis complete + Task FlushAsync(); + } +} diff --git a/lib/Net.Http/src/Core/Response/IResponseDataWriter.cs b/lib/Net.Http/src/Core/Response/IResponseDataWriter.cs new file mode 100644 index 0000000..55ef49c --- /dev/null +++ b/lib/Net.Http/src/Core/Response/IResponseDataWriter.cs @@ -0,0 +1,56 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Http +* File: IResponseDataWriter.cs +* +* IResponseDataWriter.cs is part of VNLib.Net.Http which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Http is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Http is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Threading.Tasks; + +namespace VNLib.Net.Http.Core.Response +{ + /// + /// A buffered http response data writer + /// + internal interface IResponseDataWriter + { + /// + /// Gets the next memory segment available to buffer data to + /// + /// An available buffer to write response data to + Memory GetMemory(); + + /// + /// Advances the writer by the number of bytes written and returns the + /// number of bytes available for writing on the next call to + /// + /// The number of bytes written to the output buffer + /// The number of bytes remaining in the internal buffer + int Advance(int written); + + /// + /// Flushes the internal buffer to the underlying stream + /// + /// A value that indicates that this is final call to flush + /// A valuetask that completes + ValueTask FlushAsync(bool isFinal); + } +} \ No newline at end of file diff --git a/lib/Net.Http/src/Core/Response/ResponseWriter.cs b/lib/Net.Http/src/Core/Response/ResponseWriter.cs index 77dc619..d67fc01 100644 --- a/lib/Net.Http/src/Core/Response/ResponseWriter.cs +++ b/lib/Net.Http/src/Core/Response/ResponseWriter.cs @@ -22,12 +22,20 @@ * along with this program. If not, see https://www.gnu.org/licenses/. */ +/* + * This file handles response entity processing. It handles in-memory response + * processing, as well as stream response processing. It handles contraints + * such as content-range limits. I tried to eliminate or reduce the amount of + * memory copying required to process the response entity. + */ + using System; using System.IO; using System.Threading; using System.Threading.Tasks; +using System.Runtime.CompilerServices; -using VNLib.Utils.Extensions; +using VNLib.Net.Http.Core.Response; using VNLib.Net.Http.Core.Compression; namespace VNLib.Net.Http.Core @@ -94,68 +102,95 @@ namespace VNLib.Net.Http.Core ReadOnlyMemory _readSegment; /// - async Task IHttpResponseBody.WriteEntityAsync(Stream dest, long count, Memory buffer) + async Task IHttpResponseBody.WriteEntityAsync(IDirectResponsWriter dest, long count, Memory buffer) { int remaining; //Write a sliding window response if (_memoryResponse != null) { - //Get min value from count/range length - remaining = (int)Math.Min(count, _memoryResponse.Remaining); - - //Write response body from memory - while (remaining > 0) + if(count > 0) { - //Get remaining segment - _readSegment = _memoryResponse.GetRemainingConstrained(remaining); - - //Write segment to output stream - await dest.WriteAsync(_readSegment); - - //Advance by the written ammount - _memoryResponse.Advance(_readSegment.Length); - - //Update remaining - remaining -= _readSegment.Length; - } - } - else - { - //Buffer is required, and count must be supplied - await _streamResponse!.CopyToAsync(dest, buffer, count); + //Get min value from count/range length + remaining = (int)Math.Min(count, _memoryResponse.Remaining); - //Try to dispose the response stream asyncrhonously since we are done with it - await _streamResponse!.DisposeAsync(); + //Write response body from memory + while (remaining > 0) + { + //Get remaining segment + _readSegment = _memoryResponse.GetRemainingConstrained(remaining); - //remove ref so its not disposed again - _streamResponse = null; - } - } + //Write segment to output stream + await dest.WriteAsync(_readSegment); - /// - async Task IHttpResponseBody.WriteEntityAsync(Stream dest, Memory buffer) - { - //Write a sliding window response - if (_memoryResponse != null) - { - //Write response body from memory - while (_memoryResponse.Remaining > 0) + //Advance by the written ammount + _memoryResponse.Advance(_readSegment.Length); + + //Update remaining + remaining -= _readSegment.Length; + } + } + else { - //Get remaining segment - _readSegment = _memoryResponse.GetMemory(); + //Write response body from memory + while (_memoryResponse.Remaining > 0) + { + //Get remaining segment + _readSegment = _memoryResponse.GetMemory(); - //Write segment to output stream - await dest.WriteAsync(_readSegment); + //Write segment to output stream + await dest.WriteAsync(_readSegment); - //Advance by the written ammount - _memoryResponse.Advance(_readSegment.Length); + //Advance by the written amount + _memoryResponse.Advance(_readSegment.Length); + } } + + //Disposing of memory response can be deferred until the end of the request since its always syncrhonous } else { - //Buffer is required, and count must be supplied - await _streamResponse!.CopyToAsync(dest, buffer); + if (count > 0) + { + //Buffer is required, and count must be supplied + + long total = 0; + int read; + while (true) + { + //get offset wrapper of the total buffer or remaining count + Memory offset = buffer[..(int)Math.Min(buffer.Length, count - total)]; + //read + read = await _streamResponse!.ReadAsync(offset); + //Guard + if (read == 0) + { + break; + } + //write only the data that was read (slice) + await dest.WriteAsync(offset[..read]); + //Update total + total += read; + } + } + else + { + //Read in loop + do + { + //read + int read = await _streamResponse!.ReadAsync(buffer); + //Guard + if (read == 0) + { + break; + } + + //write only the data that was read (slice) + await dest.WriteAsync(buffer[..read]); + + } while (true); + } //Try to dispose the response stream asyncrhonously since we are done with it await _streamResponse!.DisposeAsync(); @@ -166,64 +201,22 @@ namespace VNLib.Net.Http.Core } /// - async Task IHttpResponseBody.WriteEntityAsync(IResponseCompressor dest, Memory buffer) + async Task IHttpResponseBody.WriteEntityAsync(IResponseCompressor comp, IResponseDataWriter writer, Memory buffer) { //Locals - bool remaining; int read; //Write a sliding window response if (_memoryResponse != null) - { - /* - * It is safe to assume if a response body was set, that it contains data. - * So the cost or running a loop without data is not a concern. - * - * Since any failed writes to the output will raise exceptions, it is safe - * to advance the reader before writing the data, so we can determine if the - * block is final. - * - * Since we are using a byte-stream reader for memory responses, we can optimize the - * compression loop, if we know its operating block size, so we only compress blocks - * of the block size, then continue the loop without branching or causing nested - * loops - */ - - //Optimize for block size - if (dest.BlockSize > 0) - { - //Write response body from memory - do - { - _readSegment = _memoryResponse.GetRemainingConstrained(dest.BlockSize); - - //Advance by the trimmed segment length - _memoryResponse.Advance(_readSegment.Length); - - //Check if data is remaining after an advance - remaining = _memoryResponse.Remaining > 0; - - //Compress the trimmed block - await dest.CompressBlockAsync(_readSegment, !remaining); - - } while (remaining); - } - else + { + while (_memoryResponse.Remaining > 0) { - do + //Commit output bytes + if (CompressNextSegment(_memoryResponse, comp, writer)) { - _readSegment = _memoryResponse.GetMemory(); - - //Advance by the segment length, this should be safe even if its zero - _memoryResponse.Advance(_readSegment.Length); - - //Check if data is remaining after an advance - remaining = _memoryResponse.Remaining > 0; - - //Write to output - await dest.CompressBlockAsync(_readSegment, !remaining); - - } while (remaining); + //Time to flush + await writer.FlushAsync(false); + } } //Disposing of memory response can be deferred until the end of the request since its always syncrhonous @@ -231,9 +224,9 @@ namespace VNLib.Net.Http.Core else { //Trim buffer to block size if it is set by the compressor - if (dest.BlockSize > 0) + if (comp.BlockSize > 0) { - buffer = buffer[..dest.BlockSize]; + buffer = buffer[..comp.BlockSize]; } //Read in loop @@ -248,8 +241,12 @@ namespace VNLib.Net.Http.Core break; } - //write only the data that was read, as a segment instead of a block - await dest.CompressBlockAsync(buffer[..read], read < buffer.Length); + //Compress the buffered data and flush if required + if(CompressNextSegment(buffer, comp, writer)) + { + //Time to flush + await writer.FlushAsync(false); + } } while (true); @@ -263,17 +260,69 @@ namespace VNLib.Net.Http.Core _streamResponse = null; } - //Continue flusing flushing the compressor if required - while(dest.IsFlushRequired()) + + /* + * Once there is no more response data avialable to compress + * we need to flush the compressor, then flush the writer + * to publish all accumulated data to the client + */ + + do { - //Flush the compressor - await dest.CompressBlockAsync(ReadOnlyMemory.Empty, true); - } + //Get output buffer + Memory output = writer.GetMemory(); + + //Flush the compressor output + int written = comp.Flush(output); + + //No more data to buffer + if (written == 0) + { + //final flush and exit + await writer.FlushAsync(true); + break; + } + + if (writer.Advance(written) == 0) + { + //Flush because accumulator is full + await writer.FlushAsync(false); + } + + } while (true); + } + + private static bool CompressNextSegment(IMemoryResponseReader reader, IResponseCompressor comp, IResponseDataWriter writer) + { + //Read the next segment + ReadOnlyMemory readSegment = comp.BlockSize > 0 ? reader.GetRemainingConstrained(comp.BlockSize) : reader.GetMemory(); + + //Get output buffer + Memory output = writer.GetMemory(); + + //Compress the trimmed block + CompressionResult res = comp.CompressBlock(readSegment, output); + + //Commit input bytes + reader.Advance(res.BytesRead); + + return writer.Advance(res.BytesWritten) == 0; + } + + private static bool CompressNextSegment(Memory readSegment, IResponseCompressor comp, IResponseDataWriter writer) + { + //Get output buffer + Memory output = writer.GetMemory(); + + //Compress the trimmed block + CompressionResult res = comp.CompressBlock(readSegment, output); + + return writer.Advance(res.BytesWritten) == 0; } #pragma warning restore CA2007 // Consider calling ConfigureAwait on the awaited task - + [MethodImpl(MethodImplOptions.AggressiveInlining)] public void OnComplete() { //Clear has data flag diff --git a/lib/Net.Http/src/Core/Response/ReusableResponseStream.cs b/lib/Net.Http/src/Core/Response/ReusableResponseStream.cs index 0be4821..60465f9 100644 --- a/lib/Net.Http/src/Core/Response/ReusableResponseStream.cs +++ b/lib/Net.Http/src/Core/Response/ReusableResponseStream.cs @@ -22,18 +22,12 @@ * along with this program. If not, see https://www.gnu.org/licenses/. */ -using System; using System.IO; -using System.Threading; -using System.Threading.Tasks; namespace VNLib.Net.Http.Core.Response { -#pragma warning disable CA2215 // Dispose methods should call base class dispose -#pragma warning disable CA1844 // Provide memory-based overrides of async methods when subclassing 'Stream' - - internal abstract class ReusableResponseStream : Stream + internal abstract class ReusableResponseStream { protected Stream? transport; @@ -46,50 +40,7 @@ namespace VNLib.Net.Http.Core.Response /// /// Called when the connection is released /// - public virtual void OnRelease() => this.transport = null; - - - //Block base dispose - protected override void Dispose(bool disposing) - { } - - //Block base close - public override void Close() - { } - - //block base dispose async - public override ValueTask DisposeAsync() - { - return ValueTask.CompletedTask; - } - - //Block flush - public override void Flush() - { } - - //Block flush async - public override Task FlushAsync(CancellationToken cancellationToken) - { - return Task.CompletedTask; - } - - //Block stream basics - public override bool CanRead => false; - public override bool CanSeek => false; - public override bool CanWrite => true; - public override long Length => throw new NotSupportedException(); - public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); } - - //Reading is not enabled - public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException("This stream cannot be read from"); - public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException("This stream does not support seeking"); - public override void SetLength(long value) => throw new NotSupportedException("This stream does not support seeking"); - - public override void Write(byte[] buffer, int offset, int count) => Write(buffer.AsSpan(offset, count)); - - public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) - { - return WriteAsync(buffer.AsMemory(offset, count), cancellationToken).AsTask(); - } + public virtual void OnRelease() => transport = null; + } } \ No newline at end of file diff --git a/lib/Plugins.PluginBase/src/VNLib.Plugins.PluginBase.csproj b/lib/Plugins.PluginBase/src/VNLib.Plugins.PluginBase.csproj index 616f907..d53cc7d 100644 --- a/lib/Plugins.PluginBase/src/VNLib.Plugins.PluginBase.csproj +++ b/lib/Plugins.PluginBase/src/VNLib.Plugins.PluginBase.csproj @@ -41,7 +41,7 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive - + diff --git a/lib/Utils/src/Extensions/IoExtensions.cs b/lib/Utils/src/Extensions/IoExtensions.cs index 637cfab..bcbe810 100644 --- a/lib/Utils/src/Extensions/IoExtensions.cs +++ b/lib/Utils/src/Extensions/IoExtensions.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2022 Vaughn Nugent +* Copyright (c) 2023 Vaughn Nugent * * Library: VNLib * Package: VNLib.Utils @@ -328,12 +328,11 @@ namespace VNLib.Utils.Extensions * bytes from the source */ long total = 0; - int bufferSize = buffer.Length; int read; while (true) { //get offset wrapper of the total buffer or remaining count - Memory offset = buffer[..(int)Math.Min(bufferSize, count - total)]; + Memory offset = buffer[..(int)Math.Min(buffer.Length, count - total)]; //read read = await source.ReadAsync(offset, token); //Guard diff --git a/lib/Utils/tests/VNLib.UtilsTests.csproj b/lib/Utils/tests/VNLib.UtilsTests.csproj index 5651bb3..c29915d 100644 --- a/lib/Utils/tests/VNLib.UtilsTests.csproj +++ b/lib/Utils/tests/VNLib.UtilsTests.csproj @@ -16,9 +16,9 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive - - - + + + all runtime; build; native; contentfiles; analyzers; buildtransitive -- cgit