From be6dc557a3b819248b014992eb96c1cb21f8112b Mon Sep 17 00:00:00 2001 From: vnugent Date: Sun, 8 Jan 2023 14:44:01 -0500 Subject: Initial commit --- Utils/src/IO/VnStreamWriter.cs | 292 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 292 insertions(+) create mode 100644 Utils/src/IO/VnStreamWriter.cs (limited to 'Utils/src/IO/VnStreamWriter.cs') diff --git a/Utils/src/IO/VnStreamWriter.cs b/Utils/src/IO/VnStreamWriter.cs new file mode 100644 index 0000000..37d700c --- /dev/null +++ b/Utils/src/IO/VnStreamWriter.cs @@ -0,0 +1,292 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Utils +* File: VnStreamWriter.cs +* +* VnStreamWriter.cs is part of VNLib.Utils which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Utils is free software: you can redistribute it and/or modify +* it under the terms of the GNU General Public License as published +* by the Free Software Foundation, either version 2 of the License, +* or (at your option) any later version. +* +* VNLib.Utils is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +* General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with VNLib.Utils. If not, see http://www.gnu.org/licenses/. +*/ + +using System; +using System.IO; +using System.Text; +using System.Buffers; +using System.Threading; +using System.Threading.Tasks; +using System.Runtime.InteropServices; +using System.Runtime.CompilerServices; + +using VNLib.Utils.Memory; + +namespace VNLib.Utils.IO +{ + /// + /// Provides a memory optimized implementation. Optimized for writing + /// to network streams + /// + public class VnStreamWriter : TextWriter + { + private readonly Encoder Enc; + + private readonly ISlindingWindowBuffer _buffer; + + private bool closed; + + /// + /// Gets the underlying stream that interfaces with the backing store + /// + public virtual Stream BaseStream { get; } + /// + public override Encoding Encoding { get; } + + /// + /// Line termination to use when writing lines to the output + /// + public ReadOnlyMemory LineTermination { get; set; } + /// + public override string NewLine + { + get => Encoding.GetString(LineTermination.Span); + set => LineTermination = Encoding.GetBytes(value); + } + + /// + /// Creates a new that writes formatted data + /// to the specified base stream + /// + /// The stream to write data to + /// The to use when writing data + /// The size of the internal buffer used to buffer binary data before writing to the base stream + public VnStreamWriter(Stream baseStream, Encoding encoding, int bufferSize = 1024) + { + //Store base stream + BaseStream = baseStream; + Encoding = encoding; + //Get an encoder + Enc = encoding.GetEncoder(); + _buffer = InitializeBuffer(bufferSize); + } + + /// + /// Invoked by the constuctor method to allocte the internal buffer with the specified buffer size. + /// + /// The requested size of the buffer to alloc + /// By default requests the buffer from the instance + protected virtual ISlindingWindowBuffer InitializeBuffer(int bufferSize) => new ArrayPoolStreamBuffer(ArrayPool.Shared, bufferSize); + /// + public void Write(byte value) + { + //See if there is room in the binary buffer + if (_buffer.AccumulatedSize == 0) + { + //There is not enough room to store the single byte + Flush(); + } + //Store at the end of the window + _buffer.Append(value); + } + /// + public override void Write(char value) + { + ReadOnlySpan tbuf = MemoryMarshal.CreateSpan(ref value, 0x01); + Write(tbuf); + } + /// + public override void Write(object value) => Write(value.ToString()); + /// + public override void Write(string value) => Write(value.AsSpan()); + /// + public override void Write(ReadOnlySpan buffer) + { + Check(); + + ForwardOnlyReader reader = new(buffer); + + //Create a variable for a character buffer window + bool completed; + do + { + //Get an available buffer window to store characters in and convert the characters to binary + Enc.Convert(reader.Window, _buffer.Remaining, true, out int charsUsed, out int bytesUsed, out completed); + //Update byte position + _buffer.Advance(bytesUsed); + //Update char position + reader.Advance(charsUsed); + + //Converting did not complete because the buffer was too small + if (!completed || reader.WindowSize == 0) + { + //Flush the buffer and continue + Flush(); + } + + } while (!completed); + //Reset the encoder + Enc.Reset(); + } + /// + public override async Task WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) + { + Check(); + //Create a variable for a character buffer window + bool completed; + ForwardOnlyMemoryReader reader = new(buffer); + do + { + //Get an available buffer window to store characters in and convert the characters to binary + Enc.Convert(reader.Window.Span, _buffer.Remaining, true, out int charsUsed, out int bytesUsed, out completed); + //Update byte position + _buffer.Advance(bytesUsed); + //Update char position + reader.Advance(charsUsed); + //Converting did not complete because the buffer was too small + if (!completed || reader.WindowSize == 0) + { + //Flush the buffer and continue + await FlushWriterAsync(cancellationToken); + } + } while (!completed); + //Reset the encoder + Enc.Reset(); + } + + /// + public override void WriteLine() + { + Check(); + //See if there is room in the binary buffer + if (_buffer.RemainingSize < LineTermination.Length) + { + //There is not enough room to store the termination, so we need to flush the buffer + Flush(); + } + _buffer.Append(LineTermination.Span); + } + /// + public override void WriteLine(object value) => WriteLine(value.ToString()); + /// + public override void WriteLine(string value) => WriteLine(value.AsSpan()); + /// + public override void WriteLine(ReadOnlySpan buffer) + { + //Write the value itself + Write(buffer); + //Write the line termination + WriteLine(); + } + + /// + /// + public override void Flush() + { + Check(); + //If data is available to be written, write it to the base stream + if (_buffer.AccumulatedSize > 0) + { + //Write all buffered data to stream + BaseStream.Write(_buffer.Accumulated); + //Reset the buffer + _buffer.Reset(); + } + } + /// + /// Asynchronously flushes the internal buffers to the , and resets the internal buffer state + /// + /// A that represents the asynchronous flush operation + /// + public async ValueTask FlushWriterAsync(CancellationToken cancellationToken = default) + { + Check(); + if (_buffer.AccumulatedSize > 0) + { + //Flush current window to the stream + await BaseStream.WriteAsync(_buffer.AccumulatedBuffer, cancellationToken); + //Reset the buffer + _buffer.Reset(); + } + } + + /// + public override Task FlushAsync() => FlushWriterAsync().AsTask(); + + /// + /// Resets internal properies for resuse + /// + protected void Reset() + { + _buffer.Reset(); + Enc.Reset(); + } + /// + public override void Close() + { + //Only invoke close once + if (closed) + { + return; + } + try + { + Flush(); + } + finally + { + //Release the memory handle if its set + _buffer.Close(); + //Set closed flag + closed = true; + } + } + /// + protected override void Dispose(bool disposing) + { + Close(); + base.Dispose(disposing); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private void Check() + { + if (closed) + { + throw new ObjectDisposedException("The stream is closed"); + } + } + /// + public override async ValueTask DisposeAsync() + { + //Only invoke close once + if (closed) + { + return; + } + try + { + await FlushWriterAsync(); + } + finally + { + //Set closed flag + closed = true; + //Release the memory handle if its set + _buffer.Close(); + } + GC.SuppressFinalize(this); + } + } +} \ No newline at end of file -- cgit