diff options
Diffstat (limited to 'lib/Net.Http/src')
16 files changed, 486 insertions, 518 deletions
diff --git a/lib/Net.Http/src/Core/Compression/CompressionResult.cs b/lib/Net.Http/src/Core/Compression/CompressionResult.cs new file mode 100644 index 0000000..aaeebc5 --- /dev/null +++ b/lib/Net.Http/src/Core/Compression/CompressionResult.cs @@ -0,0 +1,42 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Http +* File: CompressionResult.cs +* +* CompressionResult.cs is part of VNLib.Net.Http which is part +* of the larger VNLib collection of libraries and utilities. +* +* VNLib.Net.Http is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Http is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +namespace VNLib.Net.Http +{ + /// <summary> + /// Represents the result of a block compression operation + /// </summary> + public readonly ref struct CompressionResult + { + /// <summary> + /// The number of bytes read from the input buffer + /// </summary> + public readonly int BytesRead { get; init; } + + /// <summary> + /// The number of bytes availabe in the output buffer + /// </summary> + public readonly int BytesWritten { get; init; } + } +} diff --git a/lib/Net.Http/src/Core/Compression/IHttpCompressorManager.cs b/lib/Net.Http/src/Core/Compression/IHttpCompressorManager.cs index 80763f8..6aadd49 100644 --- a/lib/Net.Http/src/Core/Compression/IHttpCompressorManager.cs +++ b/lib/Net.Http/src/Core/Compression/IHttpCompressorManager.cs @@ -26,6 +26,7 @@ using System; namespace VNLib.Net.Http { + /// <summary> /// Represents an http compressor manager that creates compressor state instances and processes /// compression operations. @@ -54,29 +55,22 @@ namespace VNLib.Net.Http /// <summary> /// Compresses a block of data using the compressor state. The input block size is - /// guarunteed to the the block size returned by <see cref="InitCompressor(object, CompressionMethod)"/> + /// guarunteed to be smaller than the block size returned by <see cref="InitCompressor(object, CompressionMethod)"/> /// or smaller. - /// <para> - /// This method may be called with an empty input block to flush the compressor state. - /// </para> /// </summary> /// <param name="compressorState">The compressor state instance</param> /// <param name="input">The input buffer to compress</param> - /// <param name="finalBlock">A value that indicates if this block is the final block</param> - /// <returns>The compressed block to send to the client</returns> - /// <remarks> - /// The returned memory block should belong to the individual compressor state, and be valid until the - /// call to deinit. The result of the block should remain valid - /// until the next call to compress or deinit. - /// </remarks> - ReadOnlyMemory<byte> CompressBlock(object compressorState, ReadOnlyMemory<byte> input, bool finalBlock); + /// <param name="output">The output buffer to write the compressed data to</param> + /// <returns>The result of the stream operation</returns> + CompressionResult CompressBlock(object compressorState, ReadOnlyMemory<byte> input, Memory<byte> output); /// <summary> /// Flushes any stored compressor data that still needs to be sent to the client. /// </summary> /// <param name="compressorState">The compressor state instance</param> - /// <returns>The remaining data stored in the compressor state, may be empty if no data is pending</returns> - ReadOnlyMemory<byte> Flush(object compressorState); + /// <param name="output">The output buffer</param> + /// <returns>The number of bytes flushed to the output buffer</returns> + int Flush(object compressorState, Memory<byte> output); /// <summary> /// Initializes the compressor state for a compression operation diff --git a/lib/Net.Http/src/Core/Compression/IResponseCompressor.cs b/lib/Net.Http/src/Core/Compression/IResponseCompressor.cs index a68a838..034b282 100644 --- a/lib/Net.Http/src/Core/Compression/IResponseCompressor.cs +++ b/lib/Net.Http/src/Core/Compression/IResponseCompressor.cs @@ -23,8 +23,6 @@ */ using System; -using System.IO; -using System.Threading.Tasks; namespace VNLib.Net.Http.Core.Compression { @@ -49,21 +47,21 @@ namespace VNLib.Net.Http.Core.Compression /// Initializes the compressor for a compression operation /// </summary> /// <param name="compMethod">The compression mode to use</param> - /// <param name="output">The stream to write compressed data to</param> - void Init(Stream output, CompressionMethod compMethod); + void Init(CompressionMethod compMethod); /// <summary> - /// Gets a value that indicates if the compressor requires more flushing to occur + /// Compresses a block of input data and writes the result to the output buffer /// </summary> - bool IsFlushRequired(); + /// <param name="input">The input data to compress</param> + /// <param name="output">The output buffer to write compressed data to</param> + /// <returns>The result of the compression operation</returns> + CompressionResult CompressBlock(ReadOnlyMemory<byte> input, Memory<byte> output); /// <summary> - /// Compresses a block of data and writes it to the output stream. If an empty flush is - /// commited, then the input buffer will be empty. + /// Writes any remaining data to the output buffer, flushing the compressor /// </summary> - /// <param name="buffer">The block of memory to write to compress</param> - /// <param name="finalBlock">A value that indicates if this block is the final block</param> - /// <returns>A task that represents the compression operation</returns> - ValueTask CompressBlockAsync(ReadOnlyMemory<byte> buffer, bool finalBlock); + /// <param name="output">The buffer to write output data to</param> + /// <returns>The number of bytes written to the output buffer</returns> + int Flush(Memory<byte> output); } }
\ No newline at end of file diff --git a/lib/Net.Http/src/Core/Compression/ManagedHttpCompressor.cs b/lib/Net.Http/src/Core/Compression/ManagedHttpCompressor.cs index b3f971a..34e59ac 100644 --- a/lib/Net.Http/src/Core/Compression/ManagedHttpCompressor.cs +++ b/lib/Net.Http/src/Core/Compression/ManagedHttpCompressor.cs @@ -23,9 +23,6 @@ */ using System; -using System.IO; -using System.Threading.Tasks; - namespace VNLib.Net.Http.Core.Compression { @@ -47,46 +44,39 @@ namespace VNLib.Net.Http.Core.Compression */ private object? _compressor; - private Stream? _stream; - private ReadOnlyMemory<byte> _lastFlush; private bool initialized; ///<inheritdoc/> public int BlockSize { get; private set; } - public bool IsFlushRequired() + ///<inheritdoc/> + public CompressionResult CompressBlock(ReadOnlyMemory<byte> input, Memory<byte> output) { - //See if a flush is required - _lastFlush = _provider.Flush(_compressor!); - return _lastFlush.Length > 0; + //Compress the block + return _provider.CompressBlock(_compressor!, input, output); } ///<inheritdoc/> - public ValueTask CompressBlockAsync(ReadOnlyMemory<byte> buffer, bool finalBlock) + public int Flush(Memory<byte> output) { - /* - * If input buffer is empty and flush data is available, - * write the last flush data to the stream - */ - if(buffer.Length == 0 && _lastFlush.Length > 0) - { - return _stream!.WriteAsync(_lastFlush); - } + return _provider.Flush(_compressor!, output); + } - //Compress the block - ReadOnlyMemory<byte> result = _provider.CompressBlock(_compressor!, buffer, finalBlock); + ///<inheritdoc/> + public void Init(CompressionMethod compMethod) + { + //Defer alloc the compressor + _compressor ??= _provider.AllocCompressor(); + + //Init the compressor and get the block size + BlockSize = _provider.InitCompressor(_compressor, compMethod); - //Write the compressed block to the stream - return _stream!.WriteAsync(result); + initialized = true; } ///<inheritdoc/> public void Free() { - //Remove stream ref and de-init the compressor - _stream = null; - _lastFlush = default; - //Deinit compressor if initialized if (initialized) { @@ -94,18 +84,5 @@ namespace VNLib.Net.Http.Core.Compression initialized = false; } } - - ///<inheritdoc/> - public void Init(Stream output, CompressionMethod compMethod) - { - //Defer alloc the compressor - _compressor ??= _provider.AllocCompressor(); - - //Init the compressor and get the block size - BlockSize = _provider.InitCompressor(_compressor, compMethod); - - _stream = output; - initialized = true; - } } }
\ No newline at end of file diff --git a/lib/Net.Http/src/Core/IHttpResponseBody.cs b/lib/Net.Http/src/Core/IHttpResponseBody.cs index f0f88d2..696e9da 100644 --- a/lib/Net.Http/src/Core/IHttpResponseBody.cs +++ b/lib/Net.Http/src/Core/IHttpResponseBody.cs @@ -23,9 +23,9 @@ */ using System; -using System.IO; using System.Threading.Tasks; +using VNLib.Net.Http.Core.Response; using VNLib.Net.Http.Core.Compression; namespace VNLib.Net.Http.Core @@ -63,28 +63,15 @@ namespace VNLib.Net.Http.Core /// <param name="buffer">An optional buffer used to buffer responses</param> /// <param name="count">The maximum length of the response data to write</param> /// <returns>A task that resolves when the response is completed</returns> - Task WriteEntityAsync(Stream dest, long count, Memory<byte> buffer); + Task WriteEntityAsync(IDirectResponsWriter dest, long count, Memory<byte> buffer); /// <summary> /// Writes internal response entity data to the destination stream /// </summary> - /// <param name="dest">The response compressor</param> + /// <param name="comp">The response compressor</param> + /// <param name="writer">The response output writer</param> /// <param name="buffer">An optional buffer used to buffer responses</param> /// <returns>A task that resolves when the response is completed</returns> - Task WriteEntityAsync(IResponseCompressor dest, Memory<byte> buffer); - - /* - * Added to the response writing hot-paths optimize calls when compression - * is disabled and an explicit length is not required. - */ - - /// <summary> - /// Writes internal response entity data to the destination stream - /// without compression - /// </summary> - /// <param name="dest">The response stream to write data to</param> - /// <param name="buffer">Optional buffer if required, used to buffer response data</param> - /// <returns>A task that resolves when the response is completed</returns> - Task WriteEntityAsync(Stream dest, Memory<byte> buffer); + Task WriteEntityAsync(IResponseCompressor comp, IResponseDataWriter writer, Memory<byte> buffer); } }
\ No newline at end of file diff --git a/lib/Net.Http/src/Core/Request/HttpInputStream.cs b/lib/Net.Http/src/Core/Request/HttpInputStream.cs index dc903d2..fbb1d41 100644 --- a/lib/Net.Http/src/Core/Request/HttpInputStream.cs +++ b/lib/Net.Http/src/Core/Request/HttpInputStream.cs @@ -26,6 +26,7 @@ using System; using System.IO; using System.Threading; using System.Threading.Tasks; +using System.Runtime.CompilerServices; using VNLib.Utils; using VNLib.Utils.Memory; @@ -49,6 +50,8 @@ namespace VNLib.Net.Http.Core public HttpInputStream(IHttpContextInformation contextInfo) => ContextInfo = contextInfo; + + [MethodImpl(MethodImplOptions.AggressiveInlining)] internal void OnComplete() { //Dispose the initial data buffer if set diff --git a/lib/Net.Http/src/Core/Request/HttpRequest.cs b/lib/Net.Http/src/Core/Request/HttpRequest.cs index 0668197..2cc99bd 100644 --- a/lib/Net.Http/src/Core/Request/HttpRequest.cs +++ b/lib/Net.Http/src/Core/Request/HttpRequest.cs @@ -26,6 +26,7 @@ using System; using System.Net; using System.Collections.Generic; using System.Security.Authentication; +using System.Runtime.CompilerServices; using VNLib.Utils; using VNLib.Utils.Memory; @@ -97,6 +98,7 @@ namespace VNLib.Net.Http.Core void IHttpLifeCycle.OnNewConnection() { } + [MethodImpl(MethodImplOptions.AggressiveInlining)] public void OnNewRequest() { //Set to defaults @@ -106,6 +108,7 @@ namespace VNLib.Net.Http.Core HttpVersion = HttpVersion.None; } + [MethodImpl(MethodImplOptions.AggressiveInlining)] public void OnComplete() { //release the input stream diff --git a/lib/Net.Http/src/Core/Response/ChunkDataAccumulator.cs b/lib/Net.Http/src/Core/Response/ChunkDataAccumulator.cs index 4a88361..f87ec98 100644 --- a/lib/Net.Http/src/Core/Response/ChunkDataAccumulator.cs +++ b/lib/Net.Http/src/Core/Response/ChunkDataAccumulator.cs @@ -24,7 +24,6 @@ using System; -using VNLib.Utils; using VNLib.Utils.IO; using VNLib.Net.Http.Core.Buffering; @@ -57,66 +56,45 @@ namespace VNLib.Net.Http.Core.Response */ private int _reservedOffset; - ///<inheritdoc/> public int RemainingSize => Buffer.Size - AccumulatedSize; ///<inheritdoc/> - public Span<byte> Remaining => Buffer.GetBinSpan()[AccumulatedSize..]; + Span<byte> IDataAccumulator<byte>.Remaining => Buffer.GetBinSpan()[AccumulatedSize..]; ///<inheritdoc/> - public Span<byte> Accumulated => Buffer.GetBinSpan()[_reservedOffset.. AccumulatedSize]; + Span<byte> IDataAccumulator<byte>.Accumulated => Buffer.GetBinSpan()[_reservedOffset.. AccumulatedSize]; ///<inheritdoc/> public int AccumulatedSize { get; set; } - /* - * Completed chunk is the segment of the buffer that contains the size segment - * followed by the accumulated chunk data, and the trailing crlf. - * - * AccumulatedSize points to the end of the accumulated chunk data. The reserved - * offset points to the start of the size segment. - */ - private Memory<byte> GetCompleteChunk() => Buffer.GetMemory()[_reservedOffset..AccumulatedSize]; + ///<inheritdoc/> + public void Advance(int count) => AccumulatedSize += count; /// <summary> - /// Attempts to buffer as much data as possible from the specified data + /// Gets the remaining segment of the buffer to write chunk data to. /// </summary> - /// <param name="data">The data to copy</param> - /// <returns>The number of bytes that were buffered</returns> - public ERRNO TryBufferChunk(ReadOnlySpan<byte> data) + /// <returns>The chunk buffer to write data to</returns> + public Memory<byte> GetRemainingSegment() { - //Calc data size and reserve space for final crlf - int dataToCopy = Math.Min(data.Length, RemainingSize - Context.EncodedSegments.CrlfBytes.Length); - - //Write as much data as possible - data[..dataToCopy].CopyTo(Remaining); - - //Advance buffer - Advance(dataToCopy); + /* + * We need to return the remaining segment of the buffer, the segment after the + * accumulated chunk data, but before the trailing crlf. + */ - //Return number of bytes not written - return dataToCopy; + //Get the remaining buffer segment + return Buffer.GetMemory().Slice(AccumulatedSize, RemainingSize - Context.EncodedSegments.CrlfBytes.Length); } - ///<inheritdoc/> - public void Advance(int count) => AccumulatedSize += count; - - private void InitReserved() + /// <summary> + /// Calculates the usable remaining size of the chunk buffer. + /// </summary> + /// <returns>The number of bytes remaining in the buffer</returns> + public int GetRemainingSegmentSize() { - //First reserve the chunk window by advancing the accumulator to the reserved size - Advance(ReservedSize); - } - - ///<inheritdoc/> - public void Reset() - { - //zero offsets - _reservedOffset = 0; - AccumulatedSize = 0; - //Init reserved segment - InitReserved(); + //Remaining size accounting for the trailing crlf + return RemainingSize - Context.EncodedSegments.CrlfBytes.Length; } /// <summary> @@ -156,6 +134,32 @@ namespace VNLib.Net.Http.Core.Response /* + * Completed chunk is the segment of the buffer that contains the size segment + * followed by the accumulated chunk data, and the trailing crlf. + * + * AccumulatedSize points to the end of the accumulated chunk data. The reserved + * offset points to the start of the size segment. + */ + private Memory<byte> GetCompleteChunk() => Buffer.GetMemory()[_reservedOffset..AccumulatedSize]; + + + private void InitReserved() + { + //First reserve the chunk window by advancing the accumulator to the reserved size + Advance(ReservedSize); + } + + ///<inheritdoc/> + public void Reset() + { + //zero offsets + _reservedOffset = 0; + AccumulatedSize = 0; + //Init reserved segment + InitReserved(); + } + + /* * UpdateChunkSize method updates the running total of the chunk size * in the reserved segment of the buffer. This is because http chunking * requires hex encoded chunk sizes to be written as the first bytes of diff --git a/lib/Net.Http/src/Core/Response/ChunkedStream.cs b/lib/Net.Http/src/Core/Response/ChunkedStream.cs index 639e18f..267d8ed 100644 --- a/lib/Net.Http/src/Core/Response/ChunkedStream.cs +++ b/lib/Net.Http/src/Core/Response/ChunkedStream.cs @@ -26,192 +26,70 @@ * Provides a Chunked data-encoding stream for writing data-chunks to * the transport using the basic chunked encoding format from MDN * https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Transfer-Encoding#directives -* -* This stream will buffer entire chunks to avoid multiple writes to the -* transport which can block or at minium cause overhead in context switching -* which should be mostly avoided but cause overhead in copying. Time profiling -* showed nearly equivalent performance for small chunks for synchronous writes. -* */ using System; using System.Threading; using System.Threading.Tasks; -using VNLib.Utils; -using VNLib.Utils.Memory; using VNLib.Net.Http.Core.Buffering; namespace VNLib.Net.Http.Core.Response { -#pragma warning disable CA2215 // Dispose methods should call base class dispose -#pragma warning disable CA2007 // Consider calling ConfigureAwait on the awaited task - /// <summary> /// Writes chunked HTTP message bodies to an underlying streamwriter /// </summary> - internal sealed class ChunkedStream : ReusableResponseStream + internal sealed class ChunkedStream : ReusableResponseStream, IResponseDataWriter { private readonly ChunkDataAccumulator ChunckAccumulator; - - private bool HadError; internal ChunkedStream(IChunkAccumulatorBuffer buffer, IHttpContextInformation context) { //Init accumulator ChunckAccumulator = new(buffer, context); } - - public override void Write(ReadOnlySpan<byte> chunk) - { - //Only write non-zero chunks - if (chunk.Length <= 0) - { - return; - } - - //Init reader - ForwardOnlyReader<byte> reader = new(chunk); - try - { - do - { - //try to accumulate the chunk data - ERRNO written = ChunckAccumulator.TryBufferChunk(reader.Window); - - //Not all data was buffered - if (written < reader.WindowSize) - { - //Advance reader - reader.Advance(written); - //Flush accumulator - Memory<byte> accChunk = ChunckAccumulator.GetChunkData(); - - //Reset the chunk accumulator - ChunckAccumulator.Reset(); - - //Write chunk data - transport!.Write(accChunk.Span); - - //Continue to buffer / flush as needed - continue; - } - break; - } - while (true); - } - catch - { - HadError = true; - throw; - } - } + #region Hooks - public override async ValueTask WriteAsync(ReadOnlyMemory<byte> chunk, CancellationToken cancellationToken = default) + ///<inheritdoc/> + public void OnNewRequest() { - //Only write non-zero chunks - if (chunk.Length <= 0) - { - return; - } - - try - { - //Init reader - ForwardOnlyMemoryReader<byte> reader = new(chunk); - - do - { - //try to accumulate the chunk data - ERRNO written = ChunckAccumulator.TryBufferChunk(reader.Window.Span); - - //Not all data was buffered - if (written < reader.WindowSize) - { - //Advance reader - reader.Advance(written); - - //Flush accumulator - Memory<byte> accChunk = ChunckAccumulator.GetChunkData(); - - //Reset the chunk accumulator - ChunckAccumulator.Reset(); - - //Flush accumulator async - await transport!.WriteAsync(accChunk, cancellationToken); - - //Continue to buffer / flush as needed - continue; - } - - break; - } - while (true); - } - catch - { - HadError = true; - throw; - } + ChunckAccumulator.OnNewRequest(); } - public override async ValueTask DisposeAsync() + ///<inheritdoc/> + public void OnComplete() { - //If write error occured, then do not write the last chunk - if (HadError) - { - return; - } - - //Complete the last chunk - Memory<byte> chunkData = ChunckAccumulator.GetFinalChunkData(); - - //Reset the accumulator - ChunckAccumulator.Reset(); + ChunckAccumulator.OnComplete(); + } - //Write remaining data to stream - await transport!.WriteAsync(chunkData, CancellationToken.None); + ///<inheritdoc/> + public Memory<byte> GetMemory() => ChunckAccumulator.GetRemainingSegment(); - //Flush base stream - await transport!.FlushAsync(CancellationToken.None); + ///<inheritdoc/> + public int Advance(int written) + { + //Advance the accumulator + ChunckAccumulator.Advance(written); + return ChunckAccumulator.GetRemainingSegmentSize(); } - public override void Close() + ///<inheritdoc/> + public ValueTask FlushAsync(bool isFinal) { - //If write error occured, then do not write the last chunk - if (HadError) - { - return; - } - - //Complete the last chunk - Memory<byte> chunkData = ChunckAccumulator.GetFinalChunkData(); + /* + * We need to know when the final chunk is being flushed so we can + * write the final termination sequence to the transport. + */ + + Memory<byte> chunkData = isFinal ? ChunckAccumulator.GetFinalChunkData() : ChunckAccumulator.GetChunkData(); //Reset the accumulator ChunckAccumulator.Reset(); - //Write chunk data - transport!.Write(chunkData.Span); - - //Flush base stream - transport!.Flush(); - } - - #region Hooks - - public void OnNewRequest() - { - ChunckAccumulator.OnNewRequest(); - } - - public void OnComplete() - { - ChunckAccumulator.OnComplete(); - - //Clear error flag - HadError = false; + //Write remaining data to stream + return transport!.WriteAsync(chunkData, CancellationToken.None); } #endregion diff --git a/lib/Net.Http/src/Core/Response/DirectStream.cs b/lib/Net.Http/src/Core/Response/DirectStream.cs index 7d0e568..2406f0f 100644 --- a/lib/Net.Http/src/Core/Response/DirectStream.cs +++ b/lib/Net.Http/src/Core/Response/DirectStream.cs @@ -23,28 +23,17 @@ */ using System; -using System.Threading; using System.Threading.Tasks; namespace VNLib.Net.Http.Core.Response { - internal sealed class DirectStream : ReusableResponseStream - { - - public override void Write(ReadOnlySpan<byte> buffer) - { - transport!.Write(buffer); - } - public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default) - { - return transport!.WriteAsync(buffer, cancellationToken); - } - - //Pass through flush methods - public override void Flush() => transport!.Flush(); - - public override Task FlushAsync(CancellationToken cancellationToken) => transport!.FlushAsync(cancellationToken); + internal sealed class DirectStream : ReusableResponseStream, IDirectResponsWriter + { + ///<inheritdoc/> + public Task FlushAsync() => transport!.FlushAsync(); + ///<inheritdoc/> + public ValueTask WriteAsync(ReadOnlyMemory<byte> buffer) => transport!.WriteAsync(buffer); } } diff --git a/lib/Net.Http/src/Core/Response/HttpContextResponseWriting.cs b/lib/Net.Http/src/Core/Response/HttpContextResponseWriting.cs index 0b29e43..ca5f040 100644 --- a/lib/Net.Http/src/Core/Response/HttpContextResponseWriting.cs +++ b/lib/Net.Http/src/Core/Response/HttpContextResponseWriting.cs @@ -23,11 +23,12 @@ */ using System; -using System.IO; using System.Net; using System.Diagnostics; using System.Threading.Tasks; +using VNLib.Net.Http.Core.Response; + namespace VNLib.Net.Http.Core { #pragma warning disable CA2007 // Consider calling ConfigureAwait on the awaited task @@ -161,49 +162,36 @@ namespace VNLib.Net.Http.Core private async Task WriteEntityDataAsync(long length, CompressionMethod compMethod, bool hasExplicitLength) { - //Get output stream, and always dispose it - await using Stream outputStream = await GetOutputStreamAsync(length, compMethod); - //Determine if buffer is required Memory<byte> buffer = ResponseBody.BufferRequired ? Buffers.GetResponseDataBuffer() : Memory<byte>.Empty; - /* - * Using compression, we must initialize a compressor, and write the response - * with the locked compressor - */ - if (compMethod != CompressionMethod.None) + //We need to flush header before we can write to the transport + await Response.CompleteHeadersAsync(compMethod == CompressionMethod.None ? length : -1); + + if (compMethod == CompressionMethod.None) + { + //Setup a direct stream to write to + IDirectResponsWriter output = Response.GetDirectStream(); + + //Write response with optional forced length + await ResponseBody.WriteEntityAsync(output, hasExplicitLength ? length : -1, buffer); + } + else { //Compressor must never be null at this point Debug.Assert(_compressor != null, "Compression was allowed but the compressor was not initialized"); + //Get the chunked response writer + IResponseDataWriter output = Response.GetChunkWriter(); + //Init compressor (Deinint is deferred to the end of the request) - _compressor.Init(outputStream, compMethod); + _compressor.Init(compMethod); //Write response - await ResponseBody.WriteEntityAsync(_compressor, buffer); - - } - /* - * Explicit length may be set when the response may have more data than requested - * by the client. IE: when a range is set, we need to make sure we sent exactly the - * correct data, otherwise the client will drop the connection. - */ - else if(hasExplicitLength) - { - //Write response with explicit length - await ResponseBody.WriteEntityAsync(outputStream, length, buffer); - - } - else - { - await ResponseBody.WriteEntityAsync(outputStream, buffer); + await ResponseBody.WriteEntityAsync(_compressor, output, buffer); } } - private ValueTask<Stream> GetOutputStreamAsync(long length, CompressionMethod compMethod) - { - return compMethod == CompressionMethod.None ? Response.GetStreamAsync(length) : Response.GetStreamAsync(); - } #pragma warning restore CA2007 // Consider calling ConfigureAwait on the awaited task diff --git a/lib/Net.Http/src/Core/Response/HttpResponse.cs b/lib/Net.Http/src/Core/Response/HttpResponse.cs index c09c3f7..90f6f24 100644 --- a/lib/Net.Http/src/Core/Response/HttpResponse.cs +++ b/lib/Net.Http/src/Core/Response/HttpResponse.cs @@ -190,59 +190,57 @@ namespace VNLib.Net.Http.Core } /// <summary> - /// Gets a stream for writing data of a specified length directly to the client + /// Flushes all available headers to the transport asynchronously /// </summary> - /// <param name="ContentLength"></param> - /// <returns>A <see cref="Stream"/> configured for writing data to client</returns> - /// <exception cref="OutOfMemoryException"></exception> - /// <exception cref="InvalidOperationException"></exception> - public ValueTask<Stream> GetStreamAsync(long ContentLength) + /// <param name="contentLength">The optional content length if set, <![CDATA[ < 0]]> for chunked responses</param> + /// <returns>A value task that completes when header data has been made available to the transport</returns> + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public ValueTask CompleteHeadersAsync(long contentLength) { Check(); - //Add content length header - Headers[HttpResponseHeader.ContentLength] = ContentLength.ToString(); + if (contentLength < 0) + { + //Add chunked header + Headers[HttpResponseHeader.TransferEncoding] = "chunked"; + + } + else + { + //Add content length header + Headers[HttpResponseHeader.ContentLength] = contentLength.ToString(); + } //Flush headers - ValueTask flush = EndFlushHeadersAsync(); + return EndFlushHeadersAsync(); + } - //Return the reusable stream - return flush.IsCompletedSuccessfully ? - ValueTask.FromResult<Stream>(ReusableDirectStream) - : GetStreamAsyncCore(flush, ReusableDirectStream); + /// <summary> + /// Gets a response writer for writing directly to the transport stream + /// </summary> + /// <returns>The <see cref="IDirectResponsWriter"/> instance for writing stream data to</returns> + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public IDirectResponsWriter GetDirectStream() + { + //Headers must be sent before getting a direct stream + Debug.Assert(HeadersSent); + return ReusableDirectStream; } /// <summary> - /// Sets up the client for chuncked encoding and gets a stream that allows for chuncks to be sent. User must call dispose on stream when done writing data + /// Gets a response writer for writing chunked data to the transport stream /// </summary> - /// <returns><see cref="Stream"/> supporting chunked encoding</returns> - /// <exception cref="OutOfMemoryException"></exception> - /// <exception cref="InvalidOperationException"></exception> - public ValueTask<Stream> GetStreamAsync() + /// <returns>The <see cref="IResponseDataWriter"/> for buffering response chunks</returns> + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public IResponseDataWriter GetChunkWriter() { //Chunking is only an http 1.1 feature (should never get called otherwise) - Debug.Assert(ContextInfo.CurrentVersion == HttpVersion.Http11); - - Check(); - - //Set encoding type to chunked with user-defined compression - Headers[HttpResponseHeader.TransferEncoding] = "chunked"; - - //Flush headers - ValueTask flush = EndFlushHeadersAsync(); + Debug.Assert(ContextInfo.CurrentVersion == HttpVersion.Http11, "Chunked response handler was requested, but is not an HTTP/1.1 response"); + Debug.Assert(HeadersSent, "Chunk write was requested but header data has not been sent"); - //Return the reusable stream - return flush.IsCompletedSuccessfully ? - ValueTask.FromResult<Stream>(ReusableChunkedStream) - : GetStreamAsyncCore(flush, ReusableChunkedStream); - } - - private static async ValueTask<Stream> GetStreamAsyncCore(ValueTask flush, Stream stream) - { - //Await the flush and get the stream - await flush.ConfigureAwait(false); - return stream; + return ReusableChunkedStream; } + [MethodImpl(MethodImplOptions.AggressiveInlining)] void Check() @@ -329,6 +327,7 @@ namespace VNLib.Net.Http.Core } ///<inheritdoc/> + [MethodImpl(MethodImplOptions.AggressiveInlining)] public void OnNewRequest() { //Default to okay status code @@ -338,6 +337,7 @@ namespace VNLib.Net.Http.Core } ///<inheritdoc/> + [MethodImpl(MethodImplOptions.AggressiveInlining)] public void OnComplete() { //Clear headers and cookies diff --git a/lib/Net.Http/src/Core/Response/IDirectResponsWriter.cs b/lib/Net.Http/src/Core/Response/IDirectResponsWriter.cs new file mode 100644 index 0000000..7c9ca41 --- /dev/null +++ b/lib/Net.Http/src/Core/Response/IDirectResponsWriter.cs @@ -0,0 +1,49 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Http +* File: IDirectResponsWriter.cs +* +* IDirectResponsWriter.cs is part of VNLib.Net.Http which is part of +* the larger VNLib collection of libraries and utilities. +* +* VNLib.Net.Http is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Http is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Threading.Tasks; + +namespace VNLib.Net.Http.Core.Response +{ + /// <summary> + /// Represents a stream that can be written to directly (does not + /// buffer response data) + /// </summary> + internal interface IDirectResponsWriter + { + /// <summary> + /// Writes the given data buffer to the client + /// </summary> + /// <param name="buffer">The response data to write</param> + /// <returns>A value task that resolves when the write operation is complete</returns> + ValueTask WriteAsync(ReadOnlyMemory<byte> buffer); + + /// <summary> + /// Flushes any remaining data to the client + /// </summary> + /// <returns>A task that resolves when the flush operationis complete</returns> + Task FlushAsync(); + } +} diff --git a/lib/Net.Http/src/Core/Response/IResponseDataWriter.cs b/lib/Net.Http/src/Core/Response/IResponseDataWriter.cs new file mode 100644 index 0000000..55ef49c --- /dev/null +++ b/lib/Net.Http/src/Core/Response/IResponseDataWriter.cs @@ -0,0 +1,56 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Http +* File: IResponseDataWriter.cs +* +* IResponseDataWriter.cs is part of VNLib.Net.Http which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Http is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Http is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Threading.Tasks; + +namespace VNLib.Net.Http.Core.Response +{ + /// <summary> + /// A buffered http response data writer + /// </summary> + internal interface IResponseDataWriter + { + /// <summary> + /// Gets the next memory segment available to buffer data to + /// </summary> + /// <returns>An available buffer to write response data to </returns> + Memory<byte> GetMemory(); + + /// <summary> + /// Advances the writer by the number of bytes written and returns the + /// number of bytes available for writing on the next call to <see cref="GetMemory"/> + /// </summary> + /// <param name="written">The number of bytes written to the output buffer</param> + /// <returns>The number of bytes remaining in the internal buffer</returns> + int Advance(int written); + + /// <summary> + /// Flushes the internal buffer to the underlying stream + /// </summary> + /// <param name="isFinal">A value that indicates that this is final call to flush</param> + /// <returns>A valuetask that completes </returns> + ValueTask FlushAsync(bool isFinal); + } +}
\ No newline at end of file diff --git a/lib/Net.Http/src/Core/Response/ResponseWriter.cs b/lib/Net.Http/src/Core/Response/ResponseWriter.cs index 77dc619..d67fc01 100644 --- a/lib/Net.Http/src/Core/Response/ResponseWriter.cs +++ b/lib/Net.Http/src/Core/Response/ResponseWriter.cs @@ -22,12 +22,20 @@ * along with this program. If not, see https://www.gnu.org/licenses/. */ +/* + * This file handles response entity processing. It handles in-memory response + * processing, as well as stream response processing. It handles contraints + * such as content-range limits. I tried to eliminate or reduce the amount of + * memory copying required to process the response entity. + */ + using System; using System.IO; using System.Threading; using System.Threading.Tasks; +using System.Runtime.CompilerServices; -using VNLib.Utils.Extensions; +using VNLib.Net.Http.Core.Response; using VNLib.Net.Http.Core.Compression; namespace VNLib.Net.Http.Core @@ -94,68 +102,95 @@ namespace VNLib.Net.Http.Core ReadOnlyMemory<byte> _readSegment; ///<inheritdoc/> - async Task IHttpResponseBody.WriteEntityAsync(Stream dest, long count, Memory<byte> buffer) + async Task IHttpResponseBody.WriteEntityAsync(IDirectResponsWriter dest, long count, Memory<byte> buffer) { int remaining; //Write a sliding window response if (_memoryResponse != null) { - //Get min value from count/range length - remaining = (int)Math.Min(count, _memoryResponse.Remaining); - - //Write response body from memory - while (remaining > 0) + if(count > 0) { - //Get remaining segment - _readSegment = _memoryResponse.GetRemainingConstrained(remaining); - - //Write segment to output stream - await dest.WriteAsync(_readSegment); - - //Advance by the written ammount - _memoryResponse.Advance(_readSegment.Length); - - //Update remaining - remaining -= _readSegment.Length; - } - } - else - { - //Buffer is required, and count must be supplied - await _streamResponse!.CopyToAsync(dest, buffer, count); + //Get min value from count/range length + remaining = (int)Math.Min(count, _memoryResponse.Remaining); - //Try to dispose the response stream asyncrhonously since we are done with it - await _streamResponse!.DisposeAsync(); + //Write response body from memory + while (remaining > 0) + { + //Get remaining segment + _readSegment = _memoryResponse.GetRemainingConstrained(remaining); - //remove ref so its not disposed again - _streamResponse = null; - } - } + //Write segment to output stream + await dest.WriteAsync(_readSegment); - ///<inheritdoc/> - async Task IHttpResponseBody.WriteEntityAsync(Stream dest, Memory<byte> buffer) - { - //Write a sliding window response - if (_memoryResponse != null) - { - //Write response body from memory - while (_memoryResponse.Remaining > 0) + //Advance by the written ammount + _memoryResponse.Advance(_readSegment.Length); + + //Update remaining + remaining -= _readSegment.Length; + } + } + else { - //Get remaining segment - _readSegment = _memoryResponse.GetMemory(); + //Write response body from memory + while (_memoryResponse.Remaining > 0) + { + //Get remaining segment + _readSegment = _memoryResponse.GetMemory(); - //Write segment to output stream - await dest.WriteAsync(_readSegment); + //Write segment to output stream + await dest.WriteAsync(_readSegment); - //Advance by the written ammount - _memoryResponse.Advance(_readSegment.Length); + //Advance by the written amount + _memoryResponse.Advance(_readSegment.Length); + } } + + //Disposing of memory response can be deferred until the end of the request since its always syncrhonous } else { - //Buffer is required, and count must be supplied - await _streamResponse!.CopyToAsync(dest, buffer); + if (count > 0) + { + //Buffer is required, and count must be supplied + + long total = 0; + int read; + while (true) + { + //get offset wrapper of the total buffer or remaining count + Memory<byte> offset = buffer[..(int)Math.Min(buffer.Length, count - total)]; + //read + read = await _streamResponse!.ReadAsync(offset); + //Guard + if (read == 0) + { + break; + } + //write only the data that was read (slice) + await dest.WriteAsync(offset[..read]); + //Update total + total += read; + } + } + else + { + //Read in loop + do + { + //read + int read = await _streamResponse!.ReadAsync(buffer); + //Guard + if (read == 0) + { + break; + } + + //write only the data that was read (slice) + await dest.WriteAsync(buffer[..read]); + + } while (true); + } //Try to dispose the response stream asyncrhonously since we are done with it await _streamResponse!.DisposeAsync(); @@ -166,64 +201,22 @@ namespace VNLib.Net.Http.Core } ///<inheritdoc/> - async Task IHttpResponseBody.WriteEntityAsync(IResponseCompressor dest, Memory<byte> buffer) + async Task IHttpResponseBody.WriteEntityAsync(IResponseCompressor comp, IResponseDataWriter writer, Memory<byte> buffer) { //Locals - bool remaining; int read; //Write a sliding window response if (_memoryResponse != null) - { - /* - * It is safe to assume if a response body was set, that it contains data. - * So the cost or running a loop without data is not a concern. - * - * Since any failed writes to the output will raise exceptions, it is safe - * to advance the reader before writing the data, so we can determine if the - * block is final. - * - * Since we are using a byte-stream reader for memory responses, we can optimize the - * compression loop, if we know its operating block size, so we only compress blocks - * of the block size, then continue the loop without branching or causing nested - * loops - */ - - //Optimize for block size - if (dest.BlockSize > 0) - { - //Write response body from memory - do - { - _readSegment = _memoryResponse.GetRemainingConstrained(dest.BlockSize); - - //Advance by the trimmed segment length - _memoryResponse.Advance(_readSegment.Length); - - //Check if data is remaining after an advance - remaining = _memoryResponse.Remaining > 0; - - //Compress the trimmed block - await dest.CompressBlockAsync(_readSegment, !remaining); - - } while (remaining); - } - else + { + while (_memoryResponse.Remaining > 0) { - do + //Commit output bytes + if (CompressNextSegment(_memoryResponse, comp, writer)) { - _readSegment = _memoryResponse.GetMemory(); - - //Advance by the segment length, this should be safe even if its zero - _memoryResponse.Advance(_readSegment.Length); - - //Check if data is remaining after an advance - remaining = _memoryResponse.Remaining > 0; - - //Write to output - await dest.CompressBlockAsync(_readSegment, !remaining); - - } while (remaining); + //Time to flush + await writer.FlushAsync(false); + } } //Disposing of memory response can be deferred until the end of the request since its always syncrhonous @@ -231,9 +224,9 @@ namespace VNLib.Net.Http.Core else { //Trim buffer to block size if it is set by the compressor - if (dest.BlockSize > 0) + if (comp.BlockSize > 0) { - buffer = buffer[..dest.BlockSize]; + buffer = buffer[..comp.BlockSize]; } //Read in loop @@ -248,8 +241,12 @@ namespace VNLib.Net.Http.Core break; } - //write only the data that was read, as a segment instead of a block - await dest.CompressBlockAsync(buffer[..read], read < buffer.Length); + //Compress the buffered data and flush if required + if(CompressNextSegment(buffer, comp, writer)) + { + //Time to flush + await writer.FlushAsync(false); + } } while (true); @@ -263,17 +260,69 @@ namespace VNLib.Net.Http.Core _streamResponse = null; } - //Continue flusing flushing the compressor if required - while(dest.IsFlushRequired()) + + /* + * Once there is no more response data avialable to compress + * we need to flush the compressor, then flush the writer + * to publish all accumulated data to the client + */ + + do { - //Flush the compressor - await dest.CompressBlockAsync(ReadOnlyMemory<byte>.Empty, true); - } + //Get output buffer + Memory<byte> output = writer.GetMemory(); + + //Flush the compressor output + int written = comp.Flush(output); + + //No more data to buffer + if (written == 0) + { + //final flush and exit + await writer.FlushAsync(true); + break; + } + + if (writer.Advance(written) == 0) + { + //Flush because accumulator is full + await writer.FlushAsync(false); + } + + } while (true); + } + + private static bool CompressNextSegment(IMemoryResponseReader reader, IResponseCompressor comp, IResponseDataWriter writer) + { + //Read the next segment + ReadOnlyMemory<byte> readSegment = comp.BlockSize > 0 ? reader.GetRemainingConstrained(comp.BlockSize) : reader.GetMemory(); + + //Get output buffer + Memory<byte> output = writer.GetMemory(); + + //Compress the trimmed block + CompressionResult res = comp.CompressBlock(readSegment, output); + + //Commit input bytes + reader.Advance(res.BytesRead); + + return writer.Advance(res.BytesWritten) == 0; + } + + private static bool CompressNextSegment(Memory<byte> readSegment, IResponseCompressor comp, IResponseDataWriter writer) + { + //Get output buffer + Memory<byte> output = writer.GetMemory(); + + //Compress the trimmed block + CompressionResult res = comp.CompressBlock(readSegment, output); + + return writer.Advance(res.BytesWritten) == 0; } #pragma warning restore CA2007 // Consider calling ConfigureAwait on the awaited task - + [MethodImpl(MethodImplOptions.AggressiveInlining)] public void OnComplete() { //Clear has data flag diff --git a/lib/Net.Http/src/Core/Response/ReusableResponseStream.cs b/lib/Net.Http/src/Core/Response/ReusableResponseStream.cs index 0be4821..60465f9 100644 --- a/lib/Net.Http/src/Core/Response/ReusableResponseStream.cs +++ b/lib/Net.Http/src/Core/Response/ReusableResponseStream.cs @@ -22,18 +22,12 @@ * along with this program. If not, see https://www.gnu.org/licenses/. */ -using System; using System.IO; -using System.Threading; -using System.Threading.Tasks; namespace VNLib.Net.Http.Core.Response { -#pragma warning disable CA2215 // Dispose methods should call base class dispose -#pragma warning disable CA1844 // Provide memory-based overrides of async methods when subclassing 'Stream' - - internal abstract class ReusableResponseStream : Stream + internal abstract class ReusableResponseStream { protected Stream? transport; @@ -46,50 +40,7 @@ namespace VNLib.Net.Http.Core.Response /// <summary> /// Called when the connection is released /// </summary> - public virtual void OnRelease() => this.transport = null; - - - //Block base dispose - protected override void Dispose(bool disposing) - { } - - //Block base close - public override void Close() - { } - - //block base dispose async - public override ValueTask DisposeAsync() - { - return ValueTask.CompletedTask; - } - - //Block flush - public override void Flush() - { } - - //Block flush async - public override Task FlushAsync(CancellationToken cancellationToken) - { - return Task.CompletedTask; - } - - //Block stream basics - public override bool CanRead => false; - public override bool CanSeek => false; - public override bool CanWrite => true; - public override long Length => throw new NotSupportedException(); - public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); } - - //Reading is not enabled - public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException("This stream cannot be read from"); - public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException("This stream does not support seeking"); - public override void SetLength(long value) => throw new NotSupportedException("This stream does not support seeking"); - - public override void Write(byte[] buffer, int offset, int count) => Write(buffer.AsSpan(offset, count)); - - public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) - { - return WriteAsync(buffer.AsMemory(offset, count), cancellationToken).AsTask(); - } + public virtual void OnRelease() => transport = null; + } }
\ No newline at end of file |